Pular para o conteúdo principal

Aplicar marcas d'água para controlar o limite de processamento de dados

Esta página descreve os conceitos básicos de marca d'água e fornece recomendações para o uso de marcas d'água em operações comuns de transmissão com estado. É necessário aplicar marcas d'água às operações de transmissão com estado para evitar a expansão infinita da quantidade de dados mantidos no estado, o que pode causar problemas de memória ou aumentar a latência de processamento durante operações de transmissão de longa duração.

O que é uma marca d'água?

A transmissão estruturada usa marcas d'água para controlar o limite de tempo para continuar processando atualizações para uma determinada entidade estatal. Exemplos comuns de entidades estaduais incluem:

  • Agregações em uma janela de tempo.
  • Chave única em um join entre duas transmissões.

Ao declarar uma marca d'água, o senhor especifica um campo de registro de data e hora e um limite de marca d'água em uma transmissão DataFrame. À medida que novos dados chegam, o gerenciador de estado rastreia o carimbo de data/hora mais recente no campo especificado e processa todos os registros dentro do limite de atraso.

O exemplo a seguir aplica um limite de marca d'água de 10 minutos a uma contagem em janelas:

Python
from pyspark.sql.functions import window

(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)

Neste exemplo:

  • A coluna event_time é usada para definir uma marca d'água de 10 minutos e uma janela de queda de 5 minutos.
  • Uma contagem é coletada para cada id observado para cada janela não sobreposta de 5 minutos.
  • As informações de estado são mantidas para cada contagem até que o final da janela seja 10 minutos mais antigo do que o último observado event_time.
importante

A garantia de limite de marca d'água assegura que os registros que chegam dentro do limite especificado sejam processados de acordo com a semântica da consulta definida. Registros que chegam com atraso, fora do limite especificado, ainda podem ser processados usando métricas de consulta, mas isso não é garantido.

Como as marcas d'água afetam o tempo de processamento e a taxa de transferência?

As marcas d'água interagem com os modos de saída para controlar quando os dados são gravados no coletor. Como as marcas d'água reduzem a quantidade total de informações de estado a serem processadas, o uso eficaz de marcas d'água é essencial para a transmissão eficiente de taxas de transferência com estado.

nota

Nem todos os modos de saída são compatíveis com todas as operações com estado.

Marcas d'água e modo de saída para agregações em janelas

A tabela a seguir detalha o processamento de consultas com agregação em um carimbo de data/hora com uma marca d'água definida:

Modo de saída

Comportamento

Acrescentar

As linhas são gravadas na tabela de destino após o limite da marca d'água ser ultrapassado. Todas as gravações são atrasadas com base no limite de atraso. O estado de agregação antigo é descartado após o limite ser ultrapassado.

Atualizar

As linhas são gravadas na tabela de destino à medida que os resultados são calculados e podem ser atualizadas e sobrescritas conforme novos dados chegam. O estado de agregação antigo é descartado após o limite ser ultrapassado.

Concluído

O estado de agregação não é descartado. A tabela de destino é reescrita a cada gatilho.

Marcas d'água e saída para junção transmissão-transmissão

A união entre várias transmissões suporta apenas o modo append, e os registros correspondentes são gravados em cada lote descoberto. Para a união interna, o site Databricks recomenda definir um limite de marca d'água em cada fonte de transmissão de dados. Isso permite que as informações do estado sejam descartadas para registros antigos. Sem marcas d'água, a transmissão estruturada tenta join cada key de ambos os lados do join com cada acionador.

transmissão estruturada possui semântica especial para suportar junção externa. A marca d'água é obrigatória para junções externas, pois indica quando uma key deve ser escrita com um valor nulo após não encontrar correspondência. Embora a junção externa possa ser útil para registrar registros que nunca são correspondidos durante o processamento de dados, como a junção apenas grava em tabelas como operações de acréscimo, esses dados ausentes não são registrados até que o limite de atraso tenha passado.

Controle do limite de dados tardios com política de várias marcas d'água na transmissão estruturada

