メインコンテンツまでスキップ

Lakeflow宣言型パイプラインフローを使用してデータを段階的に読み込み、処理する

データは Lakeflow 宣言型パイプラインで フロー を通じて処理されます。 各フローは 、クエリ と、通常は ターゲット で構成されます。フローは、クエリをバッチとして、またはターゲットへのデータストリームとして増分的に処理します。フローは、Databricks の ETL パイプライン内に存在します。

通常、フローは、ターゲットを更新するクエリを宣言型パイプラインで Lakeflow 作成すると自動的に定義されますが、複数のソースから 1 つのターゲットに追加するなど、より複雑な処理のために追加のフローを明示的に定義することもできます。

アップデート

フローは、それを定義するパイプラインが更新されるたびに実行されます。フローは、利用可能な最新のデータを使用してテーブルを作成または更新します。フローのタイプとデータの変更の状態に応じて、更新では、新しいレコードのみを処理する増分更新が実行されるか、データソースからのすべてのレコードが再処理される完全更新が実行されます。

デフォルトフローを作成する

パイプラインで Lakeflow 宣言型パイプライン オブジェクトを作成するときは、通常、テーブルまたはビューとそれをサポートするクエリを定義します。 たとえば、このSQLクエリでは、customers_bronzeというテーブルから読み取って、customers_silver というストリーミングテーブルを作成します。

SQL
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

Pythonでも同じストリーミングテーブルを作成できます。Pythonでは、通常、Lakeflow 宣言型パイプラインを使用して、データフレームを返すクエリ関数を作成し、デコレータを使用して 宣言型パイプライン機能にアクセスしますLakeflow。

Python
from pyspark import pipelines as dp

@dp.table()
def customers_silver():
return spark.readStream.table("customers_bronze")

この例では、 ストリーミング テーブル を作成しました。 SQLとPythonの両方で同様の構文を使用してマテリアライズドビューを作成することもできます。 詳細については、ストリーミングテーブルマテリアライズドビューを参照してください。

この例では、ストリーミングテーブルと共にデフォルト フローを作成します。 ストリーミングテーブルのデフォルトフローは 追加 フローであり、トリガーごとに新しい行を追加します。 これは、 Lakeflow 宣言型パイプラインを使用して、フローとターゲットを 1 つのステップで作成する最も一般的な方法です。 このスタイルを使用して、データの取り込みやデータの変換を行うことができます。

追加フローは、単一のターゲットを更新するために複数のストリーミング ソースからデータを読み取る必要がある処理もサポートします。たとえば、既存のストリーミング テーブルとフローがあり、この既存のストリーミング テーブルに書き込む新しいストリーミング ソースを追加する場合、追加フロー機能を使用できます。

複数のフローを使用して単一のターゲットに書き込む

前の例では、単一のステップでフローとストリーミング テーブルを作成しました。 以前に作成したテーブルに対してもフローを作成することができます。この例では、テーブルの作成とそれに関連するフローを別々のステップで確認できます。 このコードは、ストリーミング テーブルとフローに同じ名前を使用することを含め、当然のフローを作成する場合と同じ結果になります。

Python
from pyspark import pipelines as dp

# create streaming table
dp.create_streaming_table("customers_silver")

# add a flow
@dp.append_flow(
target = "customers_silver")
def customer_silver():
return spark.readStream.table("customers_bronze")

ターゲットから独立してフローを作成すると、同じターゲットにデータを追加する複数のフローを作成することもできます。

