Selecionar um modo de saída para transmissão estruturada
Este artigo discute a seleção de um modo de saída para transmissão com estado. Somente as transmissões com estado que contêm agregações exigem uma configuração de modo de saída.
O join suporta apenas o modo de saída append, e o modo de saída não afeta a deduplicação. Os operadores arbitrários com estado mapGroupsWithState
e flatMapGroupsWithState
emitem registros usando sua própria lógica personalizada, de modo que o modo de saída da transmissão não afeta seu comportamento.
Para a transmissão sem estado, todos os modos de saída se comportam da mesma forma.
Para configurar corretamente o modo de saída, o senhor deve compreender a transmissão com estado, as marcas d'água e os acionadores. Consulte os artigos a seguir:
- O que é transmissão stateful?
- Aplicar marcas d'água para controlar o limite de processamento de dados
- Configurar intervalos de acionamento da transmissão estruturada
O que é o modo de saída?
O modo de saída de uma consulta de transmissão estruturada determina quais registros os operadores da consulta emitem durante cada acionamento. Os três tipos de registros que podem ser emitidos são:
- Registra que o processamento futuro não muda.
- Os registros que foram alterados desde o último gatilho.
- Todos os registros na tabela de estados.
Saber quais tipos de registros emitir é importante para operadores com estado, pois uma linha específica produzida por um operador com estado pode mudar de gatilho para acionador. Por exemplo, como um operador de agregação de transmissão recebe mais linhas para uma determinada janela, os valores de agregação dessa janela podem mudar entre os acionadores.
Para operadores sem estado, a distinção entre os tipos de registro não afeta o comportamento do operador. Os registros que um operador sem estado emite durante um gatilho são sempre os registros de origem processados durante esse gatilho.
Modos de saída disponíveis
Há três modos de saída que informam a um operador quais registros devem ser emitidos durante um gatilho específico:
Saída Mode | Descrição |
---|---|
Modo de acréscimo (default) | Por default, a transmissão consulta a execução no modo append. Nesse modo, os operadores emitem somente linhas que não mudam em gatilhos futuros. Operadores com estado usam a marca d'água para determinar quando isso acontece. |
Modo de atualização | No modo de atualização, os operadores emitem todas as linhas que foram alteradas durante o gatilho, mesmo que o registro emitido possa mudar em um acionador subsequente. |
Modo completo | O modo completo só funciona com agregações de transmissão. No modo completo, todas as linhas resultantes já produzidas pelo operador são emitidas a jusante. |
Considerações de produção
Para muitas operações de transmissão com estado, o senhor deve escolher entre os modos append e update. As seções a seguir descrevem as considerações que podem informar sua decisão.
O modo completo tem algumas aplicações, mas pode ter um desempenho ruim como escala de dados. Databricks recomenda o uso da visualização materializada para obter garantias semânticas associadas ao modo completo com processamento incremental para muitas operações com estado. Consulte Usar a visualização materializada em Databricks SQL.
Semântica do aplicativo
A semântica do aplicativo descreve como os aplicativos downstream usam os dados de transmissão.
Se o serviço downstream precisar executar uma única ação para cada gravação downstream, use o modo append na maioria dos casos. Por exemplo, se o senhor tiver um serviço de notificação downstream que envia notificações para cada novo registro gravado no sink, o modo append garante que cada registro seja gravado apenas uma vez. O modo de atualização grava o registro sempre que as informações do estado são alteradas, o que resultaria em várias atualizações.
Se o serviço downstream precisar de novos resultados, o modo de atualização garante que seu coletor permaneça o mais atualizado possível. Os exemplos incluem um modelo de aprendizado de máquina que lê recursos em tempo real ou um painel analítico que acompanha agregados em tempo real.
Compatibilidade entre operador e coletor
A transmissão estruturada não é compatível com todas as operações disponíveis no site Apache Spark, e algumas operações de transmissão não são compatíveis com todos os modos de saída. Para obter mais informações sobre as limitações do operador, consulte os documentos de transmissão do OSS.
Nem todos os coletores suportam todos os modos de saída. Tanto o Delta Lake, que faz o backup de todas as tabelas gerenciais do Unity Catalog, quanto o Kafka suportam todos os modos de saída. Para obter mais informações sobre a compatibilidade do sink, consulte os documentos de transmissão do OSS.
Latência e custo
O modo de saída afeta o tempo que deve transcorrer antes de gravar um registro, e a frequência e a quantidade de dados gravados podem afetar os custos associados ao pipeline de transmissão.
O modo de acréscimo força os operadores com estado a emitirem resultados somente após a finalização dos resultados com estado, o que é pelo menos tão longo quanto o atraso da marca d'água. Um atraso de marca d'água de 1 hour
no modo de saída de acréscimo significa que seus registros têm um atraso de pelo menos 1 hora antes de serem emitidos a jusante.
O modo de atualização resulta em uma gravação por acionador por valor agregado. Se o coletor cobra por gravação por registro, isso pode ser caro se os registros forem atualizados várias vezes antes que o atraso da marca d'água passe.
Exemplos de configuração
Os exemplos de código a seguir mostram a configuração do modo de saída para atualizações de transmissão nas tabelas do site Unity Catalog:
- Python
- Scala
# Append output mode (default)
(df.writeStream
.toTable("target_table")
)
# Append output mode (same as default behavior)
(df.writeStream
.outputMode("append")
.toTable("target_table")
)
# Update output mode
(df.writeStream
.outputMode("update")
.toTable("target_table")
)
# Complete output mode
(df.writeStream
.outputMode("complete")
.toTable("target_table")
)
// Append output mode (default)
df.writeStream
.toTable("target_table")
// Append output mode (same as default behavior)
df.writeStream
.outputMode("append")
.toTable("target_table")
// Update output mode
df.writeStream
.outputMode("update")
.toTable("target_table")
// Complete output mode
df.writeStream
.outputMode("complete")
.toTable("target_table")
Consulte os documentos do OSS para PySpark DataStreamWriter.outputMode ou Scala DataStreamWriter.outputMode.
Exemplo de modos de transmissão e saída stateful
O exemplo a seguir tem o objetivo de ajudá-lo a entender como o modo de saída interage com as marcas d'água para a transmissão com estado.
Considere uma agregação de transmissão que calcula a receita total gerada a cada hora em uma loja com um atraso de marca d'água de 15 minutos. O primeiro microlote processa os seguintes registros:
- $15 às 14h40
- $10 às 14h30
- $30 às 15h10
Nesse ponto, a marca d'água do motor é 14h55 porque subtrai 15 minutos (o atraso) do tempo máximo visto (15h10). O operador de agregação de transmissão tem o seguinte em seu estado:
[2pm, 3pm]
: $25[3pm, 4pm]
: $30
A tabela a seguir descreve o que aconteceria em cada modo de saída:
Modo de saída | Resultado e motivo |
---|---|
Acrescentar | O operador de agregação de transmissão não emite nada a jusante. Isso ocorre porque essas duas janelas podem mudar à medida que novos valores aparecem com um gatilho subsequente: a marca d'água de 14h55 indica que os registros após as 14h55 ainda podem chegar, e esses registros podem cair na janela |
Atualizar | O operador emite os dois registros, pois ambos receberam atualizações. |
Concluído | O operador emite todos os registros. |
Agora, suponha que a transmissão receba mais um registro:
- $20 às 15h20
A marca d'água é atualizada para 15h05 porque o motor subtrai 15 minutos das 15h20. Nesse ponto, o operador de agregação de transmissão tem o seguinte em seu estado:
[2pm, 3pm]
: $25[3pm, 4pm]
: $50
A tabela a seguir descreve o que aconteceria em cada modo de saída:
Modo de saída | Resultado e motivo |
---|---|
Acrescentar | O operador de agregação de transmissão observa que a marca d'água das 15h05 é maior do que o final da janela |
Atualizar | O operador de agregação de transmissão emite a janela |
Concluído | O operador emite todos os registros. |
Veja a seguir um resumo de como os operadores com estado se comportam em cada modo de acréscimo:
- No modo de acréscimo, grave os registros uma vez após o atraso da marca d'água.
- No modo de atualização, grave os registros que foram alterados desde o gatilho anterior.
- No modo completo, grave todos os registros já produzidos pelo operador stateful.