fluxo_de_acréscimo
O decorador @dp.append_flow cria fluxos de acréscimo ou preenchimentos retroativos para suas tabelas de pipeline. A função deve retornar um DataFrame de streaming Apache Spark streaming . Consulte Carregar e processar dados incrementalmente com fluxos de pipeline declarativos LakeFlow Spark.
Os fluxos de acréscimo podem ter como alvo tabelas ou coletores de transmissão.
Sintaxe
from pyspark import pipelines as dp
dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dp.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
once = <bool>, # optional, defaults to false
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming-query>) #
Parâmetros
Parâmetro | Tipo | Descrição |
|---|---|---|
função |
| Obrigatório. Uma função que retorna um DataFrame de streaming Apache Spark streaming de uma consulta definida pelo usuário. |
|
| Obrigatório. O nome da tabela ou do coletor que é o destino do fluxo de acréscimo. |
|
| O nome do fluxo. Se não for fornecido, o padrão será o nome da função. |
|
| Opcionalmente, defina o fluxo como um fluxo único, como um aterro. Usar
|
|
| Uma descrição para o fluxo. |
|
| Uma lista de configurações do Spark para a execução desta consulta |
Exemplos
from pyspark import pipelines as dp
# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
return (
spark.read
.format("json")
.load("/path/to/backfill/")
)
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))