Pular para o conteúdo principal

fluxo_de_acréscimo

O decorador @dp.append_flow cria fluxos de acréscimo ou preenchimentos para suas tabelas de pipeline declarativas LakeFlow . A função deve retornar um DataFrame de streaming Apache Spark streaming . Consulte Carregar e processar dados incrementalmente com fluxos de pipeline declarativos LakeFlow.

Os fluxos de acréscimo podem ter como alvo tabelas ou coletores de transmissão.

Sintaxe

Python
from pyspark import pipelines as dp

dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dp.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
once = <boolean>, # optional, defaults to false
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming-query>) #

Parâmetros

Parâmetro

Tipo

Descrição

função

function

Obrigatório. Uma função que retorna um DataFrame de streaming Apache Spark streaming de uma consulta definida pelo usuário.

target

str

Obrigatório. O nome da tabela ou do coletor que é o destino do fluxo de acréscimo.

name

str

O nome do fluxo. Se não for fornecido, o padrão será o nome da função.

once

bool

Opcionalmente, defina o fluxo como um fluxo único, como um aterro. Usar once=True altera o fluxo de duas maneiras:

  • O valor de retorno. streaming-query. deve ser um lotes DataFrame neste caso, não um transmissão DataFrame.
  • O fluxo é executado uma vez por default. Se o pipeline for atualizado com uma refresh completa, o fluxo ONCE será executado novamente para recriar os dados.

comment

str

Uma descrição para o fluxo.

spark_conf

dict

Uma lista de configurações do Spark para a execução desta consulta

Exemplos

Python
from pyspark import pipelines as dp

# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>

# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
return (
spark.read
.format("json")
.load("/path/to/backfill/")
)

# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)

# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))