Pular para o conteúdo principal

Aplicar marcas d'água para controlar o limite de processamento de dados

Este artigo apresenta os conceitos básicos de marca d'água e fornece recomendações para o uso de marcas d'água em operações comuns de transmissão com estado. O senhor deve aplicar marcas d'água às operações de transmissão com estado para evitar a expansão infinita da quantidade de dados mantidos no estado, o que poderia introduzir problemas de memória e aumentar as latências de processamento durante operações de transmissão de longa duração.

O que é uma marca d'água?

A transmissão estruturada usa marcas d'água para controlar o limite de tempo para continuar processando atualizações para uma determinada entidade estatal. Exemplos comuns de entidades estaduais incluem:

  • Agregações em uma janela de tempo.
  • Chave única em um join entre duas transmissões.

Ao declarar uma marca d'água, o senhor especifica um campo de registro de data e hora e um limite de marca d'água em uma transmissão DataFrame. À medida que novos dados chegam, o gerenciador de estado rastreia o carimbo de data/hora mais recente no campo especificado e processa todos os registros dentro do limite de atraso.

O exemplo a seguir aplica um limite de marca d'água de 10 minutos a uma contagem em janelas:

Python
from pyspark.sql.functions import window

(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)

Neste exemplo:

  • A coluna event_time é usada para definir uma marca d'água de 10 minutos e uma janela de queda de 5 minutos.
  • Uma contagem é coletada para cada id observado para cada janela não sobreposta de 5 minutos.
  • As informações de estado são mantidas para cada contagem até que o final da janela seja 10 minutos mais antigo do que o último observado event_time.
important

O limite da marca d'água garante que os registros que chegam dentro do limite especificado sejam processados de acordo com a semântica da consulta definida. Os registros que chegam tardiamente, fora do limite especificado, ainda podem ser processados usando métricas de consulta, mas isso não é garantido.

Como as marcas d'água afetam o tempo de processamento e a taxa de transferência?

As marcas d'água interagem com os modos de saída para controlar quando os dados são gravados no coletor. Como as marcas d'água reduzem a quantidade total de informações de estado a serem processadas, o uso eficaz de marcas d'água é essencial para a transmissão eficiente de taxas de transferência com estado.

nota

Nem todos os modos de saída são compatíveis com todas as operações com estado.

Marcas d'água e modo de saída para agregações em janelas

A tabela a seguir detalha o processamento de consultas com agregação em um carimbo de data/hora com uma marca d'água definida:

Modo de saída

Comportamento

Acrescentar

As linhas são gravadas na tabela de destino quando o limite da marca d'água é ultrapassado. Todas as gravações são atrasadas com base no limite de atraso. O estado de agregação antigo é descartado quando o limite é ultrapassado.

Atualizar

As linhas são gravadas na tabela de destino à medida que os resultados são calculados e podem ser atualizadas e substituídas à medida que novos dados chegam. O estado de agregação antigo é descartado quando o limite é ultrapassado.

Concluído

O estado de agregação não é descartado. A tabela de destino é reescrita com cada gatilho.

Marcas d'água e saída para junção de transmissão-transmissão

A união entre várias transmissões suporta apenas o modo append, e os registros correspondentes são gravados em cada lote descoberto. Para a união interna, o site Databricks recomenda definir um limite de marca d'água em cada fonte de transmissão de dados. Isso permite que as informações do estado sejam descartadas para registros antigos. Sem marcas d'água, a transmissão estruturada tenta join cada key de ambos os lados do join com cada acionador.

A transmissão estruturada tem uma semântica especial para dar suporte à união externa. A marca d'água é obrigatória para a união externa, pois indica quando um key deve ser gravado com um valor nulo depois de não ser correspondido. Observe que, embora a união externa possa ser útil para registrar os registros que nunca são correspondidos durante o processamento de dados, como a união só grava nas tabelas como operações de acréscimo, esses dados ausentes não são registrados até que o limite de atraso seja ultrapassado.

Controle do limite de dados tardios com política de várias marcas d'água na transmissão estruturada

Ao trabalhar com várias entradas de transmissão estruturada, o senhor pode definir várias marcas d'água para controlar o limite de tolerância dos dados que chegam com atraso. A configuração de marcas d'água permite que o senhor controle as informações de estado e impacta a latência.

Uma consulta de transmissão pode ter várias transmissões de entrada que são unidas ou juntadas. Cada uma das transmissões de entrada pode ter um limite diferente de dados atrasados que precisam ser tolerados para operações com estado. Especifique esses limites usando withWatermarks("eventTime", delay) em cada uma das transmissões de entrada. A seguir, um exemplo de consulta com união transmissão-transmissão.

Scala
val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)

Durante a execução da consulta, a transmissão estruturada rastreia individualmente o tempo máximo de evento observado em cada transmissão de entrada, calcula as marcas d'água com base no atraso correspondente e escolhe uma única marca d'água global com elas para ser usada em operações com estado. Em default, o mínimo é escolhido como marca d'água global porque garante que nenhum dado seja acidentalmente descartado como muito atrasado se uma das transmissões ficar atrás das outras (por exemplo, uma das transmissões deixa de receber dados devido a falhas no upstream). Em outras palavras, a marca d'água global se move com segurança no ritmo da transmissão mais lenta e a saída da consulta é atrasada de acordo.

Se quiser obter resultados mais rápidos, o senhor pode definir a política de marca d'água múltipla para escolher o valor máximo como marca d'água global, definindo a configuração SQL spark.sql.streaming.multipleWatermarkPolicy para max (default é min). Isso permite que a marca d'água global se mova no ritmo da transmissão mais rápida. No entanto, essa configuração descarta os dados da transmissão mais lenta. Por esse motivo, a Databricks recomenda que o senhor use essa configuração de forma criteriosa.

Coloque duplicatas dentro da marca d'água

Em Databricks Runtime 13.3 LTS e acima, é possível deduplicar registros dentro de um limite de marca d'água usando um identificador exclusivo.

A transmissão estruturada oferece garantias de processamento exatamente uma vez, mas não desduplica automaticamente os registros da fonte de dados. É possível usar o site dropDuplicatesWithinWatermark para desduplicar registros em qualquer campo especificado, o que permite remover duplicatas de uma transmissão mesmo que alguns campos sejam diferentes (como hora do evento ou hora de chegada).

É garantido que os registros duplicados que chegam dentro da marca d'água especificada serão eliminados. Essa garantia é rigorosa em apenas uma direção, e os registros duplicados que chegam fora do limite especificado também podem ser descartados. O senhor deve definir o limite de atraso da marca d'água maior do que as diferenças máximas de registro de data e hora entre os eventos duplicados para remover todas as duplicatas.

Você deve especificar uma marca d'água para usar o método dropDuplicatesWithinWatermark, como no exemplo a seguir:

Python
streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)