Ao trabalhar com várias entradas de transmissão estruturada, o senhor pode definir várias marcas d'água para controlar o limite de tolerância dos dados que chegam com atraso. A configuração de marcas d'água permite que o senhor controle as informações de estado e impacta a latência.

Uma consulta de transmissão pode ter várias transmissões de entrada que são unidas ou juntadas. Cada uma das transmissões de entrada pode ter um limite diferente de dados atrasados que precisam ser tolerados para operações com estado. Especifique esses limites usando withWatermarks("eventTime", delay) em cada uma das transmissões de entrada. A seguir, um exemplo de consulta com união transmissão-transmissão.

Scala
val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)

Durante a execução da consulta, a transmissão estruturada rastreia individualmente o tempo máximo de evento observado em cada transmissão de entrada, calcula marcas d'água com base no atraso correspondente e escolhe uma única marca d'água global para ser usada em operações com estado. Por default, o valor mínimo é escolhido como marca d'água global, pois impede que os dados sejam descartados acidentalmente por serem considerados tardios caso uma das transmissões fique atrasada em relação às outras (por exemplo, se uma das transmissões parar de receber dados devido a falhas na origem). Em outras palavras, a marca d'água global se move com segurança na velocidade da transmissão mais lenta e a saída da consulta é atrasada de acordo.

Se você quiser obter resultados mais rápidos, pode definir a política de marca d'água múltipla para escolher o valor máximo como marca d'água global, definindo a configuração SQL de spark.sql.streaming.multipleWatermarkPolicy para max (default é min). Isso permite que a marca d'água global se mova no ritmo da transmissão mais rápida. No entanto, essa configuração descarta dados da transmissão mais lenta. A Databricks recomenda o uso criterioso dessa configuração.

Aplicar marcas d'água a operações distintas

O operador distinct é um operador com estado que requer marcas d'água para evitar o crescimento ilimitado do estado. Sem marcas d'água, a transmissão estruturada tenta rastrear indefinidamente cada registro único, o que pode levar a problemas de memória ou aumento da latência de processamento.

Ao aplicar distinct a um DataFrame de transmissão, você deve especificar uma marca d'água em um campo de carimbo de data/hora. A marca d'água controla por quanto tempo o gerenciador de estado mantém os registros para fins de desduplicação. Após ultrapassar o limite da marca d'água, os registros antigos são removidos do estado.

O exemplo a seguir aplica uma marca d'água a uma operação distinct :

Python
streamingDf = spark.readStream. ...  # columns: eventTime, id, value, ...

# Apply watermark before distinct operation
(streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
)

Neste exemplo, os registros duplicados que chegam dentro de 1 hora do último eventTime observado são removidos da transmissão. As informações estaduais para desduplicação são eliminadas após o limite passar.

importante

Se você precisar remover duplicados em colunas específicas em vez de todas as colunas, use dropDuplicates() ou dropDuplicatesWithinWatermark() em vez de distinct. Veja a próxima seção para mais detalhes.

Coloque duplicatas dentro da marca d'água

No Databricks Runtime 13.3 LTS ou posterior, você pode remover registros duplicados dentro de um limite de marca d'água usando um identificador exclusivo.

A transmissão estruturada fornece garantias de processamento exatamente uma vez, mas não desduplica automaticamente os registros da fonte de dados. Você pode usar dropDuplicatesWithinWatermark para remover registros duplicados em qualquer campo especificado, permitindo que você remova duplicatas de uma transmissão mesmo que alguns campos sejam diferentes (como horário do evento ou horário de chegada).

É garantido que os registros duplicados que chegam dentro da marca d'água especificada serão eliminados. Essa garantia é rigorosa em apenas uma direção, e os registros duplicados que chegam fora do limite especificado também podem ser descartados. O senhor deve definir o limite de atraso da marca d'água maior do que as diferenças máximas de registro de data e hora entre os eventos duplicados para remover todas as duplicatas.

Você deve especificar uma marca d'água para usar o método dropDuplicatesWithinWatermark, como no exemplo a seguir:

Python
streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)