ダウンストリームRAGの使用例
プレビュー
Microsoft SharePoint コネクタは ベータ版です。
SharePoint パイプラインを作成したので、生のドキュメントをテキストに解析したり、解析されたデータをチャンク化したり、チャンクから埋め込みを作成したりできます。その後、出力テーブルで readStream
をダウンストリーム パイプラインで直接使用できます。
ファイル データにアクセスするために、次のファイル アクセス UDF を提供しています。これらの UDF は、インジェスト パイプラインからの出力テーブルで実行できます。
名前 | 説明 |
---|---|
| ファイルをローカル ディスクにダウンロードし、ファイル パスを返します。 |
| ファイルをローカル ディスクにダウンロードし、データをバイト配列として返します。 |
ファイルアクセス UDF の設定
次のセルをダウンストリーム パイプラインに追加します。
# DO NOT MODIFY this cell.
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import BinaryType
# Copy to local disk and get file path.
def copy_to_disk(blob, filename) -> str:
fname = "/local_disk0/tmp/" + filename
with open(fname, "wb") as f:
f.write(blob.inline_content)
return fname
read_blob_as_file = udf(copy_to_disk)
# Get bytes directly.
def get_bytes(blob) -> bytes:
return blob.inline_content
read_blob_as_bytes = udf(get_bytes, BinaryType())
例
ファイルパスを返すには:
# Suppose you have a simple UDF that converts a file's raw bytes to a UTF-8 string.
def file_bytes_to_text(fname):
with open(fname, "rb") as f:
return f.read().decode("utf-8")
file_bytes_to_text_udf = udf(file_bytes_to_text)
# Chain your UDF with the file access UDF for the file path.
df.withColumn("text_content", file_bytes_to_text_udf(read_blob_as_file("content", "file_metadata.name"))).collect()
データをバイト配列として返すには、次のようにします。
# Suppose you have a simple UDF that converts a file's raw bytes to a UTF-8 string.
def bytes_to_text(bytes_data):
return bytes_data.decode("utf-8")
bytes_to_text_udf = udf(bytes_to_text)
# Chain your UDF with the file access UDF for the byte array.
df.withColumn("text_content", bytes_to_text_udf(read_blob_as_bytes("content"))).collect()
ファイル アクセス UDF は、100 MB を超えるファイルのファイルコンテンツを処理できません。ファイルアクセス UDF を使用する前に、これらの行を除外する必要があります。
ファイル・パスはローカル・ディスクに書き込まれ UDF ため、シングル・ユーザー・クラスターでのみ機能します。 代わりに クラシッククラスター または サーバレス コンピュートでダウンストリーム パイプラインを実行する場合は、ローカル ディスクではなくUnity Catalogボリュームに書き込むようにUDFを更新できます。ただし、これによりパフォーマンスが低下します。
ボリュームに書き込むには、次のようにします。
# Update the volume_path in the function below.
from pyspark.sql.functions import udf, struct
# copy to volume_path and get file path
def copy_to_disk(blob, filename) -> str:
# UPDATE THIS VALUE
volume_path = "/Volumes/<my_catalog>/<my schema>/<my volume name>/"
fname = volume_path + filename
with open(fname, "wb") as f:
f.write(blob.inline_content)
return fname
read_blob_as_file = udf(copy_to_disk)