Delta Sharing オープン共有を使用して共有されたデータの読み取り (受信者用)
この記事では、Delta Sharing オープン共有 プロトコルを使用して共有されたデータを読み取る方法について説明します。 これには、 Databricks、 Apache Spark、 Pandas、 Power BI、および Tableauを使用して共有データを読み取る手順が含まれています。
オープン共有では、データ プロバイダーによってチームのメンバーと共有された資格情報ファイルを使用して、共有データへの安全な読み取りアクセスを取得します。 資格情報が有効で、プロバイダーが引き続きデータを共有する限り、アクセスは保持されます。 プロバイダーは、資格情報の有効期限とローテーションを管理します。 データの更新は、ほぼリアルタイムで利用できます。 共有データの読み取りとコピーの作成はできますが、ソース データを変更することはできません。
Databricks-to-Databricks Delta Sharingを使用してデータが共有されている場合、データにアクセスするために資格情報ファイルは必要ありません。また、この記事は適用されません。手順については、「Databricks-to-Databricks Delta Sharingを使用して共有されたデータを読み取る (受信者向け)」を参照してください。
次のセクションでは、 Databricks、 Apache Spark、 Pandas、および Power BI を使用して、資格情報ファイルを使用して共有データにアクセスし、読み取る方法について説明します。 Delta Sharingコネクタの完全な一覧と、その使用方法については、Delta Sharing オープンソースのドキュメントを参照してください。共有データへのアクセスで問題が発生した場合は、データプロバイダーに連絡してください。
パートナー統合は、特に明記されていない限り、第三者によって提供され、お客様は、その製品およびサービスの使用のために適切なプロバイダーのアカウントを持っている必要があります。 Databricks は、このコンテンツを最新の状態に保つために最善を尽くしていますが、パートナー統合ページ上の統合やコンテンツの正確性については一切表明しません。 統合に関して、適切なプロバイダーに連絡してください。
始める前に
チームのメンバーは、データ プロバイダーによって共有される資格情報ファイルをダウンロードする必要があります。 「オープン共有モデルでアクセスを取得する」を参照してください。
安全なチャンネルを使用して、そのファイルまたはファイルの場所をあなたと共有する必要があります。
Databricks: オープン共有コネクタを使用して共有データを読み取る
このセクションでは、Catalog Explorer または Python ノートブックでプロバイダーの資格情報ファイルを使用してプロバイダーをインポートする方法について説明します。 ノートブックの手順では、ノートブックを使用して共有テーブルを一覧表示および読み取る方法についても説明します。 Catalog Explorer を使用してインポートされた共有データを読み取るには、「Databricks-to-Databricks Delta Sharing を使用して共有データを読み取る (受信者向け)」を参照してください。
データ プロバイダがDatabricks-to-Databricks共有を使用していて、資格情報ファイルを共有していない場合は、Unity Catalogを使用してデータにアクセスする必要があります。手順については、「Databricks-to-Databricks Delta Sharingを使用して共有されたデータを読み取る (受信者向け)」を参照してください。
- Catalog Explorer
- Python
Permissions required: A metastore admin or a user who has both the CREATE PROVIDER
and USE PROVIDER
privileges for your Unity Catalog metastore.
-
In your Databricks workspace, click
Catalog to open Catalog Explorer.
-
At the top of the Catalog pane, click the
gear icon and select Delta Sharing.
Alternatively, from the Quick access page, click the Delta Sharing > button.
-
On the Shared with me tab, at the top right corner of the page click Import provider directly.
-
Select a provider from the drop-down menu or enter a name.
-
Upload the credential file that the provider shared with you. You can read the specific instructions for different providers:
-
(Optional) Enter a comment.
-
Click Import.
-
Create catalogs from the shared data.
The process is the same as in the Databricks-to-Databricks sharing model.
-
Grant access to the catalogs.
See How do I make shared data available to my team? and Manage permissions for the schemas, tables, and volumes in a Delta Sharing catalog.
-
Read the shared data objects just like you would any data object that is registered in Unity Catalog.
For details and examples, see Access data in a shared table or volume.
This section describes how to use an open sharing connector to access shared data using a notebook in your Databricks workspace. You or another member of your team store the credential file in Databricks, then you use it to authenticate to the data provider’s Databricks account and read the data that the data provider shared with you.
First use a Python notebook in Databricks to store the credential file so that users on your team can access shared data.
-
In a text editor, open the credential file.
-
In your Databricks workspace, click New > Notebook.
- Enter a name.
- Set the default language for the notebook to Python.
- Select a cluster to attach to the notebook.
- Click Create.
The notebook opens in the notebook editor.
-
To use Python or pandas to access the shared data, install the delta-sharing Python connector. In the notebook editor, paste the following command:
%sh pip install delta-sharing
-
Run the cell.
The
delta-sharing
Python library gets installed in the cluster if it isn’t already installed. -
In a new cell, paste the following command, which uploads the contents of the credential file to a folder in DBFS. Replace the variables as follows:
-
<dbfs-path>
: the path to the folder where you want to save the credential file -
<credential-file-contents>
: the contents of the credential file. This is not a path to the file, but the copied contents of the file.The credential file contains JSON that defines three fields:
shareCredentialsVersion
,endpoint
, andbearerToken
.%scala
dbutils.fs.put("<dbfs-path>/config.share","""
<credential-file-contents>
""")
-
-
Run the cell.
After the credential file is uploaded, you can delete this cell. All workspace users can read the credential file from DBFS, and the credential file is available in DBFS on all clusters and SQL warehouses in your workspace. To delete the cell, click x in the cell actions menu
at the far right.
Now that the credential file is stored, you can use a notebook to list and read shared tables
-
Using Python, list the tables in the share.
In a new cell, paste the following command. Replace
<dbfs-path>
with the path that was created in above.When the code runs, Python reads the credential file from DBFS on the cluster. Access data stored in DBFS at the path
/dbfs/
.Pythonimport delta_sharing
client = delta_sharing.SharingClient(f"/dbfs/<dbfs-path>/config.share")
client.list_all_tables() -
Run the cell.
The result is an array of tables, along with metadata for each table. The following output shows two tables:
Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]
If the output is empty or doesn’t contain the tables you expect, contact the data provider.
-
Query a shared table.
-
Using Scala:
In a new cell, paste the following command. When the code runs, the credential file is read from DBFS through the JVM.
Replace the variables as follows:
<profile-path>
: the DBFS path of the credential file. For example,/<dbfs-path>/config.share
.<share-name>
: the value ofshare=
for the table.<schema-name>
: the value ofschema=
for the table.<table-name>
: the value ofname=
for the table.
%scala
spark.read.format("deltaSharing")
.load("<profile-path>#<share-name>.<schema-name>.<table-name>").limit(10);Run the cell. Each time you load the shared table, you see fresh data from the source.
-
Using SQL:
To query the data using SQL, you create a local table in the workspace from the shared table, then query the local table. The shared data is not stored or cached in the local table. Each time you query the local table, you see the current state of the shared data.
In a new cell, paste the following command.
Replace the variables as follows:
<local-table-name>
: the name of the local table.<profile-path>
: the location of the credential file.<share-name>
: the value ofshare=
for the table.<schema-name>
: the value ofschema=
for the table.<table-name>
: the value ofname=
for the table.
%sql
DROP TABLE IF EXISTS table_name;
CREATE TABLE <local-table-name> USING deltaSharing LOCATION "<profile-path>#<share-name>.<schema-name>.<table-name>";
SELECT * FROM <local-table-name> LIMIT 10;When you run the command, the shared data is queried directly. As a test, the table is queried and the first 10 results are returned.
If the output is empty or doesn’t contain the data you expect, contact the data provider.
-
Apache Spark: 共有データの読み取り
Spark 3.x 以降を使用して共有データにアクセスするには、次の手順に従います。
これらの手順は、データ・プロバイダーによって共有された資格情報ファイルにアクセスできることを前提としています。 「オープン共有モデルでアクセスを取得する」を参照してください。
Delta Sharing Python コネクタと Spark コネクタをインストールする
共有データに関連するメタデータ (共有されているテーブルのリストなど) にアクセスするには、次の操作を行います。 この例では Python を使用しています。
-
delta-sharing Python コネクタをインストールします。
Bashpip install delta-sharing
-
Apache Spark コネクタをインストールします。
Spark を使用して共有テーブルを一覧表示する
共有内のテーブルを一覧表示します。 次の例では、 <profile-path>
を資格情報ファイルの場所に置き換えます。
import delta_sharing
client = delta_sharing.SharingClient(f"<profile-path>/config.share")
client.list_all_tables()
結果は、テーブルの配列と各テーブルのメタデータです。 次の出力は、2 つのテーブルを示しています。
Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]
出力が空の場合、または期待するテーブルが含まれていない場合は、データ プロバイダーにお問い合わせください。
Spark を使用して共有データにアクセスする
次のコマンドを実行して、これらの変数を置き換えます。
<profile-path>
: 認証情報ファイルの場所。<share-name>
: テーブルのshare=
の値。<schema-name>
: テーブルのschema=
の値。<table-name>
: テーブルのname=
の値。<version-as-of>
:オプション。 データを読み込むテーブルのバージョン。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。delta-sharing-spark
0.5.0 以上が必要です。<timestamp-as-of>
:オプション。 指定されたタイムスタンプより前または指定されたタイムスタンプのバージョンでデータをロードします。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。 0.6.0 以上のdelta-sharing-spark
が必要です。
- Python
- Scala
delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", version=<version-as-of>)
spark.read.format("deltaSharing")\
.option("versionAsOf", <version-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))
delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", timestamp=<timestamp-as-of>)
spark.read.format("deltaSharing")\
.option("timestampAsOf", <timestamp-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))
Run the following, replacing these variables:
<profile-path>
: the location of the credential file.<share-name>
: the value ofshare=
for the table.<schema-name>
: the value ofschema=
for the table.<table-name>
: the value ofname=
for the table.<version-as-of>
: optional. The version of the table to load the data. Only works if the data provider shares the history of the table. Requiresdelta-sharing-spark
0.5.0 or above.<timestamp-as-of>
: optional. Load the data at the version before or at the given timestamp. Only works if the data provider shares the history of the table. Requiresdelta-sharing-spark
0.6.0 or above.
spark.read.format("deltaSharing")
.option("versionAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)
spark.read.format("deltaSharing")
.option("timestampAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)
Sparkを使用して共有チェンジデータフィードにアクセスします
テーブル履歴が共有されており、ソース テーブルでチェンジデータフィード (CDF) が有効になっている場合は、これらの変数を置き換えて次を実行してチェンジデータフィードにアクセスできます。 delta-sharing-spark
0.5.0 以降が必要です。
開始パラメーターは 1 つだけ指定する必要があります。
<profile-path>
: 認証情報ファイルの場所。<share-name>
: テーブルのshare=
の値。<schema-name>
: テーブルのschema=
の値。<table-name>
: テーブルのname=
の値。<starting-version>
:オプション。 クエリーの開始バージョン。 長整数型として指定します。<ending-version>
:オプション。 クエリの終了バージョン。 終了バージョンが指定されていない場合、API は最新のテーブル バージョンを使用します。<starting-timestamp>
:オプション。 クエリーの開始タイムスタンプは、このタイムスタンプ以上で作成されたバージョンに変換されます。yyyy-mm-dd hh:mm:ss[.fffffffff]
の形式の文字列として指定します。<ending-timestamp>
:オプション。 クエリーの終了タイムスタンプは、このタイムスタンプ以前に作成されたバージョンに変換されます。 次の形式の文字列として指定します。yyyy-mm-dd hh:mm:ss[.fffffffff]
- Python
- Scala
delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_version=<starting-version>,
ending_version=<ending-version>)
delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_timestamp=<starting-timestamp>,
ending_timestamp=<ending-timestamp>)
spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("statingVersion", <starting-version>)\
.option("endingVersion", <ending-version>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("startingTimestamp", <starting-timestamp>)\
.option("endingTimestamp", <ending-timestamp>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("statingVersion", <starting-version>)
.option("endingVersion", <ending-version>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("startingTimestamp", <starting-timestamp>)
.option("endingTimestamp", <ending-timestamp>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
出力が空の場合、または期待するデータが含まれていない場合は、データプロバイダーに連絡してください。
Spark 構造化ストリーミングを使用した共有テーブルへのアクセス
テーブル履歴が共有されている場合は、共有データをストリームで読み取ることができます。 delta-sharing-spark
0.6.0 以降が必要です。
サポートされているオプション:
ignoreDeletes
: データを削除するトランザクションは無視してください。ignoreChanges
:UPDATE
、MERGE INTO
、DELETE
(パーティション内)、OVERWRITE
などのデータ変更操作によってソース テーブル内のファイルが書き換えられた場合は、更新を再処理します。 変更されていない行は引き続き出力できます。 したがって、ダウンストリームの消費者は重複を処理できる必要があります。 削除はダウンストリームに反映されません。ignoreChanges
はignoreDeletes
を包含します。 したがって、ignoreChanges
を使用する場合、ソーステーブルの削除や更新によってストリームが中断されることはありません。startingVersion
: 開始する共有テーブルのバージョン。 このバージョン (包括的) 以降のすべてのテーブル変更は、ストリーミング ソースによって読み取られます。startingTimestamp
: 開始するタイムスタンプ。 タイムスタンプ (この値を含む) 以降にコミットされたすべてのテーブル変更は、ストリーミング ソースによって読み取られます。 例:"2023-01-01 00:00:00.0"
。maxFilesPerTrigger
: すべてのマイクロバッチで考慮される新しいファイルの数。maxBytesPerTrigger
: 各マイクロバッチで処理されるデータの量。 このオプションは、バッチがほぼこの量のデータを処理し、最小入力単位がこの制限より大きい場合にストリーミングクエリを先に進めるために制限を超えて処理する可能性があることを意味します。readChangeFeed
: ストリーム 共有テーブルのチェンジデータフィードを読み取ります。
サポートされていないオプション:
Trigger.availableNow
サンプル構造化ストリーミング クエリ
- Scala
- Python
spark.readStream.format("deltaSharing")
.option("startingVersion", 0)
.option("ignoreChanges", true)
.option("maxFilesPerTrigger", 10)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
spark.readStream.format("deltaSharing")\
.option("startingVersion", 0)\
.option("ignoreDeletes", true)\
.option("maxBytesPerTrigger", 10000)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
「Databricks でのストリーミング」も参照してください。
削除ベクトルまたは列マッピングが有効になっているテーブルの読み取り
プレビュー
この機能は パブリック プレビュー段階です。
削除ベクトルは、プロバイダーが共有 Delta テーブルで有効にできるストレージ最適化機能です。 削除ベクトルとはを参照してください。
Databricks では、Delta テーブルの列マッピングもサポートされています。 「Delta Lake 列マッピングを使用した列の名前変更と削除」を参照してください。
プロバイダーが削除ベクトルまたは列マッピングが有効になっているテーブルを共有している場合は、 delta-sharing-spark
3.1 以降で実行されているコンピュートを使用してテーブルを読み取ることができます。 Databricks クラスターを使用している場合は、Databricks Runtime 14.1 以降を実行しているクラスターを使用してバッチ読み取りを実行できます。CDF クエリとストリーミング クエリには、Databricks Runtime 14.2 以降が必要です。
バッチクエリは、共有テーブルのテーブル機能に基づいて responseFormat
を自動的に解決できるため、そのまま実行できます。
チェンジデータフィード (CDF) を読み取り、削除ベクトルまたはカラムマッピングが有効になっている共有テーブルに対してストリーミングクエリを実行するには、追加オプション responseFormat=delta
を設定する必要があります。
次の例は、バッチクエリ、CDF、ストリーミングクエリを示しています。
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("...")
.master("...")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()
val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"
// Batch query
spark.read.format("deltaSharing").load(tablePath)
// CDF query
spark.read.format("deltaSharing")
.option("readChangeFeed", "true")
.option("responseFormat", "delta")
.option("startingVersion", 1)
.load(tablePath)
// Streaming query
spark.readStream.format("deltaSharing").option("responseFormat", "delta").load(tablePath)
Pandas: 共有データの読み取り
pandas 0.25.3以降で共有データにアクセスするには次の手順に従ってください。
これらの手順は、データ・プロバイダーによって共有された資格情報ファイルにアクセスできることを前提としています。 「オープン共有モデルでアクセスを取得する」を参照してください。
Delta Sharing Python コネクタをインストールする
共有されているテーブルの一覧など、共有データに関連するメタデータにアクセスするには、 delta-sharing Python コネクタをインストールする必要があります。
pip install delta-sharing
Pandas を使用した共有テーブルの一覧表示
共有内のテーブルを一覧表示するには、次のコマンドを実行して、 <profile-path>/config.share
を資格情報ファイルの場所に置き換えます。
import delta_sharing
client = delta_sharing.SharingClient(f"<profile-path>/config.share")
client.list_all_tables()
出力が空の場合、または期待するテーブルが含まれていない場合は、データ プロバイダーにお問い合わせください。
Pandas を使用して共有データにアクセスする
を使用してPandas Pythonの共有データにアクセスするには、次のように変数を置き換えて、次のコマンドを実行します。
<profile-path>
: 認証情報ファイルの場所。<share-name>
: テーブルのshare=
の値。<schema-name>
: テーブルのschema=
の値。<table-name>
: テーブルのname=
の値。
import delta_sharing
delta_sharing.load_as_pandas(f"<profile-path>#<share-name>.<schema-name>.<table-name>")
Pandasを使用して共有チェンジデータフィードにアクセスする
Pandas で共有テーブルのチェンジデータフィードにアクセスするにはPython次のように変数を置き換えて実行します。チェンジデータフィードは、データプロバイダーがテーブルのチェンジデータフィードを共有したかどうかによって使用できない場合があります。
<starting-version>
:オプション。 クエリーの開始バージョン。<ending-version>
:オプション。 クエリの終了バージョン。<starting-timestamp>
:オプション。 クエリーの開始タイムスタンプ。 これは、このタイムスタンプ以降に作成されたバージョンに変換されます。<ending-timestamp>
:オプション。 クエリーの終了タイムスタンプ。 これは、このタイムスタンプ以前に作成されたバージョンに変換されます。
import delta_sharing
delta_sharing.load_table_changes_as_pandas(
f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_version=<starting-version>,
ending_version=<starting-version>)
delta_sharing.load_table_changes_as_pandas(
f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_timestamp=<starting-timestamp>,
ending_timestamp=<ending-timestamp>)
出力が空の場合、または期待するデータが含まれていない場合は、データプロバイダーに連絡してください。
Power BI: 共有データの読み取り
Power BI Delta Sharing コネクタを使用すると、Delta Sharing オープン プロトコルを通じて共有されているデータセットを検出、分析、視覚化できます。
必要条件
- Power BI Desktop 2.99.621.0 以降。
- データプロバイダーが共有した認証情報ファイルへのアクセス。 「オープン共有モデルでアクセスを取得する」を参照してください。
Databricks に接続する
Delta Sharing コネクタを使用して Databricks に接続するには、次の操作を行います。
- テキスト エディターで共有資格情報ファイルを開き、エンドポイント URL とトークンを取得します。
- Power BI Desktop を開きます。
- [ データの取得 ] メニューで、 Delta Sharing を検索します。
- コネクタを選択し、[ 接続 ] をクリックします。
- 資格情報ファイルからコピーしたエンドポイント URL を [Delta Sharing Server URL ] フィールドに入力します。
- 必要に応じて、[ 詳細オプション ] タブで、ダウンロードできる行の最大数の [行制限 ] を設定します。 これは、デフォルトで 100 万行に設定されます。
- 「 OK 」をクリックします。
- [認証 ] で、認証情報ファイルから取得したトークンを Bearer トークン にコピーします。
- 「 接続 」をクリックします。
Power BI Delta Sharing コネクタの制限事項
Power BI Delta Sharing コネクタには、次の制限があります。
- コネクタがロードするデータは、マシンのメモリに収まる必要があります。 この要件を管理するために、コネクタは、インポートされる行の数を、Power BI Desktop の [詳細オプション] タブで設定した [行制限 ] に制限します。
Tableau: 共有データの読み取り
Tableau Delta Sharing コネクタを使用すると、Delta Sharing オープン プロトコルを通じて共有されているデータセットを検出、分析、視覚化できます。
必要条件
- Tableau Desktop および Tableau Server 2024.1 以降
- データプロバイダーが共有した認証情報ファイルへのアクセス。 「オープン共有モデルでアクセスを取得する」を参照してください。
Databricks に接続する
Delta Sharing コネクタを使用して Databricks に接続するには、次の操作を行います。
- Tableau Exchange に移動し、指示に従って Delta Sharing Connector をダウンロードし、適切なデスクトップ フォルダーに配置します。
- Tableau Desktop を開きます。
- [コネクタ] ページで、"Delta Sharing by Databricks" を検索します。
- [ 共有ファイルのアップロード ] を選択し、プロバイダーによって共有された資格情報ファイルを選択します。
- [ データを取得 ] をクリックします。
- Data Explorer で、テーブルを選択します。
- 必要に応じて、SQL フィルターまたは行制限を追加します。
- 「 テーブル・データの取得 」をクリックします。
Tableau Delta Sharing コネクタの制限事項
Tableau Delta Sharing コネクタには、次の制限があります。
- コネクタがロードするデータは、マシンのメモリに収まる必要があります。 この要件を管理するために、コネクタはインポートされる行の数を Tableau で設定した行制限に制限します。
- すべての列はタイプ
String
として返されます。 - SQL フィルターは、Delta Sharing サーバーが predicateHint をサポートしている場合にのみ機能します。
新しい資格情報をリクエストする
資格情報のアクティブ化 URL またはダウンロードした資格情報が紛失、破損、または侵害された場合、または資格情報の有効期限が切れてもプロバイダーから新しい資格情報が送られてこない場合は、プロバイダーに連絡して新しい資格情報をリクエストしてください。