メインコンテンツまでスキップ

append_flow

@dp.append_flow デコレーターは、Lakeflow 宣言型パイプライン テーブルの追加フローまたはバックフィルを作成します。この関数は、 Apache Sparkストリーミング DataFrameを返す必要があります。 「宣言型パイプライン フローを使用したデータの増分読み込みと処理 」を参照してくださいLakeflow

追加フローはストリーミング テーブルまたはシンクをターゲットにすることができます。

構文

Python
from pyspark import pipelines as dp

dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dp.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
once = <boolean>, # optional, defaults to false
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming-query>) #

問題

パラメーター

Type

説明

function

function

必須。ユーザー定義のクエリから Apache Sparkストリーミング データフレーム を返す関数。

target

str

必須。追加フローのターゲットとなるテーブルまたはシンクの名前。

name

str

フロー名。指定されていない場合は、デフォルトで関数名になります。

once

bool

必要に応じて、フローをバックフィルなどの 1 回限りのフローとして定義します。once=Trueを使用すると、フローは次の 2 つの方法で変化します。

  • 戻り値。streaming-query 。この場合、ストリーミング DataFrame ではなく、バッチ DataFrame である必要があります。
  • デフォルトでは、フローは 1 回実行されます。パイプラインが完全リフレッシュで更新されると、 ONCEフローが再度実行され、データが再作成されます。

comment

str

フローの説明。

spark_conf

dict

このクエリを実行するためのSpark構成のリスト

Python
from pyspark import pipelines as dp

# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>

# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
return (
spark.read
.format("json")
.load("/path/to/backfill/")
)

# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)

# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))