メインコンテンツまでスキップ

Delta Sharing を使用して共有されたデータの読み取り、ベアラー トークンとのオープン共有 (受信者用)

このページでは、Delta Sharing オープン共有 プロトコルとベアラー トークンを使用して、共有されたデータを読み取る方法について説明します。これには、次のツールを使用して共有データを読み取る手順が含まれています。

  • Databricks
  • Apache Spark
  • Pandas
  • Power BI
  • Tableau

このオープンな共有モデルでは、データ プロバイダーによってチームのメンバーと共有される資格情報ファイルを使用して、共有データへの安全な読み取りアクセスを取得します。資格情報が有効で、プロバイダーが引き続きデータを共有する限り、アクセスは保持されます。プロバイダーは、資格情報の有効期限とローテーションを管理します。データの更新は、ほぼリアルタイムで利用できます。共有データの読み取りとコピーの作成はできますが、ソース データを変更することはできません。

注記

Databricks-to-Databricks Delta Sharingを使用してデータが共有されている場合、データにアクセスするために資格情報ファイルは必要ありません。また、この記事は適用されません。手順については、「Databricks-to-Databricks Delta Sharingを使用して共有されたデータを読み取る (受信者向け)」を参照してください。

次のセクションでは、 Databricks、 Apache Spark、 Pandas、および Power BI を使用して、資格情報ファイルを使用して共有データにアクセスし、読み取る方法について説明します。 Delta Sharingコネクタの完全な一覧と、その使用方法については、Delta Sharing オープンソースのドキュメントを参照してください。共有データへのアクセスで問題が発生した場合は、データプロバイダーに連絡してください。

始める前に

チームのメンバーは、データ プロバイダーによって共有される資格情報ファイルをダウンロードする必要があります。 「オープン共有モデルでアクセスを取得する」を参照してください。

安全なチャンネルを使用して、そのファイルまたはファイルの場所をあなたと共有する必要があります。

Databricks: オープン共有コネクタを使用して共有データを読み取る

このセクションでは、プロバイダーをインポートする方法と、Catalog Explorer または Python ノートブックで共有データをクエリする方法について説明します。

  • Databricks ワークスペースが Unity Catalog に対して有効になっている場合は、Catalog Explorer でインポート プロバイダー UI を使用します。資格情報ファイルを保存または指定することなく、次の操作を実行できます。

    • ボタンをクリックするだけで、共有からカタログを作成できます。
    • Unity Catalog のアクセス制御を使用して、共有テーブルへのアクセスを許可します。
    • 標準の Unity Catalog 構文を使用して、共有データのクエリを実行します。
  • Databricks ワークスペースで Unity Catalog が有効になっていない場合は、例として Python ノートブックの手順を使用します。

必要なアクセス許可 : メタストア管理者、または Unity Catalog メタストアに対する CREATE PROVIDER 権限と USE PROVIDER 権限の両方を持つユーザー。

  1. Databricks ワークスペースで、カタログアイコン[カタログ]をクリックして カタログエクスプローラー を開きます。

  2. [カタログ ] ウィンドウの上部にある [歯車アイコン] 歯車アイコンをクリックし、[ Delta Sharing ] を選択します。

    または、[ クイック アクセス ] ページで [Delta Sharing > ] ボタンをクリックします。

  3. [ 自分と共有] タブで、[ データのインポート ] をクリックします。

  4. プロバイダー名を入力します。

    名前にスペースを含めることはできません。

  5. プロバイダーから共有された認証情報ファイルをアップロードします。

    多くのプロバイダーは、共有を受け取ることができる独自の Delta Sharing ネットワークを持っています。 詳細については、「 プロバイダー固有の構成」を参照してください。

  6. (オプション)コメントを入力します。

    プロバイダーの資格情報ファイルをプロバイダーから直接インポートする

  7. インポート 」をクリックします。

  8. 共有データからカタログを作成します。

    [共有] タブで、共有行の [カタログの作成 ] をクリックします。

    SQL または Databricks CLI を使用して共有からカタログを作成する方法については、「共有からカタログを作成する」を参照してください。

  9. カタログへのアクセス権を付与します。

    共有データをチームで利用できるようにするにはどうすればよいですか?」を参照してください。Delta Sharingカタログ内のスキーマ、テーブル、ボリュームのアクセス許可の管理

  10. 共有データ オブジェクトは、Unity Catalog に登録されているデータ オブジェクトと同様に読み取ります。

    詳細と例については、 共有テーブルまたはボリューム内のデータへのアクセスを参照してください。

Apache Spark: 共有データの読み取り

Spark 3.x 以降を使用して共有データにアクセスするには、次の手順に従います。

