メインコンテンツまでスキップ

Lakeflow Spark宣言型パイプラインのフローの例

例: 複数のKafkaトピックからストリーミング テーブルに書き込む

次の例では、kafka_targetというストリーミングテーブルを作成し、2つのKafkaトピックからそのストリーミングテーブルに書き込みます。

Python
from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)

@dp.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)

SQL クエリで使用されるread_kafka()テーブル値関数の詳細については、SQL 言語リファレンスのread_kafka を参照してください。

Python では、単一のテーブルを対象とする複数のフローをプログラムで作成できます。次の例は、Kafka トピックのリストに対するこのパターンを示しています。

注記

このパターンには、 forループを使用してテーブルを作成する場合と同じ要件があります。フローを定義する関数に Python 値を明示的に渡す必要があります。forループでテーブルを作成する」を参照してください。

Python
from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

@dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)

例: 1回限りのデータバックフィルを実行する

クエリを実行して既存のストリーミング テーブルにデータを追加する場合は、 append_flow使用します。

既存のデータセットを追加した後は、複数のオプションがあります。

  • バックフィル ディレクトリに新しいデータが到着したときにクエリで新しいデータを追加する場合は、クエリをそのまま残します。
  • これを 1 回限りのバックフィルにして、再度実行しないようにする場合は、パイプラインを 1 回実行した後にクエリを削除します。
  • クエリを 1 回実行し、データが完全に更新されている場合にのみ再度実行するようにするには、追加フローでonce不安をTrueに設定します。 SQLでは、 INSERT INTO ONCEを使用します。

次の例では、ストリーミングテーブルに履歴データを追加するクエリを実行しています。

Python
from pyspark import pipelines as dp

@dp.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")

@dp.append_flow(
target = "csv_target",
once = True)
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")

より詳細な例については、 「パイプラインを使用したヒストリカルデータのバックフィル」を参照してください。

例: フロー処理の追加の代わりに UNION

UNION句を含むクエリを使用する代わりに、追加フロー クエリを使用して複数のソースを結合し、単一のストリーミング テーブルに書き込むことができます。 UNIONの代わりに追加フロー クエリを使用すると、完全な更新を実行せずに、複数のソースからストリーミング テーブルに追加できます。

次のPythonの例には、UNION句で複数のデータソースを組み合わせたクエリが含まれています。

Python
@dp.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
)

raw_orders_eu = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
)

return raw_orders_us.union(raw_orders_eu)

以下の例では、UNIONクエリを フローの追加クエリに置き換えています:

Python
dp.create_streaming_table("raw_orders")

@dp.append_flow(target="raw_orders")
def raw_orders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")

@dp.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")

例: transformWithStateを使用してセンサーのハートビートを監視する

次の例は、Kafka から読み取り、センサーが定期的にハートビートを送信していることを確認するステートフル プロセッサを示しています。5 分以内にハートビートを受信しない場合、プロセッサは分析のためにターゲット Delta テーブルにエントリを送信します。

カスタム ステートフル アプリケーションの構築の詳細については、 「カスタム ステートフル アプリケーションの構築」を参照してください。

注記

RocksDB は、Databricks Runtime 17.2 以降のデフォルトの状態プロバイダーです。サポートされていないプロバイダー例外が原因でクエリが失敗した場合は、次のパイプライン構成を追加し、完全更新またはチェックポイント リセットを実行してから、パイプラインを再実行します。

JSON
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled": "true"
}
Python
from typing import Iterator

import pandas as pd

from pyspark import pipelines as dp
from pyspark.sql.functions import col, from_json
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType

KAFKA_TOPIC = "<your-kafka-topic>"

output_schema = StructType([
StructField("sensor_id", LongType(), False),
StructField("sensor_type", StringType(), False),
StructField("last_heartbeat_time", TimestampType(), False)])

class SensorHeartbeatProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
# Define state schema to store sensor information (sensor_id is the grouping key)
state_schema = StructType([
StructField("sensor_type", StringType(), False),
StructField("last_heartbeat_time", TimestampType(), False)])
self.sensor_state = handle.getValueState("sensorState", state_schema)
# State variable to track the previously registered timer
timer_schema = StructType([StructField("timer_ts", LongType(), False)])
self.timer_state = handle.getValueState("timerState", timer_schema)
self.handle = handle

def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
# Process one row from input and update state
pdf = next(rows)
row = pdf.iloc[0]
# Store or update the sensor information in state using current timestamp
current_time = pd.Timestamp(timerValues.getCurrentProcessingTimeInMs(), unit='ms')
self.sensor_state.update((
row["sensor_type"],
current_time
))

# Delete old timer if already registered
if self.timer_state.exists():
old_timer = self.timer_state.get()[0]
self.handle.deleteTimer(old_timer)

# Register a timer for 5 minutes from current processing time
expiry_time = timerValues.getCurrentProcessingTimeInMs() + (5 * 60 * 1000)
self.handle.registerTimer(expiry_time)
# Store the new timer timestamp in state
self.timer_state.update((expiry_time,))

# No output on input processing, output only on timer expiry
return iter([])

def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
# Emit output row based on state store
if self.sensor_state.exists():
state = self.sensor_state.get()
output = pd.DataFrame({
"sensor_id": [key[0]], # Use grouping key as sensor_id
"sensor_type": [state[0]],
"last_heartbeat_time": [state[1]]
})
# Remove the entry for the sensor from the state store
self.sensor_state.clear()
# Remove the timer state entry
self.timer_state.clear()
yield output

def close(self) -> None:
pass

dp.create_streaming_table("sensorAlerts")

# Define the schema for the Kafka message value
sensor_schema = StructType([
StructField("sensor_id", LongType(), False),
StructField("sensor_type", StringType(), False),
StructField("sensor_value", LongType(), False)])

@dp.append_flow(target = "sensorAlerts")
def kafka_delta_flow():
return (
spark.readStream
.format("kafka")
.option("subscribe", KAFKA_TOPIC)
.option("startingOffsets", "earliest")
.load()
.select(from_json(col("value").cast("string"), sensor_schema).alias("data"), col("timestamp"))
.select("data.*", "timestamp")
.withWatermark('timestamp', '1 hour')
.groupBy(col("sensor_id"))
.transformWithStateInPandas(
statefulProcessor = SensorHeartbeatProcessor(),
outputStructType = output_schema,
outputMode = 'update',
timeMode = 'ProcessingTime'))