Pular para o conteúdo principal

O que é transmissão stateful?

Uma consulta de transmissão estruturada com estado requer atualizações incrementais para informações de estado intermediário, enquanto uma consulta de transmissão estruturada sem estado rastreia apenas informações sobre quais linhas foram processadas da origem para o destino.

As operações stateful incluem agregação de transmissão, transmissão dropDuplicates, união de transmissão-transmissão e aplicativos stateful personalizados.

As informações de estado intermediárias necessárias para consultas de transmissão estruturada com estado podem levar a latência inesperada e problemas de produção se forem mal configuradas.

Em Databricks Runtime 13.3 LTS e acima, o senhor pode ativar o checkpointing de changelog com RocksDB para reduzir a duração do checkpoint e a latência de ponta a ponta para cargas de trabalho de transmissão estruturada. Databricks recomenda habilitar o checkpointing do changelog para todas as consultas stateful de transmissão estruturada. Consulte Habilitar o checkpoint do registro de alterações.

Otimizar as consultas de transmissão estruturada com estado

O gerenciamento das informações de estado intermediário das consultas de transmissão estruturada com estado pode ajudar a evitar latência inesperada e problemas de produção.

A Databricks recomenda:

  • Usar as instâncias otimizadas do computecomo trabalhador.
  • Defina o número de partições embaralhadas para 1-2 vezes o número de núcleos no clustering.
  • Defina a configuração spark.sql.streaming.noDataMicroBatches.enabled como false na SparkSession. Isso impede que o mecanismo de transmissão de microlotes processe microlotes que não contenham dados. Observe também que definir essa configuração como false pode resultar em operações com estado que usam marcas d'água ou tempos limite de processamento para não obter saída de dados até que novos dados cheguem, em vez de imediatamente.

Databricks recomenda o uso do site RocksDB com checkpointing de changelog para gerenciar o estado para transmissão com estado. Consulte Configurar RocksDB armazenamento do estado em Databricks.

nota

O esquema de gerenciamento de estado não pode ser alterado entre as reinicializações da consulta. Se uma consulta tiver sido iniciada com o gerenciamento default, o senhor deverá reiniciá-la do zero com um novo local de ponto de verificação para alterar o armazenamento do estado.

Trabalhar com vários operadores com estado na transmissão estruturada

Em Databricks Runtime 13.3 LTS e acima, Databricks oferece suporte avançado para operadores stateful em cargas de trabalho de transmissão estruturada. Agora é possível encadear vários operadores com estado, o que significa que o senhor pode alimentar a saída de uma operação, como uma agregação com janela, para outra operação com estado, como um join.

Em Databricks Runtime 16.2 e acima, o senhor pode usar transformWithState em cargas de trabalho com vários operadores stateful. Consulte Criar um aplicativo personalizado com estado.

Os exemplos a seguir demonstram vários padrões que você pode usar.

important

As seguintes limitações existem ao trabalhar com vários operadores com estado:

  • Operadores de estado personalizados legados (FlatMapGroupWithState e applyInPandasWithState) não são suportados.
  • Somente o modo de acréscimo de saída é suportado.

Agregação de janelas de tempo em cadeia

Python
words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()

Agregação de janela de tempo em duas transmissões diferentes seguida de janela de transmissão-transmissão join

Python
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

transmissão-transmissão intervalo de tempo join seguido de agregação de janela de tempo

Python
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()

Reequilíbrio estatal para transmissão estruturada

O reequilíbrio de estado é ativado pelo site default para todas as cargas de trabalho de transmissão na DLT. Em Databricks Runtime 11.3 LTS e acima, o senhor pode definir a seguinte opção de configuração na configuração de clustering Spark para ativar o rebalanceamento de estado:

ini
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

O rebalanceamento de estado beneficia o pipeline de transmissão estruturada com estado que passa por eventos de redimensionamento de cluster. As operações de transmissão sem estado não se beneficiam, independentemente da alteração dos tamanhos dos clusters.

nota

O dimensionamento automático da computação tem limitações na redução do tamanho do clustering para cargas de trabalho de transmissão estruturada. Databricks recomenda o uso de DLT com autoescala aprimorada para cargas de trabalho de transmissão. Consulte Otimizar a utilização de clustering do pipeline DLT com autoescala aprimorada.

Os eventos de redimensionamento de clustering acionam o reequilíbrio de estado. Os microlotes podem ter maior latência durante os eventos de rebalanceamento, pois o estado é carregado do armazenamento em nuvem para o novo executor.