Selecionar um modo de saída para transmissão estruturada

Este artigo discute a seleção de um modo de saída para transmissão com estado. Somente as transmissões com estado que contêm agregações exigem uma configuração de modo de saída.

O join suporta apenas o modo de saída append, e o modo de saída não afeta a deduplicação. Os operadores arbitrários com estado mapGroupsWithState e flatMapGroupsWithState emitem registros usando sua própria lógica personalizada, de modo que o modo de saída da transmissão não afeta seu comportamento.

Para a transmissão sem estado, todos os modos de saída se comportam da mesma forma.

Para configurar corretamente o modo de saída, o senhor deve entender a transmissão com estado, as marcas d'água e os acionadores. Consulte os artigos a seguir:

O que é o modo de saída?

O modo de saída de uma consulta de transmissão estruturada determina quais registros os operadores da consulta emitem durante cada acionamento. Os três tipos de registros que podem ser emitidos são:

  • Registros que não alteram o processamento futuro.

  • Os registros que foram alterados desde o último acionador.

  • Todos os registros na tabela de estados.

Saber quais tipos de registros devem ser emitidos é importante para os operadores com estado, pois uma determinada linha produzida por um operador com estado pode mudar de acionador para acionador. Por exemplo, como um operador de agregação de transmissão recebe mais linhas para uma determinada janela, os valores de agregação dessa janela podem mudar entre os acionadores.

Para operadores sem estado, a distinção entre os tipos de registro não afeta o comportamento do operador. Os registros que um operador stateless emite durante um acionador são sempre os registros de origem processados durante esse acionador.

Modos de saída disponíveis

Há três modos de saída que informam ao operador quais registros devem ser emitidos durante um determinado acionamento:

Saída Mode

Descrição

Modo de acréscimo (default)

Em default, a transmissão consulta a execução no modo append. Nesse modo, os operadores emitem apenas linhas que não serão alteradas em acionadores futuros. Os operadores com estado usam a marca d'água para determinar quando isso acontece.

Modo de atualização

No modo de atualização, os operadores emitem todas as linhas que foram alteradas durante o acionador, mesmo que o registro emitido possa ser alterado em um acionador subsequente.

Modo completo

O modo completo só funciona com agregações de transmissão. No modo completo, todas as linhas resultantes já produzidas pelo operador são emitidas a jusante.

Considerações sobre a produção

Para muitas operações de transmissão com estado, o senhor deve escolher entre os modos append e update. As seções a seguir descrevem as considerações que podem informar sua decisão.

Observação

O modo completo tem algumas aplicações, mas pode ter um desempenho ruim como escala de dados. Databricks recomenda o uso da visualização materializada para obter garantias semânticas associadas ao modo completo com processamento incremental para muitas operações com estado. Consulte Usar a visualização materializada em Databricks SQL.

Semântica do aplicativo

A semântica do aplicativo descreve como os aplicativos downstream usam os dados de transmissão.

Se o serviço downstream precisar executar uma única ação para cada gravação downstream, use o modo append na maioria dos casos. Por exemplo, se o senhor tiver um serviço de notificação downstream que envia notificações para cada novo registro gravado no sink, o modo append garante que cada registro seja gravado apenas uma vez. O modo de atualização grava o registro sempre que as informações do estado são alteradas, o que resultaria em várias atualizações.

Se o serviço downstream precisar de novos resultados, o modo de atualização garante que seu coletor permaneça o mais atualizado possível. Os exemplos incluem um modelo do machine learning que lê recurso em tempo real ou um painel analítico que acompanha agregados em tempo real.

Compatibilidade do operador e da pia

A transmissão estruturada não é compatível com todas as operações disponíveis no site Apache Spark, e algumas operações de transmissão não são compatíveis com todos os modos de saída. Para obter mais informações sobre as limitações do operador, consulte os documentos de transmissão do OSS.

Nem todos os dissipadores suportam todos os modos de saída. Tanto o Delta Lake, que faz o backup de todas as tabelas gerenciais do Unity Catalog, quanto o Kafka suportam todos os modos de saída. Para obter mais informações sobre a compatibilidade do sink, consulte os documentos de transmissão do OSS.

Latência e custo

O modo de saída afeta o tempo que deve transcorrer antes de gravar um registro, e a frequência e a quantidade de dados gravados podem afetar os custos associados ao pipeline de transmissão.

