fluxo_de_atualização
Pré-lançamento público
A API update_flow está em prévia pública.
Utilize o decorador @dp.update_flow para criar um fluxo de atualização. Fluxos de atualização escrevem em sumidouros usando o modo de saída de atualização, emitindo apenas as linhas que foram alteradas em cada lote. Ao contrário dos fluxos de acréscimo, eles suportam agregações com estado sem exigir uma marca d'água.
Fluxos de atualização podem ter como alvo apenas destinos. Tabelas Delta não são suportadas.
Sintaxe
from pyspark import pipelines as dp
dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})
@dp.update_flow(
target = "<sink-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
comment = "<comment>", # optional
import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
return (<streaming-query>)
Parâmetros
Parâmetro | Tipo | Descrição |
|---|---|---|
função |
| Obrigatório. Uma função que retorna um Apache Spark streaming DataFrame a partir de uma consulta definida pelo usuário. |
|
| Obrigatório. O nome do destino no qual este fluxo grava. |
|
| Nome do fluxo. Se não for informado, o default será o nome da função. |
|
| Uma descrição para o fluxo. |
|
| Um dicionário de configurações do Spark para a execução desta consulta. Essas configurações substituem as definidas para o destino, pipeline ou cluster. |
|
| Um caminho de ponto de verificação externo para importar antes de iniciar o fluxo. Importado apenas uma vez, quando o diretório de ponto de verificação do fluxo ainda não existe. |
Exemplos
Agregação para um sink do Kafka
Gravar resultados de agregação com estado em um sink do Kafka:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type")
.groupBy(col("event_type"))
.count()
)
Modo em tempo real
Use spark_conf para configurar um fluxo de atualização para modo em tempo real:
from pyspark import pipelines as dp
dp.create_sink("my_kafka_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
)
Limitações
- Destinos de tabelas Delta não são suportados como alvos para fluxos de atualização.