Lakeflow 宣言型パイプラインのフローの例
例: 複数の Kafka トピックからストリーミングテーブルへの書き込み
次の例では、kafka_target
というストリーミングテーブルを作成し、2つのKafkaトピックからそのストリーミングテーブルに書き込みます。
- Python
- SQL
Python
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
SQL クエリで使用される read_kafka()
テーブル値関数の詳細については、SQL 言語リファレンスの read_kafka を参照してください。
Python では、1 つのテーブルを対象とする複数のフローをプログラムで作成できます。次の例は、Kafka トピックのリストに対するこのパターンを示しています。
注記
このパターンには、 for
ループを使用してテーブルを作成するのと同じ要件があります。Python の値をフローを定義する関数に明示的に渡す必要があります。for
ループでのテーブルの作成を参照してください。
Python
import dlt
dlt.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
例: 1 回限りのデータバックフィルの実行
クエリを実行して既存のストリーミングテーブルにデータを追加する場合は、 append_flow
.
既存のデータのセットを追加した後、複数のオプションがあります。
- 新しいデータがバックフィル ディレクトリに到着した場合にクエリで新しいデータを追加する場合は、クエリをそのままにしておきます。
- これを 1 回限りのバックフィルにし、二度と実行しない場合は、パイプラインを 1 回実行した後でクエリを削除します。
- クエリを 1 回実行し、データが完全に更新される場合にのみ再度実行する場合は、追加フローで
once
パラメーターをTrue
に設定します。 SQL では、INSERT INTO ONCE
.
次の例では、ストリーミングテーブルに履歴データを追加するクエリを実行しています。
- Python
- SQL
Python
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(
target = "csv_target",
once = True)
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO ONCE
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
詳細な例については、「LakeFlow Declarative パイプラインを使用したヒストリカルデータのバックフィル」を参照してください。
例: 代わりに追加フロー処理を使用する UNION
UNION
句を含むクエリを使用する代わりに、追加フロークエリを使用して複数のソースを結合し、1 つのストリーミングテーブルに書き込むことができます。UNION
の代わりに追加フロークエリを使用すると、完全な更新を実行せずに、複数のソースからストリーミングテーブルに追加できます。
次のPythonの例には、UNION
句で複数のデータソースを組み合わせたクエリが含まれています。
Python
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
)
raw_orders_eu = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
)
return raw_orders_us.union(raw_orders_eu)
以下の例では、UNION
クエリを フローの追加クエリに置き換えています:
- Python
- SQL
Python
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_orders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);