O modo Append força os operadores com estado a emitir resultados somente depois que os resultados com estado são finalizados, o que é pelo menos tão longo quanto o atraso da marca d'água. Um atraso de marca d'água de 1 hour no modo de saída de anexação significa que seus registros têm um atraso de pelo menos 1 hora antes de serem emitidos no sentido descendente.

O modo de atualização resulta em uma gravação por acionador e por valor agregado. Se o seu coletor cobrar por gravação por registro, isso pode ser caro se os registros forem atualizados muitas vezes antes que o atraso da marca d'água passe.

Exemplos de configuração

Os exemplos de código a seguir mostram a configuração do modo de saída para atualizações de transmissão nas tabelas do site Unity Catalog:

# Append output mode (default)
(df.writeStream
  .toTable("target_table")
)

# Append output mode (same as default behavior)
(df.writeStream
  .outputMode("append")
  .toTable("target_table")
)

# Update output mode
(df.writeStream
  .outputMode("update")
  .toTable("target_table")
)

# Complete output mode
(df.writeStream
  .outputMode("complete")
  .toTable("target_table")
)
// Append output mode (default)
df.writeStream
  .toTable("target_table")


// Append output mode (same as default behavior)
df.writeStream
  .outputMode("append")
  .toTable("target_table")

// Update output mode
df.writeStream
  .outputMode("update")
  .toTable("target_table")

// Complete output mode
df.writeStream
  .outputMode("complete")
  .toTable("target_table")

Consulte os documentos do OSS para PySpark DataStreamWriter.outputMode ou Scala DataStreamWriter.outputMode.

Exemplo de modos de transmissão e saída stateful

O exemplo a seguir foi criado para ajudar o senhor a entender como o modo de saída interage com as marcas d'água para transmissão com estado.

Considere uma agregação de transmissão que calcula a receita total gerada a cada hora em uma loja com um atraso de marca d'água de 15 minutos. O primeiro microbatch processa os seguintes registros:

  • $15 às 2:40pm

  • $10 às 2:30pm

  • $30 às 3:10pm

Nesse ponto, a marca d'água do mecanismo é 2:55pm porque ele subtrai 15 minutos (o atraso) do tempo máximo visto (3:10pm). O operador de agregação de transmissão tem o seguinte em seu estado:

  • [2pm, 3pm]: $25

  • [3pm, 4pm]: $30

A tabela a seguir descreve o que aconteceria em cada modo de saída:

Modo de saída

Resultado e motivo

Acrescentar

O operador de agregação de transmissão não emite nada a jusante. Isso ocorre porque ambas as janelas podem mudar à medida que novos valores aparecem com um acionador subsequente: a marca d'água das 14h55 indica que os registros após as 14h55 ainda podem chegar, e esses registros podem cair na janela [2pm, 3pm] ou na janela [3pm, 4pm].

Atualizar

O operador emite os dois registros, pois ambos receberam atualizações.

Concluído

O operador emite todos os registros.

Agora, suponha que a transmissão receba mais um registro:

  • $20 às 3:20pm

A marca d'água é atualizada para 3:05pm porque o mecanismo subtrai 15 minutos de 3:20pm. Nesse ponto, o operador de agregação de transmissão tem o seguinte em seu estado:

  • [2pm, 3pm]: $25

  • [3pm, 4pm]: $50

A tabela a seguir descreve o que aconteceria em cada modo de saída:

Modo de saída

Resultado e motivo

Acrescentar

O operador de agregação de transmissão observa que a marca d'água das 15h05 é maior do que o final da janela [2pm, 3pm]. Pela definição da marca d'água, essa janela não pode mais mudar, portanto, ela emite a janela [2pm, 3pm].

Atualizar

O operador de agregação de transmissão emite a janela [3pm, 4pm] porque o valor do estado foi alterado de US$ 30 para US$ 50.

Concluído

O operador emite todos os registros.

A seguir, um resumo de como os operadores com estado se comportam em cada modo de anexação:

  • No modo de anexação, grave os registros uma vez após o atraso da marca d'água.

  • No modo de atualização, escreva os registros que foram alterados desde o acionador anterior.

  • No modo completo, escreva todos os registros já produzidos pelo operador com estado.