メインコンテンツまでスキップ

Apache Spark DataFrames を使用した OpenSharing 共有テーブルの読み取り

この記事では、Apache Spark を使用して 、OpenSharing を使用して共有データのクエリを実行する構文の例を示します。deltasharing キーワードを DataFrame 操作の形式オプションとして使用します。

共有データをクエリするためのその他のオプション

メタストアに登録されているOpenSharingカタログで、以下の例のように、共有テーブル名を使用するクエリも作成できます。

SQL
SELECT * FROM shared_table_name

DatabricksでOpenSharingを構成する方法、および共有テーブル名を使用してデータをクエリする方法の詳細については、Databricks-to-Databricks OpenSharing を使用して共有データを読み取る (受信者向け)を参照してください。

構造化ストリーミングを使用して、共有テーブルのレコードを段階的に処理できます。構造化ストリーミングを使用するには、テーブルの履歴共有を有効にする必要があります。See ALTER SHARE.履歴共有には、Databricks Runtime 12.2 LTS 以降が必要です。

共有テーブルがソース Delta テーブルでチェンジデータフィードが有効になっており、共有で履歴が有効になっている場合、構造化ストリーミングまたはバッチ操作で OpenSharing 共有を読み取る際にチェンジデータフィードを使用できます。Databricksでのチェンジデータフィードの使用 を参照してください。

オープン共有フォーマットキーワードによる読み込み

deltasharing キーワードは、次の例に示すように、Apache Spark データフレーム の読み取り操作でサポートされています。

Python
df = (spark.read
.format("deltasharing")
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)

OpenSharing 共有テーブルのチェンジデータフィードの読み込み

履歴が共有され、チェンジデータフィードが有効になっているテーブルの場合、 Apache Spark データフレームを使用してチェンジデータフィードレコードを読み取ることができます。 履歴共有には、Databricks Runtime 12.2 LTS 以降が必要です。

Python
df = (spark.read
.format("deltasharing")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)

OpenSharing共有テーブルを構造化ストリーミングで読み取り

履歴が共有されているテーブルの場合、構造化ストリーミングのソースとして共有テーブルを使用できます。履歴共有にはDatabricks Runtime 12.2 LTS以降が必要です。

Python
streaming_df = (spark.readStream
.format("deltasharing")
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)

# If CDF is enabled on the source table
streaming_cdf_df = (spark.readStream
.format("deltasharing")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)