メインコンテンツまでスキップ

シンクを使用して DLT でレコードを外部サービスにストリームする

備考

プレビュー

DLT sink API は パブリック プレビュー段階です。

この記事では、DLT sink API と、 それをフロー と共に使用して、パイプラインによって変換されたレコードを外部データ シンクに書き込む方法について説明します。外部データ シンクには、Unity Catalog のマネージド テーブルと外部テーブル、Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスが含まれます。

注記

DLT sink API は Python でのみ使用できます。

DLTシンクとは?

DLT シンクは、DLT フローのターゲットです。By デフォルト DLT フローは、ストリーミングテーブルまたはマテリアライズドビュー ターゲットにデータを出力します。 これらは両方とも Databricks マネージド Delta テーブルです。DLT シンクは、Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスや、Unity Catalog によって管理される外部テーブルなどのターゲットに変換されたデータを書き込むために使用する代替ターゲットです。シンクを使用すると、DLT パイプラインの出力を保持するためのオプションが増えました。

DLTシンクはいつ使用すべきですか?

Databricks では、次の必要がある場合は DLT シンクを使用することをお勧めします。

  • 不正検出、リアルタイム アナリティクス、顧客レコメンデーションなどの運用ユースケースを構築します。 運用上のユースケースでは、通常、Apache Kafka トピックなどのメッセージバスからデータを読み取り、データを低レイテンシで処理して、処理されたレコードをメッセージバスに書き戻します。 このアプローチにより、クラウドストレージからの書き込みや読み取りを行わないことで、レイテンシーを短縮できます。
  • DLT フローから変換されたデータを、Unity Catalog のマネージド テーブルや外部テーブルなど、外部の Delta インスタンスによって管理されるテーブルに書き込みます。
  • トピックなど、 ETL外部のシンクへの逆抽出、変換、ロード()を行います。DatabricksApacheKafkaこのアプローチにより、Unity Catalog テーブルやその他の Databricks で管理されるストレージの外部でデータを読み取ったり使用したりする必要があるユースケースを効果的にサポートできます。

DLTシンクの使用方法を教えてください。

イベント・データがストリーミング・ソースから DLT パイプラインに取り込まれるとき、 DLT 機能を使用してこのデータを処理および調整し、アペンド・フロー処理を使用して、変換されたデータ・レコードを DLT シンクにストリームします。 このシンクは、 create_sink() 関数を使用して作成します。create_sink 関数の詳細については、シンク API リファレンスを参照してください。

ストリーミング イベント データを作成または処理し、書き込み用のデータ レコードを準備する DLT パイプラインがある場合は、DLT シンクを使用する準備ができています。

DLT シンクの実装は、次の 2 つの手順で構成されます。

  1. DLT シンクを作成します。
  2. 追加フローを使用して、準備されたレコードをシンクに書き込みます。

DLT シンクの作成

Databricks では、ストリーム データから処理されたレコードを書き込む 3 種類の宛先シンクがサポートされています。

  • Delta テーブル シンク (Unity Catalog のマネージド テーブルと外部テーブルを含む)
  • Apache Kafka シンク
  • Azure Event Hubs シンク

Delta、Kafka、Azure Event Hubs シンクの構成例を次に示します。

To create a Delta sink by file path:

Python
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)

To create a Delta sink by table name using a fully qualified catalog and schema path:

Python
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "catalog_name.schema_name.table_name" }
)

create_sink 関数の使用の詳細については、シンク API リファレンスを参照してください。

シンクが作成されたら、処理されたレコードをシンクにストリーミングできます。

追加フローを使用した DLT シンクへの書き込み

シンクを作成したら、次のステップは、追加フローによって出力されるレコードのターゲットとして指定することにより、処理されたレコードをシンクに書き込むことです。これを行うには、シンクを append_flow デコレータのtarget値として指定します。

  • Unity Catalog の管理テーブルと外部テーブルの場合は、 delta 形式を使用し、オプションでパスまたはテーブル名を指定します。 DLT パイプラインは、Unity Catalog を使用するように構成する必要があります。
  • Apache Kafka トピックの場合は、 kafka の形式を使用し、オプションでトピック名、接続情報、認証情報を指定します。 これらは、 Spark 構造化ストリーミング Kafka シンクがサポートするオプションと同じです。 Kafka構造化ストリーミング ライターの構成を参照してください。
  • Azure Event Hubs の場合は、 kafka 形式を使用し、オプションで Event Hubs の名前、接続情報、認証情報を指定します。 これらは、Kafka インターフェイスを使用する Spark 構造化ストリーミング Event Hubs シンクでサポートされているものと同じです。 「Microsoft Entra ID を使用したサービスプリンシパル認証」と「Event Hubs Azure」を参照してください。

以下は、DLT パイプラインによって処理されるレコードを使用して Delta、Kafka、Azure Event Hubs シンクに書き込むようにフローを設定する方法の例です。

Python
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)

append_flowデコレーターの詳細については、「複数のフローを使用して 1 つのターゲットに書き込む」を参照してください。

制限

  • Python API のみがサポートされています。 SQL はサポートされていません。

  • ストリーミング クエリのみがサポートされています。バッチ クエリはサポートされていません。

  • シンクへの書き込みに使用できるのは append_flow のみです。 apply_changesなどの他のフローはサポートされておらず、DLT データセット定義でシンクを使用することはできません。たとえば、次のものはサポートされていません。

    Python
    @table("from_sink_table")
    def fromSink():
    return read_stream("my_sink")
  • Delta シンクの場合、テーブル名は完全修飾である必要があります。 具体的には、 Unity Catalog 管理外部テーブルの場合、テーブル名は <catalog>.<schema>.<table> の形式である必要があります。 Hive metastoreの場合、<schema>.<table>形式である必要があります。

  • 完全更新を実行しても、シンク内の以前のコンピュート結果データはクリーンアップされません。つまり、再処理されたデータはシンクに追加され、既存のデータは変更されません。

  • DLT のエクスペクテーションはサポートされていません。

リソース