Pular para o conteúdo principal

anexar fluxo

O decorador @dlt.append_flow cria fluxos de acréscimo ou backfills para suas tabelas de pipeline declarativo LakeFlow. A função deve retornar um Apache Spark streaming DataFrame. Consulte Carregar e processar dados de forma incremental com LakeFlow Declarative pipeline flows.

Os fluxos de acréscimo podem ter como alvo tabelas de transmissão ou sinks.

Sintaxe

Python
import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
once = <boolean>, # optional, defaults to false
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming-query>) #

Parâmetros

Parâmetro

Tipo

Descrição

função

function

Obrigatório. Uma função que retorna um Apache Spark streaming DataFrame de uma consulta definida pelo usuário.

target

str

Obrigatório. O nome da tabela ou coletor que é o destino do fluxo de acréscimo.

name

str

O nome do fluxo. Se não for fornecido, o padrão será o nome da função.

once

bool

Opcionalmente, defina o fluxo como um fluxo único, como um preenchimento. Usar once=True altera o fluxo de duas maneiras:

  • O valor de retorno. streaming-query. Nesse caso, deve ser um lote DataFrame e não uma transmissão DataFrame.
  • O fluxo é executado uma vez pelo site default. Se o pipeline for atualizado com um refresh completo, o fluxo ONCE será executado novamente para recriar os dados.

comment

str

Uma descrição do fluxo.

spark_conf

dict

Uma lista de configurações do Spark para a execução dessa consulta

Exemplos

Python
import dlt

# Create a sink for an external Delta table
dlt.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dlt.append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>

# Add a backfill
@dlt.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
return (
spark.read
.format("json")
.load("/path/to/backfill/")
)

# Create a Kafka sink
dlt.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)

# Add an append flow to a Kafka sink
@dlt.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))