これらの手順は、データ・プロバイダーによって共有された資格情報ファイルにアクセスできることを前提としています。 「オープン共有モデルでアクセスを取得する」を参照してください。

注記

Unity Catalog が有効になっている Databricks ワークスペースで Spark を使用していて、インポート プロバイダー UI を使用してプロバイダーと共有を行った場合、このセクションの手順は適用されません。 共有テーブルには、Unity Catalog に登録されている他のテーブルと同じようにアクセスできます。 delta-sharing Python コネクタをインストールしたり、資格情報ファイルへのパスを指定したりする必要はありません。「Databricks: オープン共有コネクタを使用して共有データを読み取る」を参照してください。

Delta Sharing Python コネクタと Spark コネクタをインストールする

共有データに関連するメタデータ (共有されているテーブルのリストなど) にアクセスするには、次の操作を行います。 この例では Python を使用しています。

  1. delta-sharing Python コネクタをインストールします。

    Bash
    pip install delta-sharing
  2. Apache Spark コネクタをインストールします。

Spark を使用して共有テーブルを一覧表示する

共有内のテーブルを一覧表示します。 次の例では、 <profile-path> を資格情報ファイルの場所に置き換えます。

Python
import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

結果は、テーブルの配列と各テーブルのメタデータです。 次の出力は、2 つのテーブルを示しています。

Console
Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]

出力が空の場合、または期待するテーブルが含まれていない場合は、データ プロバイダーにお問い合わせください。

Spark を使用して共有データにアクセスする

次のコマンドを実行して、これらの変数を置き換えます。

  • <profile-path>: 認証情報ファイルの場所。
  • <share-name>: テーブルの share= の値。
  • <schema-name>: テーブルの schema= の値。
  • <table-name>: テーブルの name= の値。
  • <version-as-of>:オプション。 データを読み込むテーブルのバージョン。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。 delta-sharing-spark 0.5.0 以上が必要です。
  • <timestamp-as-of>:オプション。 指定されたタイムスタンプより前または指定されたタイムスタンプのバージョンでデータをロードします。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。 0.6.0 以上の delta-sharing-spark が必要です。
Python
delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", version=<version-as-of>)

