Apache Spark DataFrames を使用した Delta Sharing 共有テーブルの読み取り
この記事では、Apache Spark を使用して 、Delta Sharing を使用して共有データのクエリを実行する構文の例を示します。 deltasharing
キーワードを DataFrame 操作の形式オプションとして使用します。
共有データのクエリに関するその他のオプション
また、次の例のように、メタストアに登録されている Delta Sharing カタログで共有テーブル名を使用するクエリを作成することもできます。
- SQL
- Python
SELECT * FROM shared_table_name
spark.read.table("shared_table_name")
Delta SharingDatabricksでの の設定と共有テーブル名を使用したデータのクエリの詳細については、「 を使用して共有されたデータの読み取りDatabricks-to-DatabricksDelta Sharing(受信者向け)」 を参照してください。
構造化ストリーミングを使用して、共有テーブルのレコードを増分的に処理できます。 構造化ストリーミングを使用するには、テーブルの履歴共有を有効にする必要があります。 ALTER SHAREを参照してください。履歴共有には、Databricks Runtime 12.2 LTS 以降が必要です。
共有テーブルでソース Delta テーブルでチェンジデータフィードが有効になっており、共有で履歴が有効になっている場合は、構造化ストリーミングまたはバッチ操作で Delta 共有を読み取るときにチェンジデータフィードを使用できます。 でのDelta Lake チェンジデータフィードの使用Databricks を参照してください。
Delta Sharing format キーワードで読み取ります
deltasharing
キーワードは、次の例に示すように、Apache Spark DataFrame の読み取り操作でサポートされています。
df = (spark.read
.format("deltasharing")
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)
Read チェンジデータフィード for Delta Sharing shared tables
履歴が共有され、チェンジデータフィードが有効になっているテーブルの場合、 Apache Spark DataFramesを使用してチェンジデータフィードレコードを読み取ることができます。 履歴共有には、Databricks Runtime 12.2 LTS 以降が必要です。
df = (spark.read
.format("deltasharing")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)
構造化ストリーミングを使用した Delta Sharing 共有テーブルの読み取り
履歴が共有されているテーブルの場合、共有テーブルを構造化ストリーミングのソースとして使用できます。 履歴共有には、Databricks Runtime 12.2 LTS 以降が必要です。
streaming_df = (spark.readStream
.format("deltasharing")
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)
# If CDF is enabled on the source table
streaming_cdf_df = (spark.readStream
.format("deltasharing")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
)