DLT フローによるデータの増分読み込みと処理
この記事では、フローとは何か、および DLT パイプラインのフローを使用して、ソース ストリーミングテーブルからターゲット ストリーミングテーブルにデータを増分処理する方法について説明します。 DLT では、フローは 2 つの方法で定義されます。
- フローは、ストリーミングテーブルを更新するクエリを作成すると自動的に定義されます。
- DLT には、複数のストリーミング ソースからストリーミングテーブルに追加するなど、より複雑な処理のフローを明示的に定義する機能も用意されています。
この記事では、ストリーミングテーブルを更新するクエリを定義するときに作成される暗黙的なフローについて説明し、より複雑なフローを定義するための構文の詳細を示します。
フローとは?
DLT では、 フロー は、ソース データを増分的に処理してターゲット ストリーミングテーブルを更新するストリーミング クエリです。 パイプラインで作成するほとんどの DLT データセットは、フローをクエリの一部として定義し、フローを明示的に定義する必要はありません。たとえば、DLT でストリーミングテーブルを 1 つの DDL コマンドで作成すると、別々のテーブルステートメントとフローステートメントを使用してストリーミングテーブルを作成する代わりになります。
この CREATE FLOW
例は、説明のみを目的として提供されており、有効な DLT 構文ではないキーワードが含まれています。
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
クエリによって定義されるデフォルトのフローに加えて、DLT Python および SQL インターフェースは 追加フロー 機能を提供します。追加フローは、1 つのストリーミングテーブルを更新するために複数のストリーミング ソースからデータを読み取る必要がある処理をサポートします。 たとえば、既存のストリーミングテーブルとフローがあり、この既存のストリーミングテーブルに書き込む新しいストリーミングソースを追加する場合は、フローの追加機能を使用できます。
追加フローを使用して、複数のソース ストリームからストリーミングテーブルに書き込む
Pythonインターフェースの @append_flow
デコレータ、または SQLインターフェースの CREATE FLOW
句を使用して、複数のストリーミングソースからストリーミングテーブルに書き込みます。次のような処理には、フローの追加を使用します。
- 既存のストリーミングテーブルにデータを追加するストリーミング ソース (完全な更新を必要とせずに) を追加します。 たとえば、事業を展開しているすべての地域の地域データを組み合わせたテーブルがあるとします。 新しいリージョンがロールアウトされると、完全な更新を実行せずに新しいリージョン データをテーブルに追加できます。 例: 複数の Kafka トピックからストリーミングテーブルへの書き込みを参照してください。
- 不足しているヒストリカルデータ (バックフィル) を追加して、ストリーミングテーブルを更新します。 たとえば、Apache Kafka トピックによって書き込まれる既存のストリーミングテーブルがあるとします。 また、ストリームテーブルに一度だけ挿入する必要があるテーブルにヒストリカルデータが格納されており、データを挿入する前に複雑な集計を実行する処理が含まれるため、データをストリームすることはできません。 例: 1 回限りのデータバックフィルの実行を参照してください。
- 複数のソースからのデータを結合し、クエリで
UNION
句を使用する代わりに 1 つのストリーミングテーブルに書き込みます。UNION
の代わりに追加フロー処理を使用すると、フル・リフレッシュ更新を実行せずにターゲット・テーブルを増分的に更新できます。例:UNION
の代わりに追加フロー処理を使用するを参照してください。
追加フロー処理によって出力されるレコードのターゲットは、既存のテーブルまたは新しいテーブルです。 Python クエリの場合は、 create_streaming_table() 関数を使用してターゲット テーブルを作成します。
- エクスペクテーションを使用してデータ品質制約を定義する必要がある場合は、
create_streaming_table()
関数の一部としてターゲット表または既存の表定義でエクスペクテーションを定義します。@append_flow
定義でエクスペクテーションを定義することはできません。 - フローは フロー名 で識別され、この名前はストリーミングチェックポイントを識別するために使用されます。チェックポイントを識別するためにフロー名を使用するということは、以下のことを意味します。
- パイプライン内の既存のフローの名前が変更された場合、チェックポイントは引き継がれず、名前が変更されたフローは事実上まったく新しいフローになります。
- 既存のチェックポイントが新しいフロー定義と一致しないため、パイプラインでフロー名を再利用することはできません。
@append_flow
の構文は次のとおりです。
- Python
- SQL
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
append_target BY NAME
SELECT * FROM
source;
例: 複数の Kafka トピックからストリーミングテーブルへの書き込み
次の例では、kafka_target
というストリーミングテーブルを作成し、2つのKafkaトピックからそのストリーミングテーブルに書き込みます。
- Python
- SQL
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
SQL クエリで使用される read_kafka()
テーブル値関数の詳細については、SQL 言語リファレンスの read_kafka を参照してください。
Python では、1 つのテーブルを対象とする複数のフローをプログラムで作成できます。 次の例は、Kafka トピックのリストに対するこのパターンを示しています。
このパターンには、 for
ループを使用してテーブルを作成するのと同じ要件があります。 Python の値をフローを定義する関数に明示的に渡す必要があります。 for
ループでのテーブルの作成を参照してください。
import dlt
dlt.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
例: 1 回限りのデータバックフィルの実行
次の例では、ストリーミングテーブルに履歴データを追加するクエリを実行しています。
バックフィルクエリが定期的に、または継続的に実行されるパイプラインの一部である場合に、1回限りのバックフィルを確実に実行するには、パイプラインを1回実行した後にクエリを削除してください。バックフィルディレクトリに新しいデータを追加するには、クエリはそのままにしておきます。
- Python
- SQL
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
STREAM read_files(
"path/to/sourceDir",
format => "csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
STREAM read_files(
"path/to/backfill/data/dir",
format => "csv"
);
例: 代わりに追加フロー処理を使用する UNION
UNION
句を含むクエリを使用する代わりに、追加フロークエリを使用して複数のソースを結合し、1 つのストリーミングテーブルに書き込むことができます。UNION
の代わりに追加フロークエリを使用すると、完全な更新を実行せずに、複数のソースからストリーミングテーブルに追加できます。
次のPythonの例には、UNION
句で複数のデータソースを組み合わせたクエリが含まれています。
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
以下の例では、UNION
クエリを フローの追加クエリに置き換えています:
- Python
- SQL
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
CREATE OR REFRESH STREAMING TABLE STREAM(raw_orders);
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);