spark.read.format("deltaSharing")\
.option("versionAsOf", <version-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", timestamp=<timestamp-as-of>)

spark.read.format("deltaSharing")\
.option("timestampAsOf", <timestamp-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

Sparkを使用して共有チェンジデータフィードにアクセスします

テーブル履歴が共有されており、ソース テーブルでチェンジデータフィード (CDF) が有効になっている場合は、これらの変数を置き換えて次を実行してチェンジデータフィードにアクセスできます。 delta-sharing-spark 0.5.0 以降が必要です。

開始パラメーターは 1 つだけ指定する必要があります。

  • <profile-path>: 認証情報ファイルの場所。
  • <share-name>: テーブルの share= の値。
  • <schema-name>: テーブルの schema= の値。
  • <table-name>: テーブルの name= の値。
  • <starting-version>:オプション。 クエリーの開始バージョン。 長整数型として指定します。
  • <ending-version>:オプション。 クエリの終了バージョン。 終了バージョンが指定されていない場合、API は最新のテーブル バージョンを使用します。
  • <starting-timestamp>:オプション。 クエリーの開始タイムスタンプは、このタイムスタンプ以上で作成されたバージョンに変換されます。 yyyy-mm-dd hh:mm:ss[.fffffffff]の形式の文字列として指定します。
  • <ending-timestamp>:オプション。 クエリーの終了タイムスタンプは、このタイムスタンプ以前に作成されたバージョンに変換されます。 次の形式の文字列として指定します。 yyyy-mm-dd hh:mm:ss[.fffffffff]
Python
delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_version=<starting-version>,
ending_version=<ending-version>)

delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_timestamp=<starting-timestamp>,
ending_timestamp=<ending-timestamp>)

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("statingVersion", <starting-version>)\
.option("endingVersion", <ending-version>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("startingTimestamp", <starting-timestamp>)\
.option("endingTimestamp", <ending-timestamp>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

出力が空の場合、または期待するデータが含まれていない場合は、データプロバイダーに連絡してください。

Spark 構造化ストリーミングを使用した共有テーブルへのアクセス

テーブル履歴が共有されている場合は、共有データをストリームで読み取ることができます。 delta-sharing-spark 0.6.0 以降が必要です。

サポートされているオプション:

  • ignoreDeletes: データを削除するトランザクションは無視してください。
  • ignoreChanges: UPDATEMERGE INTODELETE (パーティション内)、 OVERWRITEなどのデータ変更操作によってソース テーブル内のファイルが書き換えられた場合は、更新を再処理します。 変更されていない行は引き続き出力できます。 したがって、ダウンストリームの消費者は重複を処理できる必要があります。 削除はダウンストリームに反映されません。 ignoreChangesignoreDeletesを包含します。 したがって、 ignoreChangesを使用する場合、ソーステーブルの削除や更新によってストリームが中断されることはありません。
  • startingVersion: 開始する共有テーブルのバージョン。 このバージョン (包括的) 以降のすべてのテーブル変更は、ストリーミング ソースによって読み取られます。
  • startingTimestamp: 開始するタイムスタンプ。 タイムスタンプ (この値を含む) 以降にコミットされたすべてのテーブル変更は、ストリーミング ソースによって読み取られます。 例: "2023-01-01 00:00:00.0"
  • maxFilesPerTrigger: すべてのマイクロバッチで考慮される新しいファイルの数。
  • maxBytesPerTrigger: 各マイクロバッチで処理されるデータの量。 このオプションは、バッチがほぼこの量のデータを処理し、最小入力単位がこの制限より大きい場合にストリーミングクエリを先に進めるために制限を超えて処理する可能性があることを意味します。
  • readChangeFeed: ストリーム 共有テーブルのチェンジデータフィードを読み取ります。

サポートされていないオプション:

  • Trigger.availableNow

サンプル構造化ストリーミング クエリ

Scala
spark.readStream.format("deltaSharing")
.option("startingVersion", 0)
.option("ignoreChanges", true)
.option("maxFilesPerTrigger", 10)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

構造化ストリーミングの概念も参照してください。

削除ベクトルまたは列マッピングが有効になっているテーブルの読み取り

備考

プレビュー

この機能は パブリック プレビュー段階です。

削除ベクトルは、プロバイダーが共有 Delta テーブルで有効にできるストレージ最適化機能です。 削除ベクトルとはを参照してください

Databricks では、Delta テーブルの列マッピングもサポートされています。 「Delta Lake 列マッピングを使用した列の名前変更と削除」を参照してください。

プロバイダーが削除ベクトルまたは列マッピングが有効になっているテーブルを共有している場合は、 delta-sharing-spark 3.1 以降で実行されているコンピュートを使用してテーブルを読み取ることができます。 Databricks クラスターを使用している場合は、Databricks Runtime 14.1 以降を実行しているクラスターを使用してバッチ読み取りを実行できます。CDF クエリとストリーミング クエリには、Databricks Runtime 14.2 以降が必要です。

バッチクエリは、共有テーブルのテーブル機能に基づいて responseFormat を自動的に解決できるため、そのまま実行できます。

チェンジデータフィード (CDF) を読み取り、削除ベクトルまたはカラムマッピングが有効になっている共有テーブルに対してストリーミングクエリを実行するには、追加オプション responseFormat=deltaを設定する必要があります。

次の例は、バッチクエリ、CDF、ストリーミングクエリを示しています。

Scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder()
.appName("...")
.master("...")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()

val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"

// Batch query
spark.read.format("deltaSharing").load(tablePath)

// CDF query
spark.read.format("deltaSharing")
.option("readChangeFeed", "true")
.option("responseFormat", "delta")
.option("startingVersion", 1)
.load(tablePath)

// Streaming query
spark.readStream.format("deltaSharing").option("responseFormat", "delta").load(tablePath)

Pandas: 共有データの読み取り

pandas 0.25.3以降で共有データにアクセスするには次の手順に従ってください。

これらの手順は、データ・プロバイダーによって共有された資格情報ファイルにアクセスできることを前提としています。 「オープン共有モデルでアクセスを取得する」を参照してください。

注記

Pandasが有効になっているDatabricks ワークスペースでUnity Catalog を使用しており、インポート プロバイダー UI を使用してプロバイダーをインポートして共有した場合、このセクションの手順は適用されません。共有テーブルには、Unity Catalog に登録されている他のテーブルと同じようにアクセスできます。 delta-sharing Python コネクタをインストールしたり、資格情報ファイルへのパスを指定したりする必要はありません。「Databricks: オープン共有コネクタを使用して共有データを読み取る」を参照してください。

Delta Sharing Python コネクタをインストールする

共有されているテーブルの一覧など、共有データに関連するメタデータにアクセスするには、 delta-sharing Python コネクタをインストールする必要があります。

Bash
pip install delta-sharing

Pandas を使用した共有テーブルの一覧表示

共有内のテーブルを一覧表示するには、次のコマンドを実行して、 <profile-path>/config.share を資格情報ファイルの場所に置き換えます。

Python
import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

出力が空の場合、または期待するテーブルが含まれていない場合は、データ プロバイダーにお問い合わせください。

Pandas を使用して共有データにアクセスする

を使用してPandas Pythonの共有データにアクセスするには、次のように変数を置き換えて、次のコマンドを実行します。

  • <profile-path>: 認証情報ファイルの場所。
  • <share-name>: テーブルの share= の値。
  • <schema-name>: テーブルの schema= の値。
  • <table-name>: テーブルの name= の値。
Python
import delta_sharing
delta_sharing.load_as_pandas(f"<profile-path>#<share-name>.<schema-name>.<table-name>")

Pandasを使用して共有チェンジデータフィードにアクセスする

Pandas で共有テーブルのチェンジデータフィードにアクセスするにはPython次のように変数を置き換えて実行します。チェンジデータフィードは、データプロバイダーがテーブルのチェンジデータフィードを共有したかどうかによって使用できない場合があります。

  • <starting-version>:オプション。 クエリーの開始バージョン。
  • <ending-version>:オプション。 クエリの終了バージョン。
  • <starting-timestamp>:オプション。 クエリーの開始タイムスタンプ。 これは、このタイムスタンプ以降に作成されたバージョンに変換されます。
  • <ending-timestamp>:オプション。 クエリーの終了タイムスタンプ。 これは、このタイムスタンプ以前に作成されたバージョンに変換されます。
Python
import delta_sharing
delta_sharing.load_table_changes_as_pandas(
f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_version=<starting-version>,
ending_version=<starting-version>)

delta_sharing.load_table_changes_as_pandas(
f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_timestamp=<starting-timestamp>,
ending_timestamp=<ending-timestamp>)

出力が空の場合、または期待するデータが含まれていない場合は、データプロバイダーに連絡してください。

Power BI: 共有データの読み取り

Power BI Delta Sharing コネクタを使用すると、Delta Sharing オープン プロトコルを通じて共有されているデータセットを検出、分析、視覚化できます。

必要条件

Databricks に接続する

Delta Sharing コネクタを使用して Databricks に接続するには、次の操作を行います。

  1. テキスト エディターで共有資格情報ファイルを開き、エンドポイント URL とトークンを取得します。
  2. Power BI Desktop を開きます。
  3. [ データの取得 ] メニューで、 Delta Sharing を検索します。
  4. コネクタを選択し、[ 接続 ] をクリックします。
  5. 資格情報ファイルからコピーしたエンドポイント URL を [Delta Sharing Server URL ] フィールドに入力します。
  6. 必要に応じて、[ 詳細オプション ] タブで、ダウンロードできる行の最大数の [行制限 ] を設定します。 これは、デフォルトで 100 万行に設定されます。
  7. OK 」をクリックします。
  8. [認証 ] で、認証情報ファイルから取得したトークンを Bearer トークン にコピーします。
  9. 接続 」をクリックします。

Power BI Delta Sharing コネクタの制限事項

Power BI Delta Sharing コネクタには、次の制限があります。

  • コネクタがロードするデータは、マシンのメモリに収まる必要があります。 この要件を管理するために、コネクタは、インポートされる行の数を、Power BI Desktop の [詳細オプション] タブで設定した [行制限 ] に制限します。

Tableau: 共有データの読み取り

Tableau Delta Sharing コネクタを使用すると、Delta Sharing オープン プロトコルを通じて共有されているデータセットを検出、分析、視覚化できます。

必要条件

Databricks に接続する

Delta Sharing コネクタを使用して Databricks に接続するには、次の操作を行います。

  1. Tableau Exchange に移動し、指示に従って Delta Sharing Connector をダウンロードし、適切なデスクトップ フォルダーに配置します。
  2. Tableau Desktop を開きます。
  3. [コネクタ] ページで、"Delta Sharing by Databricks" を検索します。
  4. [ 共有ファイルのアップロード ] を選択し、プロバイダーによって共有された資格情報ファイルを選択します。
  5. [ データを取得 ] をクリックします。
  6. Data Explorer で、テーブルを選択します。
  7. 必要に応じて、SQL フィルターまたは行制限を追加します。
  8. テーブル・データの取得 」をクリックします。

Tableau Delta Sharing コネクタの制限事項

Tableau Delta Sharing コネクタには、次の制限があります。

  • コネクタがロードするデータは、マシンのメモリに収まる必要があります。 この要件を管理するために、コネクタはインポートされる行の数を Tableau で設定した行制限に制限します。
  • すべての列はタイプ String として返されます。
  • SQL フィルターは、Delta Sharing サーバーが predicateHint をサポートしている場合にのみ機能します。
  • 削除ベクトルはサポートされていません。

新しい資格情報をリクエストする

資格情報のアクティブ化 URL またはダウンロードした資格情報が紛失、破損、または侵害された場合、または資格情報の有効期限が切れてもプロバイダーから新しい資格情報が送られてこない場合は、プロバイダーに連絡して新しい資格情報をリクエストしてください。