Pular para o conteúdo principal

foreach_batch_sink

O decorador @dp.foreach_batch_sink() define um coletor ForEachBatch, que processa uma transmissão como uma série de microlotes que são manipulados em Python com lógica personalizada. Faz-se referência ao coletor como um target em um fluxo de acréscimo para gravar os dados transformados. Para orientação conceitual, considerações e exemplos, consulte Usar ForEachBatch para gravar em coletores de dados arbitrários em pipelines.

Sintaxe

Python
from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
"""
Required:
- `df`: a Spark DataFrame representing the rows of this micro-batch.
- `batch_id`: unique integer ID for each micro-batch in the query.
"""
# Your custom write or transformation logic here
# Example:
# df.write.format("some-target-system").save("...")
#
# To access the sparkSession inside the batch handler, use df.sparkSession.

Parâmetros

Parâmetro

Descrição

name

Opcional. Um nome único para identificar o sink dentro do pipeline. O padrão é o nome da UDF, quando não incluído.

manipulador de lote

Esta é a função definida pelo usuário (UDF) que será chamada para cada micro-lote.

df

Spark DataFrame contendo dados para o micro-lote atual.

ID do lote

O ID inteiro do microlote. Spark incrementa este ID para cada intervalo de trigger.

A batch_id de 0 representa o início de uma transmissão ou o início de uma refresh completa. O código foreach_batch_sink deve lidar adequadamente com um refresh completo para fontes de dados downstream. Para obter mais informações, consulte refresh completo.

Nesta página