DLT フローによるデータの増分読み込みと処理
データは フロー を通じてDLTで処理されます。各フローは 、クエリ と、通常は ターゲット で構成されます。フローは、クエリをバッチとして、またはターゲットへのデータストリームとして増分的に処理します。フローは、Databricks の ETL パイプライン内に存在します。
通常、フローは、ターゲットを更新するクエリを DLT で作成すると自動的に定義されますが、複数のソースから 1 つのターゲットに追加するなど、より複雑な処理のために追加のフローを明示的に定義することもできます。
最新情報
フローは、その定義パイプラインが更新されるたびに実行されます。フローは、利用可能な最新のデータを使用してテーブルを作成または更新します。フローのタイプとデータに対する変更の状態に応じて、更新では、新しいレコードのみを処理する増分更新を実行するか、データソースからすべてのレコードを再処理する完全更新を実行する場合があります。
- パイプラインの更新の詳細については、「DLT パイプラインでの更新の実行」を参照してください。
- 更新のスケジュール設定とトリガーの詳細については、「 トリガー パイプライン モードと継続的パイプライン モード」を参照してください。
デフォルト フローを作成する
パイプラインで DLT オブジェクトを作成するときは、通常、テーブルまたはビューと、それをサポートするクエリを定義します。たとえば、このSQLクエリでは、customers_bronze
というテーブルから読み取って、customers_silver
というストリーミングテーブルを作成します。
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
Pythonでも同じストリーミングテーブルを作成できます。Python では、通常、DLT を使用するには、データフレームを返すクエリ関数を作成し、DLT 機能にアクセスするためのデコレータを使用します。
import dlt
@dlt.table()
def customers_silver():
return spark.readStream.table("customers_bronze")
この例では、 ストリーミングテーブル を作成しました。また、SQL と Python の両方で同様の構文でマテリアライズドビューを作成することもできます。詳細については、「 ストリーミングテーブル 」および 「マテリアライズドビュー」を参照してください。
この例では、ストリーミングテーブルと共にデフォルト フローを作成します。 ストリーミングテーブルのデフォルトフローは 追加 フローであり、トリガーごとに新しい行を追加します。 これは、DLT を使用する最も一般的な方法であり、フローとターゲットを 1 つのステップで作成します。このスタイルを使用して、データの取り込みやデータの変換を行うことができます。
追加フローは、1 つのターゲットを更新するために複数のストリーミング ソースからデータを読み取る必要がある処理もサポートしています。たとえば、既存のストリーミングテーブルとフローがあり、この既存のストリーミングテーブルに書き込む新しいストリーミングソースを追加する場合は、フローの追加機能を使用できます。
複数のフローを使用して 1 つのターゲットに書き込む
前の例では、フローとストリーミングテーブルを 1 つのステップで作成しました。 以前に作成したテーブルのフローを作成することもできます。この例では、テーブルの作成とそれに関連付けられたフローを別々のステップで見ることができます。このコードは、ストリーミング テーブルとフローに同じ名前を使用するなど、デフォルト フローの作成と同じ結果になります。
- Python
- SQL
import dlt
# create streaming table
dlt.create_streaming_table("customers_silver")
# add a flow
@dlt.append_flow(
target = "customers_silver")
def customer_silver():
return spark.readStream.table("customers_bronze")
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;
-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);
ターゲットから独立してフローを作成すると、同じターゲットにデータを追加する複数のフローも作成できます。
Python インターフェイスの @append_flow
デコレータまたは SQL インターフェイスの CREATE FLOW...INSERT INTO
句を使用して、新しいフローを作成します (たとえば、複数のストリーミング ソースからストリーミングテーブルをターゲットにします)。追加フローは、次のような処理タスクに使用します。
- 既存のストリーミングテーブルにデータを追加するストリーミング ソース (完全な更新を必要とせずに) を追加します。 たとえば、事業を展開しているすべての地域の地域データを組み合わせたテーブルがあるとします。新しいリージョンがロールアウトされると、完全な更新を実行せずに新しいリージョン データをテーブルに追加できます。既存のストリーミングテーブルにストリーミングソースを追加する例については、「 例: 複数の Kafka トピックからストリーミングテーブルへの書き込み」を参照してください。
- 不足しているヒストリカルデータ (バックフィル) を追加して、ストリーミングテーブルを更新します。 たとえば、Apache Kafka トピックによって書き込まれる既存のストリーミングテーブルがあるとします。また、ストリームテーブルに一度だけ挿入する必要があるテーブルにヒストリカルデータが格納されており、データを挿入する前に複雑な集計を実行する処理が含まれるため、データをストリームすることはできません。 バックフィルの例については、 例: 1 回限りのデータ バックフィルの実行を参照してください。
- 複数のソースからのデータを結合し、クエリで
UNION
句を使用する代わりに 1 つのストリーミングテーブルに書き込みます。UNION
の代わりに追加フロー処理を使用すると、フル・リフレッシュ更新を実行せずにターゲット・テーブルを増分的に更新できます。この方法で行われるユニオンの例については、「例:UNION
の代わりに追加フロー処理を使用する」を参照してください。
追加フロー処理によって出力されるレコードのターゲットは、既存のテーブルまたは新しいテーブルです。 Python クエリの場合は、 create_streaming_table() 関数を使用してターゲット テーブルを作成します。
次の例では、同じターゲットに 2 つのフローを追加し、2 つのソース テーブルの和集合を作成します。
- Python
- SQL
import dlt
# create a streaming table
dlt.create_streaming_table("customers_us")
# add the first append flow
@dlt.append_flow(target = "customers_us")
def append1():
return spark.readStream.table("customers_us_west")
# add the second append flow
@dlt.append_flow(target = "customers_us")
def append2():
return spark.readStream.table("customers_us_east")
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;
-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);
-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);
- エクスペクテーションを使用してデータ品質制約を定義する必要がある場合は、
create_streaming_table()
関数の一部としてターゲット表または既存の表定義でエクスペクテーションを定義します。@append_flow
定義でエクスペクテーションを定義することはできません。 - フローは フロー名 で識別され、この名前はストリーミングチェックポイントを識別するために使用されます。チェックポイントを識別するためにフロー名を使用するということは、以下のことを意味します。
- パイプライン内の既存のフローの名前が変更された場合、チェックポイントは引き継がれず、名前が変更されたフローは事実上まったく新しいフローになります。
- 既存のチェックポイントが新しいフロー定義と一致しないため、パイプラインでフロー名を再利用することはできません。
フローの種類
ストリーミングテーブルとマテリアライズドビューのデフォルトフローは追加フローです。 チェンジデータキャプチャ データソースから読み取るフローを作成することもできます。次の表では、さまざまなタイプのフローについて説明します。
フロータイプ | 説明 |
---|---|
追加 | 追加 フローは最も一般的なタイプのフローで、更新のたびにソース内の新しいレコードがターゲットに書き込まれます。これらは、構造化ストリーミングの追加モードに対応しています。ターゲットが完全にリフレッシュされない限り、データをターゲットに一度だけ挿入するバッチ問合せを示す デフォルトフロー (ターゲットストリーミングテーブルまたはマテリアライズドビューで作成) は、ターゲットと同じ名前になります。他のターゲットにはデフォルトのフローはありません。 |
Apply changes | 変更の適用 フローは、チェンジデータキャプチャ (CDC) データを含むクエリを取り込みます。変更の適用フローはストリーミングテーブルのみをターゲットとすることができ、ソース はストリーミング ソース である必要があります ( CDCデータの詳細については、「 APPLY CHANGES APIs: DLTでチェンジデータキャプチャを簡略化する」を参照してください。 |
追加情報
フローとその使用方法の詳細については、次のトピックを参照してください。