メインコンテンツまでスキップ

DLT のフローの例

例: 複数の Kafka トピックからストリーミングテーブルへの書き込み

次の例では、kafka_targetというストリーミングテーブルを作成し、2つのKafkaトピックからそのストリーミングテーブルに書き込みます。

Python
import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)

@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)

SQL クエリで使用される read_kafka() テーブル値関数の詳細については、SQL 言語リファレンスの read_kafka を参照してください。

Python では、1 つのテーブルを対象とする複数のフローをプログラムで作成できます。次の例は、Kafka トピックのリストに対するこのパターンを示しています。

注記

このパターンには、 for ループを使用してテーブルを作成するのと同じ要件があります。Python の値をフローを定義する関数に明示的に渡す必要があります。forループでのテーブルの作成を参照してください。

Python
import dlt

dlt.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

@dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)

例: 1 回限りのデータバックフィルの実行

次の例では、ストリーミングテーブルに履歴データを追加するクエリを実行しています。

注記

バックフィルクエリが定期的に、または継続的に実行されるパイプラインの一部である場合に、1回限りのバックフィルを確実に実行するには、パイプラインを1回実行した後にクエリを削除してください。バックフィルディレクトリに新しいデータを追加するには、クエリはそのままにしておきます。

Python
import dlt

@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")

例: 代わりに追加フロー処理を使用する UNION

UNION句を含むクエリを使用する代わりに、追加フロークエリを使用して複数のソースを結合し、1 つのストリーミングテーブルに書き込むことができます。UNIONの代わりに追加フロークエリを使用すると、完全な更新を実行せずに、複数のソースからストリーミングテーブルに追加できます。

次のPythonの例には、UNION句で複数のデータソースを組み合わせたクエリが含まれています。

Python
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")

raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")

return raw_orders_us.union(raw_orders_eu)

以下の例では、UNIONクエリを フローの追加クエリに置き換えています:

Python
dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")