メインコンテンツまでスキップ

Delta Sharing オープン共有を使用して共有されたデータの読み取り (受信者用)

この記事では、Delta Sharing オープン共有 プロトコルを使用して共有されたデータを読み取る方法について説明します。 これには、 Databricks、 Apache Spark、 Pandas、 Power BI、および Tableauを使用して共有データを読み取る手順が含まれています。

オープン共有では、データ プロバイダーによってチームのメンバーと共有された資格情報ファイルを使用して、共有データへの安全な読み取りアクセスを取得します。 資格情報が有効で、プロバイダーが引き続きデータを共有する限り、アクセスは保持されます。 プロバイダーは、資格情報の有効期限とローテーションを管理します。 データの更新は、ほぼリアルタイムで利用できます。 共有データの読み取りとコピーの作成はできますが、ソース データを変更することはできません。

注記

Databricks-to-Databricks Delta Sharingを使用してデータが共有されている場合、データにアクセスするために資格情報ファイルは必要ありません。また、この記事は適用されません。手順については、「Databricks-to-Databricks Delta Sharingを使用して共有されたデータを読み取る (受信者向け)」を参照してください。

次のセクションでは、 Databricks、 Apache Spark、 Pandas、および Power BI を使用して、資格情報ファイルを使用して共有データにアクセスし、読み取る方法について説明します。 Delta Sharingコネクタの完全な一覧と、その使用方法については、Delta Sharing オープンソースのドキュメントを参照してください。共有データへのアクセスで問題が発生した場合は、データプロバイダーに連絡してください。

注記

パートナー統合は、特に明記されていない限り、第三者によって提供され、お客様は、その製品およびサービスの使用のために適切なプロバイダーのアカウントを持っている必要があります。 Databricks は、このコンテンツを最新の状態に保つために最善を尽くしていますが、パートナー統合ページ上の統合やコンテンツの正確性については一切表明しません。 統合に関して、適切なプロバイダーに連絡してください。

始める前に

チームのメンバーは、データ プロバイダーによって共有される資格情報ファイルをダウンロードする必要があります。 「オープン共有モデルでアクセスを取得する」を参照してください。

安全なチャンネルを使用して、そのファイルまたはファイルの場所をあなたと共有する必要があります。

Databricks: オープン共有コネクタを使用して共有データを読み取る

このセクションでは、Catalog Explorer または Python ノートブックでプロバイダーの資格情報ファイルを使用してプロバイダーをインポートする方法について説明します。 ノートブックの手順では、ノートブックを使用して共有テーブルを一覧表示および読み取る方法についても説明します。 Catalog Explorer を使用してインポートされた共有データを読み取るには、「Databricks-to-Databricks Delta Sharing を使用して共有データを読み取る (受信者向け)」を参照してください。

注記

データ プロバイダがDatabricks-to-Databricks共有を使用していて、資格情報ファイルを共有していない場合は、Unity Catalogを使用してデータにアクセスする必要があります。手順については、「Databricks-to-Databricks Delta Sharingを使用して共有されたデータを読み取る (受信者向け)」を参照してください。

Permissions required: A metastore admin or a user who has both the CREATE PROVIDER and USE PROVIDER privileges for your Unity Catalog metastore.

  1. In your Databricks workspace, click Catalog icon Catalog to open Catalog Explorer.

  2. At the top of the Catalog pane, click the Gear icon gear icon and select Delta Sharing.

    Alternatively, from the Quick access page, click the Delta Sharing > button.

  3. On the Shared with me tab, at the top right corner of the page click Import provider directly.

  4. Select a provider from the drop-down menu or enter a name.

  5. Upload the credential file that the provider shared with you. You can read the specific instructions for different providers:

  6. (Optional) Enter a comment.

    Import the provider's credential file directly from a provider.

  7. Click Import.

  8. Create catalogs from the shared data.

    The process is the same as in the Databricks-to-Databricks sharing model.

    See Create a catalog from a share.

  9. Grant access to the catalogs.

    See How do I make shared data available to my team? and Manage permissions for the schemas, tables, and volumes in a Delta Sharing catalog.

  10. Read the shared data objects just like you would any data object that is registered in Unity Catalog.

    For details and examples, see Access data in a shared table or volume.

Apache Spark: 共有データの読み取り

Spark 3.x 以降を使用して共有データにアクセスするには、次の手順に従います。

