transmissão e ingestão incremental
Databricks usa a Apache Spark transmissão estruturada para dar suporte a vários produtos associados a cargas de trabalho de ingestão, inclusive:
- Carregador automático
COPY INTO
- Pipelines DLT
- Visualização materializada e tabelas de transmissão em Databricks SQL
Este artigo discute algumas das diferenças entre as semânticas de processamento de lotes de transmissão e incremental e fornece uma visão geral de alto nível da configuração de cargas de trabalho de ingestão para a semântica desejada em Databricks.
Qual é a diferença entre transmissão e ingestão de lotes incrementais?
As possíveis configurações de fluxo de trabalho de ingestão variam desde o processamento real em tempo próximo até o processamento de lotes incrementais infrequentes. Ambos os padrões usam a Apache Spark transmissão estruturada para alimentar o processamento incremental, mas têm semânticas diferentes. Para simplificar, este artigo se refere à ingestão real em tempo próximo como ingestão de transmissão e ao processamento incremental mais infrequente como ingestão de lotes incrementais .
transmissão ingestão
A transmissão, no contexto de ingestão de dados e atualizações de tabelas, refere-se ao processamento near data tempo-real em que o site Databricks ingere registros da fonte para o sink em microbatches usando uma infraestrutura sempre ativa. Uma carga de trabalho de transmissão ingere continuamente as atualizações da fonte de dados configurada, a menos que ocorra uma falha que interrompa a ingestão.
Ingestão incremental de lotes
A ingestão incremental de lotes refere-se a um padrão em que todos os novos registros são processados a partir de uma fonte de dados em um trabalho de curta duração. A ingestão incremental de lotes geralmente ocorre de acordo com um programa, mas também pode ser acionada manualmente ou com base na chegada de arquivos.
A ingestão incremental de lotes é diferente da ingestão de lotes , pois detecta automaticamente novos registros na fonte de dados e ignora os registros que já foram ingeridos.
Ingestão com Jobs
Databricks O Jobs permite que o senhor orquestre o fluxo de trabalho e programe tarefas que incluem o Notebook, a biblioteca, o pipeline DLT e as consultas Databricks SQL.
O senhor pode usar todos os tipos de Databricks compute e de tarefa para configurar a ingestão de lotes incrementais. A ingestão de transmissão só é compatível com a produção no Job compute clássico e no DLT.
Os trabalhos têm dois modos principais de operação:
- Os trabalhos contínuos são repetidos automaticamente se ocorrer uma falha. Esse modo é destinado à transmissão por ingestão.
- Triggered Job execução tarefa quando acionado. Os gatilhos incluem:
- Acionadores baseados em tempo que executam o Job em uma programação específica.
- Acionadores baseados em arquivos que executam o Job quando os arquivos chegam a um local especificado.
- Outros acionadores, como REST API chamadas, execução de Databricks CLI comando ou clicar no botão executar agora na interface do usuário workspace.
Para cargas de trabalho de lotes incrementais, configure seu Job usando o modo de acionamento AvailableNow
, como segue:
- Python
- Scala
(df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("table_name")
)
import org.apache.spark.sql.streaming.Trigger
df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("table_name")
Para cargas de trabalho de transmissão, o intervalo de acionamento do default é processingTime ="500ms"
. O exemplo a seguir mostra como processar um micro-lote a cada 5 segundos:
- Python
- Scala
(df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(processingTime="5 seconds")
.toTable("table_name")
)
import org.apache.spark.sql.streaming.Trigger
df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.ProcessingTime, "5 seconds")
.toTable("table_name")
O trabalho sem servidor não é compatível com Scala, modo contínuo ou intervalos de acionamento baseados em tempo para transmissão estruturada. Use o Job clássico se o senhor precisar de uma semântica de ingestão real quase em tempo real.
Ingestão com DLT
Semelhante aos Jobs, o pipeline DLT pode ser executado no modo acionado ou contínuo. Para obter a semântica de transmissão near tempo real com tabelas de transmissão, use o modo contínuo.
Use tabelas de transmissão para configurar a transmissão ou a ingestão de lotes incrementais do armazenamento de objetos na nuvem, Apache Kafka, Amazon Kinesis, Google Pub/Sub ou Apache Pulsar.
LakeFlow Connect usa DLT para configurar o pipeline de ingestão de sistemas conectados. Veja LakeFlow Connect.
A visualização materializada garante uma semântica de operações equivalente a muitas cargas de trabalho, mas pode otimizar muitas operações para calcular os resultados de forma incremental. Consulte Incremental refresh para visualização materializada.
Ingestão com Databricks SQL
O senhor pode usar tabelas de transmissão para configurar a ingestão de lotes incrementais do armazenamento de objetos na nuvem, Apache Kafka, Amazon Kinesis, Google Pub/Sub ou Apache Pulsar.
O senhor pode usar a visualização materializada para configurar o processamento incremental de lotes a partir de fontes Delta. Consulte Incremental refresh para visualização materializada.
COPY INTO
fornece a sintaxe familiar do SQL para processamento incremental de lotes para arquivos de dados no armazenamento de objetos na nuvem. COPY INTO
é semelhante aos padrões suportados pelas tabelas de transmissão para armazenamento de objetos na nuvem, mas nem todas as configurações de default são equivalentes para todos os formatos de arquivo suportados.