transmissão e ingestão incremental

Databricks usa a Apache Spark transmissão estruturada para dar suporte a vários produtos associados a cargas de trabalho de ingestão, inclusive:

  • Auto Loader

  • COPY INTO

  • Pipelines das Delta Live Tables

  • Visualização materializada e tabelas de transmissão em Databricks SQL

Este artigo discute algumas das diferenças entre a semântica de processamento de lotes de transmissão e incremental e fornece uma visão geral de alto nível da configuração de cargas de trabalho de ingestão para a semântica desejada em Databricks.

Qual é a diferença entre transmissão e ingestão de lotes incrementais?

As possíveis configurações de fluxo de trabalho de ingestão variam desde o processamento real em tempo próximo até o processamento de lotes incrementais infrequentes. Ambos os padrões usam a Apache Spark transmissão estruturada para alimentar o processamento incremental, mas têm semânticas diferentes. Para simplificar, este artigo se refere à ingestão real em tempo próximo como ingestão de transmissão e ao processamento incremental mais infrequente como ingestão de lotes incrementais.

transmissão ingestão

A transmissão, no contexto de ingestão de dados e atualizações de tabelas, refere-se ao processamento near data tempo-real em que o site Databricks ingere registros da fonte para o sink em microbatches usando uma infraestrutura sempre ativa. Uma carga de trabalho de transmissão ingere continuamente as atualizações da fonte de dados configurada, a menos que ocorra uma falha que interrompa a ingestão.

Ingestão incremental de lotes

A ingestão incremental de lotes refere-se a um padrão em que todos os novos registros são processados a partir de uma fonte de dados em um trabalho de curta duração. A ingestão incremental de lotes geralmente ocorre de acordo com um programa, mas também pode ser acionada manualmente ou com base na chegada de arquivos.

A ingestão incremental de lotes é diferente da ingestão de lotes, pois detecta automaticamente novos registros na fonte de dados e ignora os registros que já foram ingeridos.

Ingestão com Jobs

Databricks O Jobs permite que o senhor orquestre o fluxo de trabalho e programe tarefas que incluem o Notebook, a biblioteca, o pipeline Delta Live Tables e as consultas Databricks SQL.

Observação

O senhor pode usar todos os tipos de Databricks compute e de tarefa para configurar a ingestão de lotes incrementais. A ingestão de transmissão só é compatível com a produção no Job clássico compute e Delta Live Tables.

Os trabalhos têm dois modos principais de operação:

  • Os trabalhos contínuos são repetidos automaticamente se ocorrer uma falha. Esse modo é destinado à transmissão por ingestão.

  • Triggered Job execução tarefa quando acionado. Os gatilhos incluem:

    • Acionadores baseados em tempo que executam o Job em uma programação específica.

    • Acionadores baseados em arquivos que executam o Job quando os arquivos chegam a um local especificado.

    • Outros acionadores, como chamadas REST API , execução do comando Databricks CLI ou clicar no botão executar agora na interface do usuário workspace.

Para cargas de trabalho de lotes incrementais, configure seu Job usando o modo de acionamento AvailableNow, como segue:

(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("table_name")
)
import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("table_name")

Para cargas de trabalho de transmissão, o intervalo de acionamento do default é processingTime ="500ms". O exemplo a seguir mostra como processar um micro-lote a cada 5 segundos:

(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(processingTime="5 seconds")
  .toTable("table_name")
)
import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.ProcessingTime, "5 seconds")
  .toTable("table_name")

Importante

O trabalho sem servidor não é compatível com Scala, modo contínuo ou intervalos de acionamento baseados em tempo para transmissão estruturada. Use o Job clássico se o senhor precisar de uma semântica de ingestão real quase em tempo real.

Ingestão com Delta Live Tables

Semelhante aos Jobs, o pipeline Delta Live Tables pode ser executado no modo acionado ou contínuo. Para obter a semântica de transmissão near tempo real com tabelas de transmissão, use o modo contínuo.

Use tabelas de transmissão para configurar a transmissão ou a ingestão de lotes incrementais do armazenamento de objetos cloud, Apache Kafka, Amazon Kinesis, Google Pub/Sub ou Apache Pulsar.

LakeFlow O Connect usa o site Delta Live Tables para configurar o pipeline de ingestão dos sistemas conectados. Consulte LakeFlow Connect.

A visualização materializada garante uma semântica de operações equivalente a muitas cargas de trabalho, mas pode otimizar muitas operações para calcular os resultados de forma incremental. Consulte Incremental refresh para visualização materializada.

Ingestão com Databricks SQL

O senhor pode usar tabelas de transmissão para configurar a ingestão de lotes incrementais do armazenamento de objetos cloud, Apache Kafka, Amazon Kinesis, Google Pub/Sub ou Apache Pulsar.

O senhor pode usar a visualização materializada para configurar o processamento incremental de lotes a partir de fontes Delta. Consulte Incremental refresh para visualização materializada.

COPY INTO fornece a sintaxe familiar do SQL para processamento incremental de lotes para arquivos de dados no armazenamento de objetos do cloud. O comportamento COPY INTO é semelhante aos padrões suportados pelas tabelas de transmissão para armazenamento de objetos cloud, mas nem todas as configurações default são equivalentes para todos os formatos de arquivos suportados.