メインコンテンツまでスキップ

DLT フローによるデータの増分読み込みと処理

データは フロー を通じてDLTで処理されます。各フローは 、クエリ と、通常は ターゲット で構成されます。フローは、クエリをバッチとして、またはターゲットへのデータストリームとして増分的に処理します。フローは、Databricks の ETL パイプライン内に存在します。

通常、フローは、ターゲットを更新するクエリを DLT で作成すると自動的に定義されますが、複数のソースから 1 つのターゲットに追加するなど、より複雑な処理のために追加のフローを明示的に定義することもできます。

最新情報

フローは、その定義パイプラインが更新されるたびに実行されます。フローは、利用可能な最新のデータを使用してテーブルを作成または更新します。フローのタイプとデータに対する変更の状態に応じて、更新では、新しいレコードのみを処理する増分更新を実行するか、データソースからすべてのレコードを再処理する完全更新を実行する場合があります。

デフォルト フローを作成する

パイプラインで DLT オブジェクトを作成するときは、通常、テーブルまたはビューと、それをサポートするクエリを定義します。たとえば、このSQLクエリでは、customers_bronzeというテーブルから読み取って、customers_silver というストリーミングテーブルを作成します。

SQL
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

Pythonでも同じストリーミングテーブルを作成できます。Python では、通常、DLT を使用するには、データフレームを返すクエリ関数を作成し、DLT 機能にアクセスするためのデコレータを使用します。

Python
import dlt

@dlt.table()
def customers_silver():
return spark.readStream.table("customers_bronze")

この例では、 ストリーミングテーブル を作成しました。また、SQL と Python の両方で同様の構文でマテリアライズドビューを作成することもできます。詳細については、「 ストリーミングテーブル 」および 「マテリアライズドビュー」を参照してください。

この例では、ストリーミングテーブルと共にデフォルト フローを作成します。 ストリーミングテーブルのデフォルトフローは 追加 フローであり、トリガーごとに新しい行を追加します。 これは、DLT を使用する最も一般的な方法であり、フローとターゲットを 1 つのステップで作成します。このスタイルを使用して、データの取り込みやデータの変換を行うことができます。

追加フローは、1 つのターゲットを更新するために複数のストリーミング ソースからデータを読み取る必要がある処理もサポートしています。たとえば、既存のストリーミングテーブルとフローがあり、この既存のストリーミングテーブルに書き込む新しいストリーミングソースを追加する場合は、フローの追加機能を使用できます。

複数のフローを使用して 1 つのターゲットに書き込む

前の例では、フローとストリーミングテーブルを 1 つのステップで作成しました。 以前に作成したテーブルのフローを作成することもできます。この例では、テーブルの作成とそれに関連付けられたフローを別々のステップで見ることができます。このコードは、ストリーミング テーブルとフローに同じ名前を使用するなど、デフォルト フローの作成と同じ結果になります。

Python
import dlt

# create streaming table
dlt.create_streaming_table("customers_silver")

# add a flow
@dlt.append_flow(
target = "customers_silver")
def customer_silver():
return spark.readStream.table("customers_bronze")

ターゲットから独立してフローを作成すると、同じターゲットにデータを追加する複数のフローも作成できます。

