SharePointからファイルを取り込む
ベータ版
この機能はベータ版です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。「Databricks プレビューの管理」を参照してください。
:::注記 コンプライアンス
SharePointコネクタは、拡張セキュリティおよびコンプライアンス設定が有効になっているワークスペースでの使用をサポートしています。
:::
構造化ファイル、半構造化ファイル、非構造化ファイルを Microsoft SharePoint から Delta テーブルに取り込むことができます。SharePoint コネクタは、 Unity Catalogガバナンスを備えたバッチAPIsとストリーミング API ( Auto Loader 、 spark.read 、 COPY INTOなど) を使用した SharePoint ファイルの増分取り込みをサポートします。
SharePointコネクタを選択する
LakeFlow Connectは2つのSharePointコネクタを提供しています。これらはいずれも SharePoint のデータにアクセスしますが、管理レベルは異なります。
コネクター | 説明 |
|---|---|
フルマネージド コネクタ。Deltaテーブルにデータを取り込み、ソースと同期させる、エンタープライズアプリケーション向けのシンプルでメンテナンスが容易なコネクタです。 | |
標準 SharePoint コネクタ | SQL、PySpark、またはLakeflow Spark宣言型パイプラインを使用して、 |
Databricksでは、ほとんどのユースケースでマネージド SharePoint コネクタを使用することをお勧めします。自動的な増分取り込み、幅広い形式のサポート、および柔軟なファイルとフォルダーの選択機能を備えた、フルマネージドのエクスペリエンスを提供します。
主な特徴
標準の SharePoint コネクタは以下を提供します。
- 構造化ファイル、半構造化ファイル、非構造化ファイルの取り込み
- きめ細かな取り込み: 特定のサイト、サブサイト、ドキュメント ライブラリ、フォルダー、または単一のファイルを取り込む
spark.read、 Auto Loader 、およびCOPY INTO- CSV や Excel などの構造化および半構造化形式の自動スキーマ推論と進化
- Unity Catalog接続による安全な資格情報ストレージ
- パターンマッチングによるファイル選択
pathGlobFilter
要件
SharePoint からファイルを取り込むには、次のものが必要です。
-
Unity Catalog が有効になっているワークスペース。
-
CREATE CONNECTIONSharePoint接続を作成するための権限、またはクラスターアクセスモードに基づいて既存の接続を使用するための適切な権限。- 専用アクセスMode :
MANAGE CONNECTION。 - 標準アクセスMode :
USE CONNECTION。
- 専用アクセスMode :
-
Databricks Runtimeバージョン 17.3 LTS以降を使用するコンピュート。
-
OAuth 認証は
Sites.Read.AllまたはSites.Selected権限スコープで設定されています。 -
SharePoint ベータ機能は、 プレビュー ページから有効になります。「Databricks プレビューの管理」を参照してください。
-
オプション:Excelファイルの解析用にExcelベータ機能を有効にします。「Excel ファイルの読み込みとストリーム」を参照してください。
接続を作成する
SharePoint 資格情報を保存するためのUnity Catalog接続を作成します。 接続セットアップ プロセスは、標準 SharePoint コネクタとマネージ SharePoint コネクタの両方で共有されます。
OAuth 認証オプションを含む完全な接続セットアップ手順については、 「SharePoint 取り込みセットアップの概要」を参照してください。
SharePointからファイルを読み取る
ファイルを読み取るには、 databricks.connectionオプションを使用して作成した接続と、アクセスしたい SharePoint リソースを指す URL を渡します。ご提供いただくURLによって、データ取り込みの範囲が決まります。
Databricks Runtime 17.3 LTS以降では、以下のパスタイプがサポートされています。
パスの種類 | 説明 |
|---|---|
サイト | アドレスバーからサイトのURLをコピーしてください。
|
サブサイト | アドレスバーからサブサイトのURLをコピーしてください。
|
ドキュメントライブラリ | サイトコンテンツ からライブラリを開き、アドレスバーからURLをコピーしてください。
|
フォルダ | サイトコンテンツ からフォルダを開き、アドレスバーからURLをコピーします。または、SharePoint でフォルダーの 詳細 ペインを開き、 [パス] の横にあるコピーアイコンをクリックします。
|
ファイル | ファイルを選択し、オーバーフローメニュー( … )をクリックして、 プレビュー を選択します。アドレスバーからURLをコピーしてください。または、SharePoint でファイルの 詳細 ペインを開き、 [パス] の横にあるコピーアイコンをクリックします。
|
Databricks Runtime 18 以降では、次のパスタイプのサポートが追加されます:
パスの種類 | 説明 |
|---|---|
ネストされたサブサイト | アドレスバーからサブサイトのURLをコピーしてください。
|
共有リンク | ファイルまたはフォルダを選択し、オーバーフローメニュー( ... )をクリックして、 [リンクをコピー] を選択します。Databricksは、共有リンクの有効期限を無期限に設定することを推奨しています。
|
Microsoft 365 for the web(旧称:Office) | Microsoft 365 for the webでファイルを開き、アドレスバーからURLをコピーしてください。
|
Databricks Runtime 19以降では、次のパスの種類がサポートされています:
パスの種類 | 説明 |
|---|---|
テナント | アドレスバーからテナントのルートURLをコピーしてください。
|
例
標準の SharePoint コネクタを使用してファイルを読み取る方法はいくつかあります。
Auto Loaderを使用して SharePoint ファイルをストリーミングする
Auto Loader は、SharePoint から構造化ファイルを段階的に取り込む最も効率的な方法を提供します。新しいファイルが自動的に検出され、到着すると処理されます。また、自動スキーマ推論および進化により、CSV や JSON などの構造化ファイルや半構造化ファイルを取り込むこともできます。Auto Loader使用法の詳細については、 「一般的なデータ読み込みパターン」を参照してください。
# Incrementally ingest new PDF files
df = (spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("databricks.connection", "my_sharepoint_conn")
.option("cloudFiles.schemaLocation", <path to a schema location>)
.option("pathGlobFilter", "*.pdf")
.load("https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents")
)
# Incrementally ingest CSV files with automatic schema inference and evolution
df = (spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("databricks.connection", "my_sharepoint_conn")
.option("pathGlobFilter", "*.csv")
.option("inferColumnTypes", True)
.option("header", True)
.load("https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs")
)
Spark バッチ読み取りを使用して SharePoint ファイルを読み取る
次の例は、 spark.read関数を使用して Python で SharePoint ファイルを取り込む方法を示しています。
# Read unstructured data as binary files
df = (spark.read
.format("binaryFile")
.option("databricks.connection", "my_sharepoint_conn")
.option("recursiveFileLookup", True)
.option("pathGlobFilter", "*.pdf") # optional. Example: only ingest PDFs
.load("https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents"))
# Read a batch of CSV files, infer the schema, and load the data into a DataFrame
df = (spark.read
.format("csv")
.option("databricks.connection", "my_sharepoint_conn")
.option("pathGlobFilter", "*.csv")
.option("recursiveFileLookup", True)
.option("inferSchema", True)
.option("header", True)
.load("https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs"))
# Read a specific Excel file from SharePoint, infer the schema, and load the data into a DataFrame
df = (spark.read
.format("excel")
.option("databricks.connection", "my_sharepoint_conn")
.option("headerRows", 1) # optional
.option("dataAddress", "Sheet1!A1:M20") # optional
.load("https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx"))
Spark SQL を使用して SharePoint ファイルを読み取る
次の例は、 read_filesテーブル値関数を使用して SQL で SharePoint ファイルを取り込む方法を示しています。read_files使用法の詳細については、 read_filesテーブル値関数を参照してください。
-- Read pdf files
CREATE TABLE my_table AS
SELECT * FROM read_files(
"https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents",
`databricks.connection` => "my_sharepoint_conn",
format => "binaryFile",
pathGlobFilter => "*.pdf", -- optional. Example: only ingest PDFs
schemaEvolutionMode => "none"
);
-- Read a specific Excel sheet and range
CREATE TABLE my_sheet_table AS
SELECT * FROM read_files(
"https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx",
`databricks.connection` => "my_sharepoint_conn",
format => "excel",
headerRows => 1, -- optional
dataAddress => "Sheet1!A2:D10", -- optional
schemaEvolutionMode => "none"
);
増分取り込み COPY INTO
COPY INTO Delta テーブルへのファイルのべき等増分ロードを提供します。COPY INTO使用法の詳細については、 COPY INTOを使用した一般的なデータ読み込みパターンを参照してください。
CREATE TABLE IF NOT EXISTS sharepoint_pdf_table;
CREATE TABLE IF NOT EXISTS sharepoint_csv_table;
CREATE TABLE IF NOT EXISTS sharepoint_excel_table;
# Incrementally ingest new PDF files
COPY INTO sharepoint_pdf_table
FROM "https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents"
FILEFORMAT = BINARYFILE
PATTERN = '*.pdf'
FORMAT_OPTIONS ('databricks.connection' = 'my_sharepoint_conn')
COPY_OPTIONS ('mergeSchema' = 'true');
# Incrementally ingest CSV files with automatic schema inference and evolution
COPY INTO sharepoint_csv_table
FROM "https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs"
FILEFORMAT = CSV
PATTERN = '*.csv'
FORMAT_OPTIONS ('databricks.connection' = 'my_sharepoint_conn', 'header' = 'true', 'inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
# Ingest a single Excel file
COPY INTO sharepoint_excel_table
FROM "https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx"
FILEFORMAT = EXCEL
FORMAT_OPTIONS ('databricks.connection' = 'my_sharepoint_conn', 'headerRows' = '1')
COPY_OPTIONS ('mergeSchema' = 'true');
LakeFlow Spark宣言型パイプラインで SharePoint ファイルを取り込む
SharePoint コネクタには、Databricks Runtime 17.3 以降が必要です。コネクタを使用するには、パイプライン設定で"CHANNEL" = "PREVIEW"を設定します。プレビューの詳細については、 「パイプライン プロパティのリファレンス」を参照してください。
次の例は、 LakeFlow Spark宣言型パイプラインでAuto Loaderを使用して SharePoint ファイルを読み取る方法を示しています。
- Python
- SQL
from pyspark import pipelines as dp
# Incrementally ingest new PDF files
@dp.table
def sharepoint_pdf_table():
return (spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("databricks.connection", "my_sharepoint_conn")
.option("pathGlobFilter", "*.pdf")
.load("https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents")
)
# Incrementally ingest CSV files with automatic schema inference and evolution
@dp.table
def sharepoint_csv_table():
return (spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("databricks.connection", "my_sharepoint_conn")
.option("pathGlobFilter", "*.csv")
.option("inferColumnTypes", True)
.option("header", True)
.load("https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs")
)
# Read a specific Excel file from SharePoint in a materialized view
@dp.table
def sharepoint_excel_table():
return (spark.read.format("excel")
.option("databricks.connection", "my_sharepoint_conn")
.option("headerRows", 1) # optional
.option("inferColumnTypes", True) # optional
.option("dataAddress", "Sheet1!A1:M20") # optional
.load("https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx")
-- Incrementally ingest new PDF files
CREATE OR REFRESH STREAMING TABLE sharepoint_pdf_table
AS SELECT * FROM STREAM read_files(
"https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents",
format => "binaryFile",
`databricks.connection` => "my_sharepoint_conn",
pathGlobFilter => "*.pdf");
-- Incrementally ingest CSV files with automatic schema inference and evolution
CREATE OR REFRESH STREAMING TABLE sharepoint_csv_table
AS SELECT * FROM STREAM read_files(
"https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs",
format => "csv",
`databricks.connection` => "my_sharepoint_conn",
pathGlobFilter => "*.csv",
"header", "true");
-- Read a specific Excel file from SharePoint in a materialized view
CREATE OR REFRESH MATERIALIZED VIEW sharepoint_excel_table
AS SELECT * FROM read_files(
"https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx",
`databricks.connection` => "my_sharepoint_conn",
format => "excel",
headerRows => 1, -- optional
dataAddress => "Sheet1!A2:D10", -- optional
`cloudFiles.schemaEvolutionMode` => "none"
);
非構造化ファイルを解析する
binaryFile形式の標準 SharePoint コネクタを使用して SharePoint から非構造化ファイル (PDF、Word 文書、PowerPoint ファイルなど) を取り込むと、ファイルの内容は生のバイナリ データとして保存されます。これらのファイルを AI ワークロード (RAG、検索、分類、ドキュメント理解など) 用に準備するには、 ai_parse_documentを使用してバイナリ コンテンツを構造化されたクエリ可能な出力に解析できます。
次の例は、 documentsという名前のブロンズ Delta テーブルに格納されている非構造化ドキュメントを解析し、解析されたコンテンツを含む新しい列を追加する方法を示しています。
CREATE TABLE documents AS
SELECT * FROM read_files(
"https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents",
`databricks.connection` => "my_sharepoint_conn",
format => "binaryFile",
pathGlobFilter => "*.{pdf,docx}",
schemaEvolutionMode => "none"
);
SELECT *, ai_parse_document(content) AS parsed_content
FROM documents;
parsed_content列には、下流の AI パイプラインに直接使用できる抽出されたテキスト、テーブル、レイアウト情報、メタデータが含まれています。
LakeFlow Spark宣言型パイプラインを使用した増分解析
LakeFlow Spark宣言型パイプライン内でai_parse_document使用して、増分解析を有効にすることもできます。 SharePoint から新しいファイルがストリームされると、パイプラインの更新時に自動的に解析されます。
たとえば、新しく取り込まれたドキュメントを継続的に解析するマテリアライズドビューを定義できます。
CREATE OR REFRESH STREAMING TABLE sharepoint_documents_table
AS SELECT * FROM STREAM read_files(
"https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents",
format => "binaryFile",
`databricks.connection` => "my_sharepoint_conn",
pathGlobFilter => "*.{pdf,docx}");
CREATE OR REFRESH MATERIALIZED VIEW documents_parsed
AS
SELECT *, ai_parse_document(content) AS parsed_content
FROM sharepoint_documents_table;
このアプローチにより、次のことが保証されます。
- 新しく取り込まれたSharePointファイルは、マテリアライズドビューが更新されるたびに自動的に解析されます。
- 解析された出力は入力データと同期されます
- 下流のAIパイプラインは常に最新のドキュメント表現で動作します
詳細: サポートされている形式と詳細オプションについては、 ai_parse_document を参照してください。
SharePoint メタデータ列
プレビュー
この機能はプライベートプレビュー版です。お試しになりたい場合は、Databricksの担当者までご連絡ください。
_sharepoint_metadata列は、 Microsoft Graphドライブアイテムから取り込まれたファイルのSharePoint固有のプロパティへのアクセスを提供する非表示のメタデータ列です。 Databricks Runtime 18以降が必要で、SharePointから読み込む際のすべてのファイル形式に対応しています。返されるDataFrameに_sharepoint_metadata列を含めるには、読み取りクエリで明示的に選択する必要があります。
データソースに_sharepoint_metadataという名前の列が含まれている場合、重複排除のため、SharePointメタデータ列の名前は__sharepoint_metadata (先頭にアンダースコアを追加)に変更されます。名前が一意になるまで、アンダースコアが追加されます。
ファイルパスやサイズなどの一般的なファイルメタデータは、 _metadata列を使用して照会できます。詳細については、 「ファイルのメタデータ列」を参照してください。
スキーマ
_sharepoint_metadata列は、以下のフィールドを含むSTRUCTです。すべてのフィールドはnullを許容します。
名前 | Type | 説明 | 例 |
|---|---|---|---|
アイテムID |
| アイテムのドライブアイテムID。 |
|
サイトID |
| アイテムが含まれているSharePointサイトのID。 |
|
ドライブID |
| アイテムが格納されているドライブのID。 |
|
ドライブタイプ |
| ドライブの種類。たとえば、SharePoint ライブラリの場合は |
|
親ID |
| 親フォルダーのドライブアイテムID。 |
|
親の名前 |
| 親フォルダの名前。 |
|
親パス |
| 親フォルダのドライブ相対パス。 |
|
ウェブURL |
| SharePoint上のアイテムのブラウザURL。 |
|
mime_type |
| アイテムのMIMEタイプ。 |
|
電子メールで作成されました |
| アイテムを作成したユーザーのメールアドレス。 |
|
作成者名 |
| アイテムを作成したユーザーの表示名。 |
|
作成日時 |
| そのアイテムが作成された時刻。 |
|
最終更新者(メール) |
| 最後にアイテムを編集したユーザーのメールアドレス。 |
|
最終更新者名 |
| 最後にアイテムを変更したユーザーの表示名。 |
|
etag |
| 商品のEタグ。アイテムまたはそのメタデータのいずれかが変更された場合に変更されます。 |
|
ctag |
| 商品の変更タグ。アイテムの内容が変更された場合にのみ変更されます。 |
|
説明 |
| 設定されている場合は、商品の説明が表示されます。 |
|
追加のメタデータ |
| Microsoft Graphによって返されるが、上記で抽出されなかったその他のdriveItemフィールド。 |
|
additional_metadataフィールドはVARIANTとして返されます。VARIANT型を参照してください。
例
以下の例は、読み取りクエリに_sharepoint_metadata列を含める方法、列から特定のフィールドを選択する方法、およびadditional_metadata VARIANTフィールドから値を抽出する方法を示しています。
- Python
- SQL
df = (spark.read
.format("binaryFile")
.option("databricks.connection", "my_sharepoint_conn")
.load("https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents")
.select("*", "_metadata", "_sharepoint_metadata"))
SELECT *, _sharepoint_metadata
FROM read_files(
"https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents",
`databricks.connection` => "my_sharepoint_conn",
format => "binaryFile"
);
_sharepoint_metadata構造体から特定のフィールドを選択します。
df = (spark.read
.format("binaryFile")
.option("databricks.connection", "my_sharepoint_conn")
.load("https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents")
.select("_sharepoint_metadata.item_id", "_sharepoint_metadata.etag"))
additional_metadata VARIANTフィールドから::キャスト演算子を使用して値を抽出します。
SELECT
*,
_sharepoint_metadata.additional_metadata:shared:scope::STRING AS shared_scope
FROM read_files(
"https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents",
`databricks.connection` => "my_sharepoint_conn",
format => "binaryFile"
);
制限事項
標準の SharePoint コネクタには次の制限があります。
- 同じクエリで複数のサイトを取り込むことはできません。2つのサイトから取り込むには、2つの個別のクエリを作成する必要があります。
pathGlobFilterオプションを使用して、ファイル名をフィルタリングできます。フォルダパスによるフィルタリングはサポートされていません。- SharePointリストおよび.aspxサイトページはサポートされていません。ドキュメントライブラリ内のファイルのみがサポートされています。
- SharePoint サーバーへの書き戻しはサポートされていません。
- Auto Loader
cleanSource(取り込み後にソースでファイルを削除またはアーカイブする) はサポートされていません。
次のステップ
- 高度なストリーミング取り込みパターンのためのAuto Loaderについて学ぶ
- べき等増分ロードのCOPY INTOを調べる
- クラウドオブジェクトストレージの取り込みパターンと比較する
- ジョブスケジュールを設定して取り込みワークフローを自動化する
- LakeFlow Spark宣言型パイプラインを使用して、変換を伴うエンドツーエンドのデータパイプラインを構築する