Lakeflow宣言型パイプラインフローを使用してデータを段階的に読み込み、処理する
データは Lakeflow 宣言型パイプラインで フロー を通じて処理されます。 各フローは 、クエリ と、通常は ターゲット で構成されます。フローは、クエリをバッチとして、またはターゲットへのデータストリームとして増分的に処理します。フローは、Databricks の ETL パイプライン内に存在します。
通常、フローは、ターゲットを更新するクエリを宣言型パイプラインで Lakeflow 作成すると自動的に定義されますが、複数のソースから 1 つのターゲットに追加するなど、より複雑な処理のために追加のフローを明示的に定義することもできます。
アップデート
フローは、それを定義するパイプラインが更新されるたびに実行されます。フローは、利用可能な最新のデータを使用してテーブルを作成または更新します。フローのタイプとデータの変更の状態に応じて、更新では、新しいレコードのみを処理する増分更新が実行されるか、データソースからのすべてのレコードが再処理される完全更新が実行されます。
- パイプラインの更新の詳細については、「Lakeflow 宣言型パイプラインでの更新の実行」を参照してください。
- 更新のスケジュールとトリガーの詳細については、 「トリガー モードと連続パイプライン モード」を参照してください。
デフォルトフローを作成する
パイプラインで Lakeflow 宣言型パイプライン オブジェクトを作成するときは、通常、テーブルまたはビューとそれをサポートするクエリを定義します。 たとえば、このSQLクエリでは、customers_bronze
というテーブルから読み取って、customers_silver
というストリーミングテーブルを作成します。
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
Pythonでも同じストリーミングテーブルを作成できます。Pythonでは、通常、Lakeflow 宣言型パイプラインを使用して、データフレームを返すクエリ関数を作成し、デコレータを使用して 宣言型パイプライン機能にアクセスしますLakeflow。
from pyspark import pipelines as dp
@dp.table()
def customers_silver():
return spark.readStream.table("customers_bronze")
この例では、 ストリーミング テーブル を作成しました。 SQLとPythonの両方で同様の構文を使用してマテリアライズドビューを作成することもできます。 詳細については、ストリーミングテーブルとマテリアライズドビューを参照してください。
この例では、ストリーミングテーブルと共にデフォルト フローを作成します。 ストリーミングテーブルのデフォルトフローは 追加 フローであり、トリガーごとに新しい行を追加します。 これは、 Lakeflow 宣言型パイプラインを使用して、フローとターゲットを 1 つのステップで作成する最も一般的な方法です。 このスタイルを使用して、データの取り込みやデータの変換を行うことができます。
追加フローは、単一のターゲットを更新するために複数のストリーミング ソースからデータを読み取る必要がある処理もサポートします。たとえば、既存のストリーミング テーブルとフローがあり、この既存のストリーミング テーブルに書き込む新しいストリーミング ソースを追加する場合、追加フロー機能を使用できます。
複数のフローを使用して単一のターゲットに書き込む
前の例では、単一のステップでフローとストリーミング テーブルを作成しました。 以前に作成したテーブルに対してもフローを作成することができます。この例では、テーブルの作成とそれに関連するフローを別々のステップで確認できます。 このコードは、ストリーミング テーブルとフローに同じ名前を使用することを含め、当然のフローを作成する場合と同じ結果になります。
- Python
- SQL
from pyspark import pipelines as dp
# create streaming table
dp.create_streaming_table("customers_silver")
# add a flow
@dp.append_flow(
target = "customers_silver")
def customer_silver():
return spark.readStream.table("customers_bronze")
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;
-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);
ターゲットから独立してフローを作成すると、同じターゲットにデータを追加する複数のフローを作成することもできます。
Pythonインターフェースの@dp.append_flow
デコレータまたはSQLインターフェースのCREATE FLOW...INSERT INTO
句を使用して、新しいフローを作成します。たとえば、複数のストリーミング ソースからストリーミング テーブルをターゲットにします。 次のようなタスクを処理する場合は、追加フローを使用します。
- 完全な更新を必要とせずに既存のストリーミング テーブルにデータを追加するストリーミング ソースを追加します。 たとえば、事業を展開しているすべての地域の地域データを結合したテーブルがあるとします。新しいリージョンが展開されると、完全な更新を実行せずに新しいリージョンのデータをテーブルに追加できます。既存のストリーミング テーブルにストリーミング ソースを追加する例については、 「例: 複数のKafkaトピックからストリーミング テーブルに書き込む」を参照してください。
- 欠落しているヒストリカルデータを追加 (バックフィル) して、ストリーミング テーブルを更新します。
INSERT INTO ONCE
構文を使用して、1 回実行される履歴バックフィル追加を作成できます。たとえば、 Apache Kafkaトピックによって書き込まれる既存のストリーミング テーブルがあるとします。 また、ヒストリカルデータはテーブルに保存されており、ストリーミング テーブルに一度だけ挿入する必要がありますが、データを挿入する前に複雑な集計の実行が処理に含まれるため、データをストリーミングすることはできません。 バックフィルの例については、 LakeFlow宣言型パイプラインを使用したヒストリカルデータのバックフィル」を参照してください。 - クエリで
UNION
句を使用する代わりに、複数のソースからのデータを結合し、単一のストリーミング テーブルに書き込みます。UNION
の代わりに追加フロー処理を使用すると、完全なリフレッシュ更新を実行せずにターゲット テーブルを増分的に更新できます。この方法で実行されるユニオンの例については、 「例:UNION
の代わりに追加フロー処理を使用する」を参照してください。
追加フロー処理によって出力されるレコードのターゲットは、既存のテーブルまたは新しいテーブルにすることができます。Python クエリの場合は、 create_streaming_table()関数を使用してターゲット テーブルを作成します。
次の例では、同じターゲットに 2 つのフローを追加し、2 つのソース テーブルの結合を作成します。
- Python
- SQL
from pyspark import pipelines as dp
# create a streaming table
dp.create_streaming_table("customers_us")
# add the first append flow
@dp.append_flow(target = "customers_us")
def append1():
return spark.readStream.table("customers_us_west")
# add the second append flow
@dp.append_flow(target = "customers_us")
def append2():
return spark.readStream.table("customers_us_east")
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;
-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);
-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);
- エクスペクテーションを含むデータ品質制約を定義する必要がある場合は、
create_streaming_table()
関数の一部としてターゲットテーブルまたは既存のテーブル定義にエクスペクテーションを定義してください。@append_flow
の定義で期待を定義することはできません。 - フローは フロー名 で識別され、この名前はストリーミングチェックポイントを識別するために使用されます。チェックポイントを識別するためにフロー名を使用するということは、以下のことを意味します。
- パイプライン内の既存のフローの名前が変更された場合、チェックポイントは引き継がれず、名前が変更されたフローは事実上まったく新しいフローになります。
- 既存のチェックポイントが新しいフロー定義と一致しないため、パイプラインでフロー名を再利用することはできません。
フローの種類
ストリーミングテーブルとマテリアライズドビューの当然のフローは追加フローです。 チェンジデータキャプチャデータ ソースから読み取るフローを作成することもできます。 次の表では、さまざまな種類のフローについて説明します。
フロータイプ | 説明 |
---|---|
追加 | 追加 フローは最も一般的なタイプのフローで、更新ごとにソース内の新しいレコードがターゲットに書き込まれます。これらは、構造化ストリーミングの追加モードに対応します。 もちろんフロー (ターゲット ストリーミングテーブルまたはマテリアライズドビューで作成されたもの) はターゲットと同じ名前になります。 その他のターゲットにはデフォルトのフローはありません。 |
自動 CDC (以前の 変更を適用 ) | Auto CDC フローは、チェンジデータキャプチャ ( CDC ) データを含むクエリを取り込みます。 自動CDCフローはストリーミング テーブルのみをターゲットにすることができ、ソースはストリーミング ソースである必要があります ( CDCデータの詳細については、「The AUTO CDC APIs: Simplify チェンジデータキャプチャ with Lakeflow 宣言型パイプライン」を参照してください。 |
追加情報
フローとその使用法の詳細については、次のトピックを参照してください。