Pular para o conteúdo principal

Aplicar marcas d'água para controlar o limite de processamento de dados

Esta página descreve conceitos de marca d'água e apresenta recomendações para o uso de marcas d'água em operações comuns de transmissão com estado.

consultas de transmissão acumulam dados de estado ao longo do tempo. As marcas d'água removem automaticamente os dados de estado antigos para evitar erros de memória e aumento da latência de processamento.

O que é uma marca d'água?

Durante o processamento, a transmissão estruturada mantém o estado entre os microlotes. As consultas de transmissão usam o estado para atualizar os resultados incrementalmente, em vez de recalcular tudo após cada microlote. As marcas d'água controlam o limite a partir do qual uma consulta interrompe o processamento de uma entidade de estado.

Exemplos comuns de entidades estatais incluem:

  • Agregações em uma janela de tempo.
  • Chave única em um join entre duas transmissões.

Para adicionar uma marca d'água a um DataFrame de transmissão, especifique um campo de carimbo de data/hora e um limite de atraso. À medida que novos dados chegam, o gerenciador de estado rastreia o registro de data e hora mais recente no campo especificado e processa apenas os registros dentro do limite de atraso.

As consultas sempre processam registros que chegam dentro do limite estabelecido. As consultas ainda podem processar registros que chegam fora do limite, mas isso não é garantido.

O exemplo a seguir aplica um limite de marca d'água de 10 minutos a uma contagem em janelas:

Python
from pyspark.sql.functions import window

(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)

Neste exemplo:

  • A coluna event_time é usada para definir uma marca d'água de 10 minutos e uma janela de queda de 5 minutos.
  • É feita uma contagem para cada id observado em cada janela de 5 minutos não sobreposta.
  • As informações de estado são mantidas para cada contagem até que o final da janela seja 10 minutos mais antigo que o último observado event_time.
importante

Em operações groupBy() e window() , referencie as colunas pelo nome, "<colName>" ou col("<colName>"), para garantir que o marcador de tempo do evento seja preservado. Em Scala, você também pode usar $colName.

Como as marcas d'água afetam o tempo de processamento e a taxa de transferência?

Os modos de saída controlam quando uma consulta com marcas d'água grava dados no coletor. As marcas d'água são essenciais para o controle da taxa de transferência na transmissão stateful porque reduzem a quantidade total de informações de estado na memória. Nem todos os modos de saída são suportados para todas as operações com estado. Consulte Marcas d'água e modo de saída para agregações em janela.

A escolha da duração da marca d'água apresenta vantagens e desvantagens:

  • Marcas d'água mais curtas reduzem a latência das consultas porque estas armazenam menos informações de estado e gravam os resultados somente após a conclusão de cada duração da marca d'água. No entanto, marcas d'água curtas têm baixa tolerância a dados atrasados.
  • Marcas d'água mais longas têm alta tolerância a dados atrasados. No entanto, marcas d'água longas aumentam a latência das consultas, pois estas precisam armazenar mais informações de estado e aguardar para gravar os resultados após uma duração maior da marca d'água.

Marcas d'água e modo de saída para agregações em janelas

A tabela a seguir mostra o comportamento de processamento para consultas com agregação em um carimbo de data/hora e uma marca d'água:

Modo de saída

Comportamento

Acrescentar

A consulta grava linhas na tabela de destino depois que o limite da marca d'água for ultrapassado. Todas as gravações são atrasadas com base no limite de atraso. O estado de agregação antigo é descartado após o limite ser ultrapassado.

Atualizar

A consulta grava linhas na tabela de destino à medida que os resultados são calculados, e a consulta pode atualizar e sobrescrever linhas conforme novos dados chegam. O estado de agregação antigo é descartado após o limite ser ultrapassado.

Concluído

O estado de agregação não é descartado. A consulta reescreve a tabela de destino para cada gatilho.

Marcas d'água e modos de saída para junção transmissão-transmissão

A junção entre múltiplas transmissões suporta apenas o modo de anexação. As consultas registram as correspondências para cada lote.