これらの手順は、データ・プロバイダーによって共有された資格情報ファイルにアクセスできることを前提としています。 「オープン共有モデルでアクセスを取得する」を参照してください。

Delta Sharing Python コネクタと Spark コネクタをインストールする

共有データに関連するメタデータ (共有されているテーブルのリストなど) にアクセスするには、次の操作を行います。 この例では Python を使用しています。

  1. delta-sharing Python コネクタをインストールします。

    Bash
    pip install delta-sharing
  2. Apache Spark コネクタをインストールします。

Spark を使用して共有テーブルを一覧表示する

共有内のテーブルを一覧表示します。 次の例では、 <profile-path> を資格情報ファイルの場所に置き換えます。

Python
import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

結果は、テーブルの配列と各テーブルのメタデータです。 次の出力は、2 つのテーブルを示しています。

Console
Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]

出力が空の場合、または期待するテーブルが含まれていない場合は、データ プロバイダーにお問い合わせください。

Spark を使用して共有データにアクセスする

次のコマンドを実行して、これらの変数を置き換えます。

  • <profile-path>: 認証情報ファイルの場所。
  • <share-name>: テーブルの share= の値。
  • <schema-name>: テーブルの schema= の値。
  • <table-name>: テーブルの name= の値。
  • <version-as-of>:オプション。 データを読み込むテーブルのバージョン。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。 delta-sharing-spark 0.5.0 以上が必要です。
  • <timestamp-as-of>:オプション。 指定されたタイムスタンプより前または指定されたタイムスタンプのバージョンでデータをロードします。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。 0.6.0 以上の delta-sharing-spark が必要です。
Python
delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", version=<version-as-of>)

