Otimizar o processamento com estado no pipeline declarativo LakeFlow com marcas d'água
Para gerenciar efetivamente os dados mantidos no estado, use marcas d'água ao executar o processamento de transmissão com estado no pipeline declarativo LakeFlow , incluindo agregações, junções e desduplicações. Este artigo descreve como usar marcas d'água em suas consultas de pipeline declarativas LakeFlow e inclui exemplos de operações recomendadas.
Para garantir que as consultas que realizam agregações sejam processadas incrementalmente e não totalmente recalculadas a cada atualização, você deve usar marcas d'água.
O que é uma marca d'água?
No processamento de transmissão, uma marca d'água é um recurso Apache Spark que pode definir um limite baseado em tempo para processamento de dados ao executar operações com estado, como agregações. Os dados que chegam são processados até que o limite seja atingido, momento em que a janela de tempo definida pelo limite é fechada. Marcas d'água podem ser usadas para evitar problemas durante o processamento de consultas, principalmente ao processar conjuntos de dados maiores ou processamentos de longa duração. Esses problemas podem incluir alta latência na produção de resultados e até mesmo erros de falta de memória (OOM) devido à quantidade de dados mantidos no estado durante o processamento. Como os dados de transmissão são inerentemente desordenados, as marcas d'água também permitem o cálculo correto de operações como agregações de janelas de tempo.
Para saber mais sobre o uso de marcas d'água no processamento de transmissão, consulte Marca d'água no Apache Spark transmissão estruturada e Aplicar marcas d'água para controlar o processamento de dados.
Como você define uma marca d'água?
Você define uma marca d'água especificando um campo de registro de data e hora e um valor que representa o limite de tempo para chegada de dados atrasados . Os dados são considerados atrasados se chegarem após o limite de tempo definido. Por exemplo, se o limite for definido como 10 minutos, os registros que chegarem após o limite de 10 minutos poderão ser descartados.
Como os registros que chegam após o limite definido podem ser descartados, é importante selecionar um limite que atenda aos seus requisitos de latência versus correção. Escolher um limite menor resulta na emissão mais rápida de registros, mas também significa que registros atrasados têm mais probabilidade de serem descartados. Um limite maior significa uma espera maior, mas possivelmente uma maior integridade dos dados. Devido ao tamanho maior do estado, um limite maior também pode exigir recursos computacionais adicionais. Como o valor limite depende dos seus dados e requisitos de processamento, testar e monitorar seu processamento é importante para determinar um limite ideal.
Use a função withWatermark()
em Python para definir uma marca d'água. Em SQL, use a cláusula WATERMARK
para definir uma marca d'água:
- Python
- SQL
withWatermark("timestamp", "3 minutes")
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
Use marcas d'água com junção transmissão-transmissão
Para a junção transmissão-transmissão, você deve definir uma marca d'água em ambos os lados da join e uma cláusula de intervalo de tempo. Como cada fonte join tem uma view incompleta dos dados, a cláusula de intervalo de tempo é necessária para informar ao mecanismo de transmissão quando nenhuma outra correspondência pode ser feita. A cláusula de intervalo de tempo deve usar os mesmos campos usados para definir as marcas d'água.
Como pode haver momentos em que cada transmissão requer um limite diferente para marcas d'água, a transmissão não precisa ter o mesmo limite. Para evitar perda de dados, o mecanismo de transmissão mantém uma marca d'água global com base na transmissão mais lenta.
O exemplo a seguir une uma transmissão de impressões de anúncios e uma transmissão de cliques de usuários em anúncios. Neste exemplo, um clique deve ocorrer dentro de 3 minutos da impressão. Após o intervalo de 3 minutos, as linhas do estado que não podem mais ser correspondidas são descartadas.
- Python
- SQL
from pyspark import pipelines as dp
dp.create_streaming_table("adImpressionClicks")
@dp.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
Executar agregações em janela com marcas d'água
Uma operação stateful comum em dados de transmissão é uma agregação em janela. Agregações em janela são semelhantes às agregações agrupadas, exceto que os valores agregados são retornados para o conjunto de linhas que fazem parte da janela definida.
Uma janela pode ser definida como um determinado comprimento, e operações de agregação podem ser executadas em todas as linhas que fazem parte dessa janela. Spark Streaming suporta três tipos de janelas:
- Janelas em cascata (fixas) : Uma série de intervalos de tempo de tamanho fixo, não sobrepostos e contíguos. Um registro de entrada pertence a apenas uma janela.
- Janelas deslizantes : semelhantes às janelas giratórias, as janelas deslizantes têm tamanho fixo, mas podem se sobrepor e um registro pode se dividir em várias janelas.
Quando os dados chegam além do fim da janela mais o comprimento da marca d'água, nenhum dado novo é aceito para a janela, o resultado da agregação é emitido e o estado da janela é descartado.
O exemplo a seguir calcula uma soma de impressões a cada 5 minutos usando uma janela fixa. Neste exemplo, a cláusula select usa o alias impressions_window
e, em seguida, a própria janela é definida como parte da cláusula GROUP BY
. A janela deve ser baseada na mesma coluna de registro de data e hora da marca d'água, a coluna clickTimestamp
neste exemplo.
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, window(clickTimestamp, "5 minutes") as impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Um exemplo semelhante em Python para calcular o lucro em janelas fixas de horas:
from pyspark import pipelines as dp
@dp.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
Desduplicar registros de transmissão
A transmissão estruturada tem garantias de processamento exatamente uma vez, mas não desduplica automaticamente os registros da fonte de dados. Por exemplo, como muitas filas de mensagens têm garantias de pelo menos uma vez, registros duplicados devem ser esperados ao ler de uma dessas filas de mensagens. Você pode usar a função dropDuplicatesWithinWatermark()
para desduplicar registros em qualquer campo especificado, removendo duplicatas de uma transmissão mesmo que alguns campos sejam diferentes (como hora do evento ou hora de chegada). Você deve especificar uma marca d'água para usar a função dropDuplicatesWithinWatermark()
. Todos os dados duplicados que chegam dentro do intervalo de tempo especificado pela marca d'água são descartados.
Dados ordenados são importantes porque dados fora de ordem fazem com que o valor da marca d'água avance incorretamente. Então, quando dados mais antigos chegam, eles são considerados atrasados e descartados. Use a opção withEventTimeOrder
para processar o Snapshot inicial em ordem com base no registro de data e hora especificado na marca d'água. A opção withEventTimeOrder
pode ser declarada no código que define o dataset ou nas configuraçõespipeline usando spark.databricks.delta.withEventTimeOrder.enabled
. Por exemplo:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
A opção withEventTimeOrder
é suportada somente com Python.
No exemplo a seguir, os dados são processados ordenados por clickTimestamp
e os registros que chegam com 5 segundos de diferença e que contêm colunas userId
e clickAdId
duplicadas são descartados.
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
Otimizar a configuração do pipeline para processamento com estado
Para ajudar a evitar problemas de produção e latência excessiva, Databricks recomenda habilitar o gerenciamento de estado baseado no RocksDBpara seu processamento de transmissão com estado, principalmente se seu processamento exigir o salvamento de uma grande quantidade de estado intermediário.
Pipeline Severless gerencia automaticamente o armazenamento das configurações de estado.
Você pode habilitar o gerenciamento de estado baseado no RocksDBdefinindo a seguinte configuração antes de implantar um pipeline:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
Para saber mais sobre o armazenamento do estado RocksDB , incluindo recomendações de configuração para RocksDB, consulte Configurar o armazenamento do estado RocksDB no Databricks.