Pular para o conteúdo principal

fluxo_de_atualização

info

Pré-lançamento público

A API update_flow está em prévia pública.

Utilize o decorador @dp.update_flow para criar um fluxo de atualização. Fluxos de atualização escrevem em sumidouros usando o modo de saída de atualização, emitindo apenas as linhas que foram alteradas em cada lote. Ao contrário dos fluxos de acréscimo, eles suportam agregações com estado sem exigir uma marca d'água.

Fluxos de atualização podem ter como alvo apenas destinos. Tabelas Delta não são suportadas.

Sintaxe

Python
from pyspark import pipelines as dp

dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})

@dp.update_flow(
target = "<sink-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
comment = "<comment>", # optional
import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
return (<streaming-query>)

Parâmetros

Parâmetro

Tipo

Descrição

função

function

Obrigatório. Uma função que retorna um Apache Spark streaming DataFrame a partir de uma consulta definida pelo usuário.

target

str

Obrigatório. O nome do destino no qual este fluxo grava.

name

str

Nome do fluxo. Se não for informado, o default será o nome da função.

comment

str

Uma descrição para o fluxo.

spark_conf

dict

Um dicionário de configurações do Spark para a execução desta consulta. Essas configurações substituem as definidas para o destino, pipeline ou cluster.

import_checkpoint

str

Um caminho de ponto de verificação externo para importar antes de iniciar o fluxo. Importado apenas uma vez, quando o diretório de ponto de verificação do fluxo ainda não existe.

Exemplos

Agregação para um sink do Kafka

Gravar resultados de agregação com estado em um sink do Kafka:

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})

@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type")
.groupBy(col("event_type"))
.count()
)

Modo em tempo real

Use spark_conf para configurar um fluxo de atualização para modo em tempo real:

Python
from pyspark import pipelines as dp

dp.create_sink("my_kafka_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})

@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
&quot;pipelines.trigger&quot;: &quot;RealTime&quot;,
&quot;pipelines.trigger.interval&quot;: &quot;5 minutes&quot;,
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
)

Limitações

  • Destinos de tabelas Delta não são suportados como alvos para fluxos de atualização.