Para junção interna, Databricks recomenda que você defina um limite de marca d’água em cada fonte de transmissão de dados para permitir que a consulta descarte informações de estado para registros antigos. Sem marcas d'água, a transmissão estruturada tenta join todas as key de ambos os lados da join em cada gatilho, o que pode afetar o desempenho.

Para junções externas, a marca d'água é obrigatória. Quando um registro não corresponde a nenhum registro, a consulta grava um valor nulo para essa key. Como o comando join suporta apenas o modo append, os registros não correspondentes não são gravados até que o limite de atraso seja atingido.

Controle o limite de dados atrasados com uma política de múltiplas marcas d'água.

Para múltiplas entradas de transmissão estruturada, você pode definir várias marcas d'água para controlar o limite de tolerância para dados atrasados. As marcas d'água permitem controlar informações de estado e latência.

Uma consulta de transmissão pode ter várias transmissões de entrada que são unidas ou combinadas. Para operações stateful, cada uma das transmissões de entrada pode exigir um limite diferente para tolerância tardia de dados. Especifique esses limites usando withWatermark("eventTime", delay) em cada transmissão de entrada. A seguir está um exemplo de consulta com transmissão-transmissão join.

Python
input_stream1 = ...      # delays up to 1 hour
input_stream2 = ... # delays up to 2 hours

(input_stream1.withWatermark("eventTime1", "1 hour")
.join(
input_stream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
)

Ao executar a consulta com operações com estado, a transmissão estruturada rastreia individualmente o tempo máximo do evento para cada transmissão de entrada, calcula marcas d'água com base no atraso correspondente e determina uma única marca d'água global. Por default, a transmissão estruturada utiliza o mínimo como marca d'água global. Se uma transmissão ficar atrasada em relação às outras, uma marca d'água global mínima impede que a consulta marque acidentalmente os dados como atrasados. Por exemplo, isso pode ocorrer quando uma das transmissões para de receber dados devido a falhas a montante. A marca d'água global se move com segurança na velocidade da transmissão mais lenta e atrasa a saída da consulta quando necessário.

Para reduzir a latência, defina spark.sql.streaming.multipleWatermarkPolicy para max (default é min) para usar a marca d'água da transmissão mais rápida como marca d'água global. No entanto, essa configuração descarta dados da transmissão mais lenta. A Databricks recomenda que você aplique essa configuração com cautela.

Aplicar marcas d'água a operações distintas

O distinct operações rastreia cada registro único no estado. Sem uma marca d'água, o estado cresce indefinidamente e pode causar problemas de memória. Especifique uma marca d'água em um campo de registro de data e hora para definir um estado específico e remover registros antigos após o limite ser atingido.

O exemplo a seguir aplica uma marca d'água a uma operação distinct :

Python
streamingDf = spark.readStream. ...  # columns: eventTime, id, value, ...

# Apply watermark before distinct operation
(streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
)

Neste exemplo, a consulta de transmissão remove registros duplicados que chegam dentro de 1 hora do último observado eventTime. A consulta descarta informações de estado para desduplicação depois que o limite passa.

importante

Para remover colunas específicas em vez de todas as colunas, use dropDuplicates() ou dropDuplicatesWithinWatermark() em vez de distinct. Consulte Remover duplicados dentro da marca d'água.

Coloque duplicatas dentro da marca d'água

No Databricks Runtime 13.3 LTS ou superior, você pode usar um identificador exclusivo para remover registros duplicados dentro de um limite de marca d'água.

A transmissão estruturada garante o processamento exatamente uma vez, mas não desduplica registros da fonte de dados. Use dropDuplicatesWithinWatermark para remover duplicados em qualquer campo, mesmo quando os campos diferem entre registros duplicados, como horário do evento ou horário de chegada.

Com dropDuplicatesWithinWatermark, as consultas sempre removem registros duplicados que chegam dentro do limite da marca d'água. As consultas também podem remover registros duplicados que chegam fora do limite, mas isso não é garantido. Para garantir que as consultas removam todos os duplicados, defina o limite da marca d'água para um valor maior que a diferença máxima de data e hora entre eventos duplicados.

Você precisa especificar uma marca d'água para usar o método dropDuplicatesWithinWatermark :

Python
streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)

