メインコンテンツまでスキップ

更新フロー

備考

パブリックプレビュー

update_flow API はパブリック プレビュー段階です。

@dp.update_flow デコレーターを使用して、更新フローを作成してください。アップデートフローは、アップデート出力モードを使用してシンクに書き込み、各バッチで変更された行のみを出力します。追加フローとは異なり、これらはウォーターマークを必要とせずにステートフル集計をサポートしています。

更新フローはシンクのみをターゲットにできます。Delta テーブルはサポートされていません。

構文

Python
from pyspark import pipelines as dp

dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})

@dp.update_flow(
target = "<sink-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
comment = "<comment>", # optional
import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
return (<streaming-query>)

パラメーター

パラメーター

Type

説明

function

function

必須。ユーザー定義のクエリから Apache Sparkストリーミング データフレーム を返す関数。

target

str

必須。このフローが書き込むシンクの名前です。

name

str

フロー名です。指定されていない場合、デフォルトで関数名が使用されます。

comment

str

フローの説明です。

spark_conf

dict

このクエリーを実行するためのSpark構成の辞書です。これらの構成は、宛先、パイプライン、またはクラスターに設定されている設定を上書きします。

import_checkpoint

str

フローを開始する前にインポートする外部チェックポイントパス。フローのチェックポイントディレクトリがまだ存在しない場合、一度だけインポートされます。

Kafka シンクへの集計

Kafka シンクにステートフル集計結果を書き込む

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})

@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type")
.groupBy(col("event_type"))
.count()
)

リアルタイムモード

spark_confを使用して、リアルタイムモードの更新フローを構成します。

Python
from pyspark import pipelines as dp

dp.create_sink("my_kafka_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})

@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
&quot;pipelines.trigger&quot;: &quot;RealTime&quot;,
&quot;pipelines.trigger.interval&quot;: &quot;5 minutes&quot;,
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
)

制限事項:

  • Delta テーブル シンクは、更新フローのターゲットとしてはサポートされていません。