fluxo_de_acréscimo
O decorador @dp.append_flow cria fluxos de acréscimo ou preenchimentos para suas tabelas de pipeline declarativas LakeFlow . A função deve retornar um DataFrame de streaming Apache Spark streaming . Consulte Carregar e processar dados incrementalmente com fluxos de pipeline declarativos LakeFlow.
Os fluxos de acréscimo podem ter como alvo tabelas ou coletores de transmissão.
Sintaxe
from pyspark import pipelines as dp
dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dp.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  once = <boolean>, # optional, defaults to false
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming-query>) #
Parâmetros
| Parâmetro | Tipo | Descrição | 
|---|---|---|
| função | 
 | Obrigatório. Uma função que retorna um DataFrame de streaming Apache Spark streaming de uma consulta definida pelo usuário. | 
| 
 | 
 | Obrigatório. O nome da tabela ou do coletor que é o destino do fluxo de acréscimo. | 
| 
 | 
 | O nome do fluxo. Se não for fornecido, o padrão será o nome da função. | 
| 
 | 
 | Opcionalmente, defina o fluxo como um fluxo único, como um aterro. Usar  
 | 
| 
 | 
 | Uma descrição para o fluxo. | 
| 
 | 
 | Uma lista de configurações do Spark para a execução desta consulta | 
Exemplos
from pyspark import pipelines as dp
# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>
# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
    return (
      spark.read
      .format("json")
      .load("/path/to/backfill/")
    )
# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)
# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))