Exemplos de casos de uso

Os exemplos a seguir mostram casos de uso avançados de janelas:

Utilize janelas deslizantes para calcular o total de vendas por hora.

As janelas deslizantes têm tamanho fixo com intervalos não sobrepostos. Cada linha de entrada pertence a exatamente uma janela. Utilize janelas deslizantes para compute agregações de períodos de tempo discretos, como totais de vendas por hora:

Python
from pyspark.sql.functions import window, sum

hourly_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour"))
.agg(sum("amount").alias("total_sales"))
)

Neste exemplo:

  • window("timestamp", "1 hour") Agrupa os pedidos em intervalos de 1 hora não sobrepostos, como das 5h às 6h e das 6h às 7h.
  • withWatermark("timestamp", "1 hour") Mantém o agregado de cada janela em estado inalterado até que o carimbo de data/hora de término da janela seja 1 hora anterior ao carimbo de data/hora máximo do pedido.

Utilize janelas deslizantes para calcular agregados rolantes.

As janelas de correr têm tamanho fixo com intervalos que podem se sobrepor. Uma única linha pode pertencer a várias janelas. Utilize janelas deslizantes para compute agregados móveis, como vendas em um período móvel de 6 horas:

Python
from pyspark.sql.functions import window, sum

rolling_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "6 hours", slideDuration="1 hour"))
.agg(sum("amount").alias("total_sales"))
)

Neste exemplo:

  • window("timestamp", "6 hours", slideDuration="1 hour") Agrupa os pedidos em intervalos de 6 horas que avançam em 1 hora, por exemplo, das 5h às 11h e das 6h às 12h.
  • withWatermark("timestamp", "1 hour") Mantém o agregado de cada janela em estado inalterado até que o carimbo de data/hora de término da janela seja 1 hora anterior ao carimbo de data/hora máximo do pedido.
  • slideDuration deve ser menor ou igual a windowDuration.

Utilize janelas de sessão para verificar a atividade do usuário.

As janelas de sessão não têm tamanho fixo. Uma janela se abre quando uma linha chega e se fecha após um intervalo de tempo que não contém novas linhas. Utilize janelas de sessão para agregar picos de atividade entre longos períodos de inatividade, como a visualização de uma página por um usuário em um período de 30 minutos:

Python
from pyspark.sql.functions import session_window, sum

sessionized_page_views = (activity
.withWatermark("timestamp", "1 hour")
.groupBy("user_id", session_window("timestamp", gapDuration="30 minutes"))
.agg(sum("page_views").alias("total_page_views"))
)

Neste exemplo:

  • session_window("timestamp", gapDuration="30 minutes") Abre uma janela quando a primeira view de página chega. Cada view de página subsequente que ocorrer dentro de 30 minutos estenderá a janela. Quando nenhuma view de página ocorre em 30 minutos, a janela se fecha e a próxima view de página inicia em uma nova janela.
  • withWatermark("timestamp", "1 hour") Mantém o estado agregado de cada sessão até que o carimbo de data/hora de término da janela seja 1 hora anterior ao carimbo de data/hora view máxima da página.
  • O argumento timeColumn para window() e session_window() deve ser de TimestampType ou TimestampNTZType.
  • Use current_timestamp() para definir janelas com base no tempo de processamento em vez do tempo do evento.
  • Você pode definir a duração das janelas de tempo, desde microssegundos até dias. Duração de um mês ou mais não é suportada.
  • Use o modo de saída complete com agregações em janelas para manter todo o estado da janela indefinidamente. Use o modo de saída append com uma marca d'água apropriada para limitar o crescimento do estado e evitar problemas de memória para grandes conjuntos de dados. Para obter mais detalhes sobre o comportamento do modo de saída, consulte Marcas d'água e modo de saída para agregações em janela.