Pular para o conteúdo principal

O que é transmissão stateful?

Esta página explica consultas estruturadas de transmissão com estado, incluindo operações com estado, recomendações de otimização, encadeamento de múltiplos operadores com estado e rebalanceamento de estado.

Uma consulta de transmissão estruturada com estado requer atualizações incrementais para informações de estado intermediário, enquanto uma consulta de transmissão estruturada sem estado apenas rastreia informações sobre quais linhas foram processadas da origem até o coletor. Para recursos de otimização disponíveis para consultas sem estado, consulte Otimizar consultas de transmissão sem estado.

operações com estado

As operações com estado incluem agregação de transmissão, distinct, dropDuplicates, junção de transmissão-transmissão e aplicativos com estado personalizados.

As informações de estado intermediárias necessárias para consultas de transmissão estruturada com estado podem levar a latência inesperada e problemas de produção se forem mal configuradas.

No Databricks Runtime 13.3 LTS ou posterior, você pode habilitar o checkpoint de changelog com RocksDB para reduzir a duração do checkpoint e a latência de ponta a ponta para cargas de trabalho de transmissão estruturada. Databricks recomenda habilitar o checkpoint de changelog para todas as consultas com estado do Transmission estructurada. Consulte Ativar o checkpoint do changelog.

Otimize consultas stateful de transmissão estruturada

Databricks recomenda o seguinte para consultas de transmissão estruturada com estado:

  • Usar as instâncias otimizadas do computecomo trabalhador.
  • Defina o número de partições embaralhadas para 1-2 vezes o número de núcleos no clustering.
  • Defina a configuração spark.sql.streaming.noDataMicroBatches.enabled para false na SparkSession. Isso impede que o mecanismo de microlotes de transmissão processe microlotes que não contenham dados. Definir esta configuração para false também pode resultar em operações com estado que usam marcas d'água ou tempos limite de processamento não recebendo saída de dados até que novos dados cheguem em vez de imediatamente.

Databricks recomenda o uso do site RocksDB com checkpointing de changelog para gerenciar o estado para transmissão com estado. Consulte Configurar RocksDB armazenamento do estado em Databricks.

nota

O esquema de gerenciamento de estado não pode ser alterado entre reinicializações de consulta. Se uma consulta foi iniciada com o gerenciamento default , você deve reiniciá-la do zero com um novo local de ponto de verificação para alterar o armazenamento do estado.

Trabalhar com múltiplos operadores com estado em estrutura de linha

No Databricks Runtime 13.3 LTS ou posterior, Databricks oferece suporte avançado para operadores com estado em cargas de trabalho de transmissão estruturada. Você pode encadear vários operadores com estado, o que significa que você pode alimentar a saída de uma operação, como uma agregação em janela, para outra operação com estado, como uma join.

No Databricks Runtime 16.2 ou posterior, você pode usar transformWithState em cargas de trabalho com vários operadores com estado. Consulte Criar uma aplicação com estado personalizada.

Os exemplos a seguir demonstram vários padrões que você pode usar.

importante

As seguintes limitações existem ao trabalhar com vários operadores com estado:

  • Operadores stateful personalizados legados (FlatMapGroupWithState e applyInPandasWithState) não são suportados.
  • Somente o modo de acréscimo de saída é suportado.

Agregação de janelas de tempo em cadeia

Python
words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()

Agregação de janela de tempo em duas transmissões diferentes seguida de janela de transmissão-transmissão join

Python
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

transmissão-transmissão intervalo de tempo join seguido de agregação de janela de tempo

Python
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()

Reequilíbrio estatal para transmissão estruturada

O rebalanceamento de estado está habilitado por default para todas as cargas de trabalho de transmissão no pipeline declarativo LakeFlow Spark . No Databricks Runtime 11.3 LTS ou posterior, você pode definir a seguinte opção de configuração no cluster Spark para habilitar o rebalanceamento de estado:

ini
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

O rebalanceamento de estado beneficia pipelines com estado que passam por eventos de redimensionamento cluster . As transmissões apátridas não se beneficiam, independentemente da mudança no tamanho cluster .

nota

O dimensionamento automático de computação tem limitações na redução do tamanho cluster para cargas de trabalho de transmissão estruturada. Databricks recomenda o uso do pipeline declarativo LakeFlow Spark com escalonamento automático aprimorado para cargas de trabalho de transmissão. Consulte Otimizar a utilização cluster do pipeline declarativo LakeFlow Spark com escalonamento automático.

Os eventos de redimensionamento de clustering acionam o reequilíbrio de estado. Os microlotes podem ter maior latência durante os eventos de rebalanceamento, pois o estado é carregado do armazenamento em nuvem para o novo executor.