Pythonインターフェースの@dp.append_flowデコレータまたはSQLインターフェースのCREATE FLOW...INSERT INTO句を使用して、新しいフローを作成します。たとえば、複数のストリーミング ソースからストリーミング テーブルをターゲットにします。 次のようなタスクを処理する場合は、追加フローを使用します。

  • 完全な更新を必要とせずに既存のストリーミング テーブルにデータを追加するストリーミング ソースを追加します。 たとえば、事業を展開しているすべての地域の地域データを結合したテーブルがあるとします。新しいリージョンが展開されると、完全な更新を実行せずに新しいリージョンのデータをテーブルに追加できます。既存のストリーミング テーブルにストリーミング ソースを追加する例については、 「例: 複数のKafkaトピックからストリーミング テーブルに書き込む」を参照してください。
  • 欠落しているヒストリカルデータを追加 (バックフィル) して、ストリーミング テーブルを更新します。 INSERT INTO ONCE構文を使用して、1 回実行される履歴バックフィル追加を作成できます。たとえば、 Apache Kafkaトピックによって書き込まれる既存のストリーミング テーブルがあるとします。 また、ヒストリカルデータはテーブルに保存されており、ストリーミング テーブルに一度だけ挿入する必要がありますが、データを挿入する前に複雑な集計の実行が処理に含まれるため、データをストリーミングすることはできません。 バックフィルの例については、 LakeFlow宣言型パイプラインを使用したヒストリカルデータのバックフィル」を参照してください。
  • クエリでUNION句を使用する代わりに、複数のソースからのデータを結合し、単一のストリーミング テーブルに書き込みます。 UNIONの代わりに追加フロー処理を使用すると、完全なリフレッシュ更新を実行せずにターゲット テーブルを増分的に更新できます。この方法で実行されるユニオンの例については、 「例: UNIONの代わりに追加フロー処理を使用する」を参照してください。

追加フロー処理によって出力されるレコードのターゲットは、既存のテーブルまたは新しいテーブルにすることができます。Python クエリの場合は、 create_streaming_table()関数を使用してターゲット テーブルを作成します。

次の例では、同じターゲットに 2 つのフローを追加し、2 つのソース テーブルの結合を作成します。

Python
from pyspark import pipelines as dp

# create a streaming table
dp.create_streaming_table("customers_us")

# add the first append flow
@dp.append_flow(target = "customers_us")
def append1():
return spark.readStream.table("customers_us_west")

# add the second append flow
@dp.append_flow(target = "customers_us")
def append2():
return spark.readStream.table("customers_us_east")
important
  • エクスペクテーションを含むデータ品質制約を定義する必要がある場合はcreate_streaming_table() 関数の一部としてターゲットテーブルまたは既存のテーブル定義にエクスペクテーションを定義してください。 @append_flow の定義で期待を定義することはできません。
  • フローは フロー名 で識別され、この名前はストリーミングチェックポイントを識別するために使用されます。チェックポイントを識別するためにフロー名を使用するということは、以下のことを意味します。
    • パイプライン内の既存のフローの名前が変更された場合、チェックポイントは引き継がれず、名前が変更されたフローは事実上まったく新しいフローになります。
    • 既存のチェックポイントが新しいフロー定義と一致しないため、パイプラインでフロー名を再利用することはできません。

フローの種類

ストリーミングテーブルとマテリアライズドビューの当然のフローは追加フローです。 チェンジデータキャプチャデータ ソースから読み取るフローを作成することもできます。 次の表では、さまざまな種類のフローについて説明します。

フロータイプ

説明

追加

追加 フローは最も一般的なタイプのフローで、更新ごとにソース内の新しいレコードがターゲットに書き込まれます。これらは、構造化ストリーミングの追加モードに対応します。ONCEフラグを追加して、ターゲットが完全に更新されない限り、データがターゲットに 1 回だけ挿入されるバッチ クエリを示すことができます。任意の数の追加フローが特定のターゲットに書き込むことができます。

もちろんフロー (ターゲット ストリーミングテーブルまたはマテリアライズドビューで作成されたもの) はターゲットと同じ名前になります。 その他のターゲットにはデフォルトのフローはありません。

自動 CDC (以前の 変更を適用 )

Auto CDC フローは、チェンジデータキャプチャ ( CDC ) データを含むクエリを取り込みます。 自動CDCフローはストリーミング テーブルのみをターゲットにすることができ、ソースはストリーミング ソースである必要があります ( ONCEフローの場合でも)。 複数の自動CDCフローは、単一のストリーミング テーブルをターゲットにすることができます。 自動CDCフローのターゲットとして機能するストリーミング テーブルは、他の自動CDCフローによってのみターゲットにできます。

CDCデータの詳細については、「The AUTO CDC APIs: Simplify チェンジデータキャプチャ with Lakeflow 宣言型パイプライン」を参照してください。

追加情報

フローとその使用法の詳細については、次のトピックを参照してください。