Python インターフェイスの @append_flow デコレータまたは SQL インターフェイスの CREATE FLOW...INSERT INTO 句を使用して、新しいフローを作成します (たとえば、複数のストリーミング ソースからストリーミングテーブルをターゲットにします)。追加フローは、次のような処理タスクに使用します。

  • 既存のストリーミングテーブルにデータを追加するストリーミング ソース (完全な更新を必要とせずに) を追加します。 たとえば、事業を展開しているすべての地域の地域データを組み合わせたテーブルがあるとします。新しいリージョンがロールアウトされると、完全な更新を実行せずに新しいリージョン データをテーブルに追加できます。既存のストリーミングテーブルにストリーミングソースを追加する例については、「 例: 複数の Kafka トピックからストリーミングテーブルへの書き込み」を参照してください。
  • 不足しているヒストリカルデータ (バックフィル) を追加して、ストリーミングテーブルを更新します。 たとえば、Apache Kafka トピックによって書き込まれる既存のストリーミングテーブルがあるとします。また、ストリームテーブルに一度だけ挿入する必要があるテーブルにヒストリカルデータが格納されており、データを挿入する前に複雑な集計を実行する処理が含まれるため、データをストリームすることはできません。 バックフィルの例については、 例: 1 回限りのデータ バックフィルの実行を参照してください。
  • 複数のソースからのデータを結合し、クエリで UNION 句を使用する代わりに 1 つのストリーミングテーブルに書き込みます。 UNION の代わりに追加フロー処理を使用すると、フル・リフレッシュ更新を実行せずにターゲット・テーブルを増分的に更新できます。この方法で行われるユニオンの例については、「例: UNIONの代わりに追加フロー処理を使用する」を参照してください。

追加フロー処理によって出力されるレコードのターゲットは、既存のテーブルまたは新しいテーブルです。 Python クエリの場合は、 create_streaming_table() 関数を使用してターゲット テーブルを作成します。

次の例では、同じターゲットに 2 つのフローを追加し、2 つのソース テーブルの和集合を作成します。

Python
import dlt

# create a streaming table
dlt.create_streaming_table("customers_us")

# add the first append flow
@dlt.append_flow(target = "customers_us")
def append1():
return spark.readStream.table("customers_us_west")

# add the second append flow
@dlt.append_flow(target = "customers_us")
def append2():
return spark.readStream.table("customers_us_east")
important
  • エクスペクテーションを使用してデータ品質制約を定義する必要がある場合は、create_streaming_table() 関数の一部としてターゲット表または既存の表定義でエクスペクテーションを定義します。@append_flow定義でエクスペクテーションを定義することはできません。
  • フローは フロー名 で識別され、この名前はストリーミングチェックポイントを識別するために使用されます。チェックポイントを識別するためにフロー名を使用するということは、以下のことを意味します。
    • パイプライン内の既存のフローの名前が変更された場合、チェックポイントは引き継がれず、名前が変更されたフローは事実上まったく新しいフローになります。
    • 既存のチェックポイントが新しいフロー定義と一致しないため、パイプラインでフロー名を再利用することはできません。

フローの種類

ストリーミングテーブルとマテリアライズドビューのデフォルトフローは追加フローです。 チェンジデータキャプチャ データソースから読み取るフローを作成することもできます。次の表では、さまざまなタイプのフローについて説明します。

フロータイプ

説明

追加

追加 フローは最も一般的なタイプのフローで、更新のたびにソース内の新しいレコードがターゲットに書き込まれます。これらは、構造化ストリーミングの追加モードに対応しています。ターゲットが完全にリフレッシュされない限り、データをターゲットに一度だけ挿入するバッチ問合せを示す ONCE フラグを追加できます。任意の数の追加フローは、特定のターゲットに私の書き込みをフローします。

デフォルトフロー (ターゲットストリーミングテーブルまたはマテリアライズドビューで作成) は、ターゲットと同じ名前になります。他のターゲットにはデフォルトのフローはありません。

Apply changes

変更の適用 フローは、チェンジデータキャプチャ (CDC) データを含むクエリを取り込みます。変更の適用フローはストリーミングテーブルのみをターゲットとすることができ、ソース はストリーミング ソース である必要があります ( ONCE フローの場合でも)。複数の変更適用フローで 1 つのストリーミングテーブルをターゲットにできます。 変更適用フローのターゲットとして機能するストリーミングテーブルは、他の変更適用フローによってのみターゲットにすることができます。

CDCデータの詳細については、「 APPLY CHANGES APIs: DLTでチェンジデータキャプチャを簡略化する」を参照してください。

追加情報

フローとその使用方法の詳細については、次のトピックを参照してください。