メインコンテンツまでスキップ

ダウンストリームRAGの使用例

備考

ベータ版

管理された SharePoint コネクタはベータ版です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。「Databricks プレビューの管理」を参照してください。

SharePoint パイプラインを作成したので、生のドキュメントをテキストに解析したり、解析されたデータをチャンク化したり、チャンクから埋め込みを作成したりできます。その後、出力テーブルで readStream をダウンストリーム パイプラインで直接使用できます。

非構造化文書を解析する

多くのダウンストリーム RAG およびドキュメント理解ワークロードでは、生の非構造化ファイル (PDF、PPTX、Word ドキュメント、画像など) を構造化されたクエリ可能な表現に変換する必要があります。Databricks には、バイナリ ファイル コンテンツからテキスト、テーブル、レイアウト情報、メタデータ、その他の構造化シグナルを自動的に抽出する組み込み関数であるai_parse_documentが用意されています。

SharePoint 取り込みパイプラインによって生成されたinline_content列にai_parse_document直接適用できます。これは、検索拡張生成 (RAG)、分類、エンティティ抽出、ドキュメント中心のエージェントの構築など、ほとんどの非構造化ダウンストリームのユースケースに推奨されるアプローチです。

詳細については、 ai_parse_document を参照してください。

例: Sharepoint ファイルの変換

LakeFlow Spark宣言型パイプライン (マテリアライズドビュー、ストリーミング テーブルなど) を使用して、取り込まれた SharePoint ファイルを解析された構造化出力に段階的に変換できます。 次の例は、新しく到着した各ドキュメントを解析するマテリアライズドビューを作成する方法を示しています。

SQL
CREATE OR REFRESH MATERIALIZED VIEW documents_parsed
AS
SELECT
*,
ai_parse_document(content.inline_content) AS parsed
FROM <your_catalog>.<your_schema>.<your_destination_table>;

このビューでは、SharePoint 取り込みパイプラインを通じて新しいファイルが到着すると、解析されたドキュメントの表現が最新の状態に保たれます。parsed列は、ダウンストリームのユースケースに使用できるようになります。

個々のファイルコンテンツにアクセスする

カスタム ライブラリやツールと統合する場合など、ファイルを直接操作したい場合、Databricks では、インジェスト パイプラインからの出力テーブルで実行できる追加のファイル アクセス UDF が提供されます。

名前

説明

read_blob_as_file(blob カラム、ファイル名カラム)

ファイルをローカル ディスクにダウンロードし、ファイル パスを返します。

read_blob_as_bytes(ブロブ列)

ファイルをローカル ディスクにダウンロードし、データをバイト配列として返します。

ファイルアクセス UDF の設定

ファイル アクセス UDF を設定するには、ダウンストリーム パイプラインに次のセルを追加します。

Python
# DO NOT MODIFY this cell.

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import BinaryType

# Copy to local disk and get file path.

def copy_to_disk(blob, filename) -> str:
fname = "/local_disk0/tmp/" + filename
with open(fname, "wb") as f:
f.write(blob.inline_content)
return fname

read_blob_as_file = udf(copy_to_disk)

# Get bytes directly.

def get_bytes(blob) -> bytes:
return blob.inline_content

read_blob_as_bytes = udf(get_bytes, BinaryType())

ファイルアクセスの例

ファイルパスを返すには:

Python
# Suppose you have a simple UDF that converts a file's raw bytes to a UTF-8 string.

def file_bytes_to_text(fname):
with open(fname, "rb") as f:
return f.read().decode("utf-8")
file_bytes_to_text_udf = udf(file_bytes_to_text)

# Chain your UDF with the file access UDF for the file path.

df.withColumn("text_content",
file_bytes_to_text_udf(read_blob_as_file("content",
"file_metadata.name"))).collect()

データをバイト配列として返すには、次のようにします。

Python
# Suppose you have a simple UDF that converts a file's raw bytes to a UTF-8 string.

def bytes_to_text(bytes_data):
return bytes_data.decode("utf-8")
bytes_to_text_udf = udf(bytes_to_text)

# Chain your UDF with the file access UDF for the byte array.

df.withColumn("text_content",
bytes_to_text_udf(read_blob_as_bytes("content"))).collect()
注記

ファイル アクセス UDF は、100 MB を超えるファイルのファイルコンテンツを処理できません。ファイルアクセス UDF を使用する前に、これらの行を除外する必要があります。

ファイル・パスはローカル・ディスクに書き込まれ UDF ため、シングル・ユーザー・クラスターでのみ機能します。 代わりに クラシッククラスター または サーバレス コンピュートでダウンストリーム パイプラインを実行する場合は、ローカル ディスクではなくUnity Catalogボリュームに書き込むようにUDFを更新できます。ただし、これによりパフォーマンスが低下します。

ボリュームに書き込むには、次のようにします。

Python
# Update the volume_path in the function below.
from pyspark.sql.functions import udf, struct


# copy to volume_path and get file path
def copy_to_disk(blob, filename) -> str:
# UPDATE THIS VALUE
volume_path = "/Volumes/<my_catalog>/<my schema>/<my volume name>/"


fname = volume_path + filename
with open(fname, "wb") as f:
f.write(blob.inline_content)
return fname


read_blob_as_file = udf(copy_to_disk)