spark.read.format("deltaSharing")\
.option("versionAsOf", <version-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", timestamp=<timestamp-as-of>)

spark.read.format("deltaSharing")\
.option("timestampAsOf", <timestamp-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

Sparkを使用して共有チェンジデータフィードにアクセスします

テーブル履歴が共有されており、ソース テーブルでチェンジデータフィード (CDF) が有効になっている場合は、これらの変数を置き換えて次を実行してチェンジデータフィードにアクセスできます。 delta-sharing-spark 0.5.0 以降が必要です。

開始パラメーターは 1 つだけ指定する必要があります。

  • <profile-path>: 認証情報ファイルの場所。
  • <share-name>: テーブルの share= の値。
  • <schema-name>: テーブルの schema= の値。
  • <table-name>: テーブルの name= の値。
  • <starting-version>:オプション。 クエリーの開始バージョン。 長整数型として指定します。
  • <ending-version>:オプション。 クエリの終了バージョン。 終了バージョンが指定されていない場合、API は最新のテーブル バージョンを使用します。
  • <starting-timestamp>:オプション。 クエリーの開始タイムスタンプは、このタイムスタンプ以上で作成されたバージョンに変換されます。 yyyy-mm-dd hh:mm:ss[.fffffffff]の形式の文字列として指定します。
  • <ending-timestamp>:オプション。 クエリーの終了タイムスタンプは、このタイムスタンプ以前に作成されたバージョンに変換されます。 次の形式の文字列として指定します。 yyyy-mm-dd hh:mm:ss[.fffffffff]
Python
delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_version=<starting-version>,
ending_version=<ending-version>)

delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_timestamp=<starting-timestamp>,
ending_timestamp=<ending-timestamp>)

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("statingVersion", <starting-version>)\
.option("endingVersion", <ending-version>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("startingTimestamp", <starting-timestamp>)\
.option("endingTimestamp", <ending-timestamp>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

出力が空の場合、または期待するデータが含まれていない場合は、データプロバイダーに連絡してください。

Spark 構造化ストリーミングを使用した共有テーブルへのアクセス

テーブル履歴が共有されている場合は、共有データをストリームで読み取ることができます。 delta-sharing-spark 0.6.0 以降が必要です。

サポートされているオプション:

  • ignoreDeletes: データを削除するトランザクションは無視してください。
  • ignoreChanges: UPDATEMERGE INTODELETE (パーティション内)、 OVERWRITEなどのデータ変更操作によってソース テーブル内のファイルが書き換えられた場合は、更新を再処理します。 変更されていない行は引き続き出力できます。 したがって、ダウンストリームの消費者は重複を処理できる必要があります。 削除はダウンストリームに反映されません。 ignoreChangesignoreDeletesを包含します。 したがって、 ignoreChangesを使用する場合、ソーステーブルの削除や更新によってストリームが中断されることはありません。
  • startingVersion: 開始する共有テーブルのバージョン。 このバージョン (包括的) 以降のすべてのテーブル変更は、ストリーミング ソースによって読み取られます。
  • startingTimestamp: 開始するタイムスタンプ。 タイムスタンプ (この値を含む) 以降にコミットされたすべてのテーブル変更は、ストリーミング ソースによって読み取られます。 例: "2023-01-01 00:00:00.0"
  • maxFilesPerTrigger: すべてのマイクロバッチで考慮される新しいファイルの数。
  • maxBytesPerTrigger: 各マイクロバッチで処理されるデータの量。 このオプションは、バッチがほぼこの量のデータを処理し、最小入力単位がこの制限より大きい場合にストリーミングクエリを先に進めるために制限を超えて処理する可能性があることを意味します。
  • readChangeFeed: ストリーム 共有テーブルのチェンジデータフィードを読み取ります。

サポートされていないオプション:

  • Trigger.availableNow

サンプル構造化ストリーミング クエリ

Scala
spark.readStream.format("deltaSharing")
.option("startingVersion", 0)
.option("ignoreChanges", true)
.option("maxFilesPerTrigger", 10)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

「Databricks でのストリーミング」も参照してください。

削除ベクトルまたは列マッピングが有効になっているテーブルの読み取り

備考

プレビュー

この機能は パブリック プレビュー段階です。

削除ベクトルは、プロバイダーが共有 Delta テーブルで有効にできるストレージ最適化機能です。 削除ベクトルとはを参照してください

Databricks では、Delta テーブルの列マッピングもサポートされています。 「Delta Lake 列マッピングを使用した列の名前変更と削除」を参照してください。

プロバイダーが削除ベクトルまたは列マッピングが有効になっているテーブルを共有している場合は、 delta-sharing-spark 3.1 以降で実行されているコンピュートを使用してテーブルを読み取ることができます。 Databricks クラスターを使用している場合は、Databricks Runtime 14.1 以降を実行しているクラスターを使用してバッチ読み取りを実行できます。CDF クエリとストリーミング クエリには、Databricks Runtime 14.2 以降が必要です。

バッチクエリは、共有テーブルのテーブル機能に基づいて responseFormat を自動的に解決できるため、そのまま実行できます。

チェンジデータフィード (CDF) を読み取り、削除ベクトルまたはカラムマッピングが有効になっている共有テーブルに対してストリーミングクエリを実行するには、追加オプション responseFormat=deltaを設定する必要があります。

次の例は、バッチクエリ、CDF、ストリーミングクエリを示しています。

Scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder()
.appName("...")
.master("...")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()

val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"

// Batch query
spark.read.format("deltaSharing").load(tablePath)

// CDF query
spark.read.format("deltaSharing")
.option("readChangeFeed", "true")
.option("responseFormat", "delta")
.option("startingVersion", 1)
.load(tablePath)

// Streaming query
spark.readStream.format("deltaSharing").option("responseFormat", "delta").load(tablePath)

Pandas: 共有データの読み取り

pandas 0.25.3以降で共有データにアクセスするには次の手順に従ってください。

これらの手順は、データ・プロバイダーによって共有された資格情報ファイルにアクセスできることを前提としています。 「オープン共有モデルでアクセスを取得する」を参照してください。

Delta Sharing Python コネクタをインストールする

共有されているテーブルの一覧など、共有データに関連するメタデータにアクセスするには、 delta-sharing Python コネクタをインストールする必要があります。

Bash
pip install delta-sharing

Pandas を使用した共有テーブルの一覧表示

共有内のテーブルを一覧表示するには、次のコマンドを実行して、 <profile-path>/config.share を資格情報ファイルの場所に置き換えます。

Python
import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

出力が空の場合、または期待するテーブルが含まれていない場合は、データ プロバイダーにお問い合わせください。

Pandas を使用して共有データにアクセスする

を使用してPandas Pythonの共有データにアクセスするには、次のように変数を置き換えて、次のコマンドを実行します。

  • <profile-path>: 認証情報ファイルの場所。
  • <share-name>: テーブルの share= の値。
  • <schema-name>: テーブルの schema= の値。
  • <table-name>: テーブルの name= の値。
Python
import delta_sharing
delta_sharing.load_as_pandas(f"<profile-path>#<share-name>.<schema-name>.<table-name>")

Pandasを使用して共有チェンジデータフィードにアクセスする

Pandas で共有テーブルのチェンジデータフィードにアクセスするにはPython次のように変数を置き換えて実行します。チェンジデータフィードは、データプロバイダーがテーブルのチェンジデータフィードを共有したかどうかによって使用できない場合があります。

  • <starting-version>:オプション。 クエリーの開始バージョン。
  • <ending-version>:オプション。 クエリの終了バージョン。
  • <starting-timestamp>:オプション。 クエリーの開始タイムスタンプ。 これは、このタイムスタンプ以降に作成されたバージョンに変換されます。
  • <ending-timestamp>:オプション。 クエリーの終了タイムスタンプ。 これは、このタイムスタンプ以前に作成されたバージョンに変換されます。
Python
import delta_sharing
delta_sharing.load_table_changes_as_pandas(
f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_version=<starting-version>,
ending_version=<starting-version>)

delta_sharing.load_table_changes_as_pandas(
f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_timestamp=<starting-timestamp>,
ending_timestamp=<ending-timestamp>)

出力が空の場合、または期待するデータが含まれていない場合は、データプロバイダーに連絡してください。

Power BI: 共有データの読み取り

Power BI Delta Sharing コネクタを使用すると、Delta Sharing オープン プロトコルを通じて共有されているデータセットを検出、分析、視覚化できます。

必要条件

Databricks に接続する

Delta Sharing コネクタを使用して Databricks に接続するには、次の操作を行います。

  1. テキスト エディターで共有資格情報ファイルを開き、エンドポイント URL とトークンを取得します。
  2. Power BI Desktop を開きます。
  3. [ データの取得 ] メニューで、 Delta Sharing を検索します。
  4. コネクタを選択し、[ 接続 ] をクリックします。
  5. 資格情報ファイルからコピーしたエンドポイント URL を [Delta Sharing Server URL ] フィールドに入力します。
  6. 必要に応じて、[ 詳細オプション ] タブで、ダウンロードできる行の最大数の [行制限 ] を設定します。 これは、デフォルトで 100 万行に設定されます。
  7. OK 」をクリックします。
  8. [認証 ] で、認証情報ファイルから取得したトークンを Bearer トークン にコピーします。
  9. 接続 」をクリックします。

Power BI Delta Sharing コネクタの制限事項

Power BI Delta Sharing コネクタには、次の制限があります。

  • コネクタがロードするデータは、マシンのメモリに収まる必要があります。 この要件を管理するために、コネクタは、インポートされる行の数を、Power BI Desktop の [詳細オプション] タブで設定した [行制限 ] に制限します。

Tableau: 共有データの読み取り

Tableau Delta Sharing コネクタを使用すると、Delta Sharing オープン プロトコルを通じて共有されているデータセットを検出、分析、視覚化できます。

必要条件

Databricks に接続する

Delta Sharing コネクタを使用して Databricks に接続するには、次の操作を行います。

  1. Tableau Exchange に移動し、指示に従って Delta Sharing Connector をダウンロードし、適切なデスクトップ フォルダーに配置します。
  2. Tableau Desktop を開きます。
  3. [コネクタ] ページで、"Delta Sharing by Databricks" を検索します。
  4. [ 共有ファイルのアップロード ] を選択し、プロバイダーによって共有された資格情報ファイルを選択します。
  5. [ データを取得 ] をクリックします。
  6. Data Explorer で、テーブルを選択します。
  7. 必要に応じて、SQL フィルターまたは行制限を追加します。
  8. テーブル・データの取得 」をクリックします。

Tableau Delta Sharing コネクタの制限事項

Tableau Delta Sharing コネクタには、次の制限があります。

  • コネクタがロードするデータは、マシンのメモリに収まる必要があります。 この要件を管理するために、コネクタはインポートされる行の数を Tableau で設定した行制限に制限します。
  • すべての列はタイプ String として返されます。
  • SQL フィルターは、Delta Sharing サーバーが predicateHint をサポートしている場合にのみ機能します。

新しい資格情報をリクエストする

資格情報のアクティブ化 URL またはダウンロードした資格情報が紛失、破損、または侵害された場合、または資格情報の有効期限が切れてもプロバイダーから新しい資格情報が送られてこない場合は、プロバイダーに連絡して新しい資格情報をリクエストしてください。