transmissão e ingestão incremental
Databricks usa a Apache Spark transmissão estruturada para dar suporte a vários produtos associados a cargas de trabalho de ingestão, inclusive:
Auto Loader
COPY INTO
Pipelines das Delta Live Tables
Visualização materializada e tabelas de transmissão em Databricks SQL
Este artigo discute algumas das diferenças entre a semântica de processamento de lotes de transmissão e incremental e fornece uma visão geral de alto nível da configuração de cargas de trabalho de ingestão para a semântica desejada em Databricks.
Qual é a diferença entre transmissão e ingestão de lotes incrementais?
As possíveis configurações de fluxo de trabalho de ingestão variam desde o processamento real em tempo próximo até o processamento de lotes incrementais infrequentes. Ambos os padrões usam a Apache Spark transmissão estruturada para alimentar o processamento incremental, mas têm semânticas diferentes. Para simplificar, este artigo se refere à ingestão real em tempo próximo como ingestão de transmissão e ao processamento incremental mais infrequente como ingestão de lotes incrementais.
transmissão ingestão
A transmissão, no contexto de ingestão de dados e atualizações de tabelas, refere-se ao processamento near data tempo-real em que o site Databricks ingere registros da fonte para o sink em microbatches usando uma infraestrutura sempre ativa. Uma carga de trabalho de transmissão ingere continuamente as atualizações da fonte de dados configurada, a menos que ocorra uma falha que interrompa a ingestão.
Ingestão incremental de lotes
A ingestão incremental de lotes refere-se a um padrão em que todos os novos registros são processados a partir de uma fonte de dados em um trabalho de curta duração. A ingestão incremental de lotes geralmente ocorre de acordo com um programa, mas também pode ser acionada manualmente ou com base na chegada de arquivos.
A ingestão incremental de lotes é diferente da ingestão de lotes, pois detecta automaticamente novos registros na fonte de dados e ignora os registros que já foram ingeridos.
Ingestão com Jobs
Databricks O Jobs permite que o senhor orquestre o fluxo de trabalho e programe tarefas que incluem o Notebook, a biblioteca, o pipeline Delta Live Tables e as consultas Databricks SQL.
Observação
O senhor pode usar todos os tipos de Databricks compute e de tarefa para configurar a ingestão de lotes incrementais. A ingestão de transmissão só é compatível com a produção no Job clássico compute e Delta Live Tables.
Os trabalhos têm dois modos principais de operação:
Os trabalhos contínuos são repetidos automaticamente se ocorrer uma falha. Esse modo é destinado à transmissão por ingestão.
Triggered Job execução tarefa quando acionado. Os gatilhos incluem:
Acionadores baseados em tempo que executam o Job em uma programação específica.
Acionadores baseados em arquivos que executam o Job quando os arquivos chegam a um local especificado.
Outros acionadores, como chamadas REST API , execução do comando Databricks CLI ou clicar no botão executar agora na interface do usuário workspace.
Para cargas de trabalho de lotes incrementais, configure seu Job usando o modo de acionamento AvailableNow
, como segue:
(df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("table_name")
)
import org.apache.spark.sql.streaming.Trigger
df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("table_name")
Para cargas de trabalho de transmissão, o intervalo de acionamento do default é processingTime ="500ms"
. O exemplo a seguir mostra como processar um micro-lote a cada 5 segundos:
(df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(processingTime="5 seconds")
.toTable("table_name")
)
import org.apache.spark.sql.streaming.Trigger
df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.ProcessingTime, "5 seconds")
.toTable("table_name")
Importante
O trabalho sem servidor não é compatível com Scala, modo contínuo ou intervalos de acionamento baseados em tempo para transmissão estruturada. Use o Job clássico se o senhor precisar de uma semântica de ingestão real quase em tempo real.
Ingestão com Delta Live Tables
Semelhante aos Jobs, o pipeline Delta Live Tables pode ser executado no modo acionado ou contínuo. Para obter a semântica de transmissão near tempo real com tabelas de transmissão, use o modo contínuo.
Use tabelas de transmissão para configurar a transmissão ou a ingestão de lotes incrementais do armazenamento de objetos cloud, Apache Kafka, Amazon Kinesis, Google Pub/Sub ou Apache Pulsar.
LakeFlow O Connect usa o site Delta Live Tables para configurar o pipeline de ingestão dos sistemas conectados. Consulte LakeFlow Connect.
A visualização materializada garante uma semântica de operações equivalente a muitas cargas de trabalho, mas pode otimizar muitas operações para calcular os resultados de forma incremental. Consulte Incremental refresh para visualização materializada.
Ingestão com Databricks SQL
O senhor pode usar tabelas de transmissão para configurar a ingestão de lotes incrementais do armazenamento de objetos cloud, Apache Kafka, Amazon Kinesis, Google Pub/Sub ou Apache Pulsar.
O senhor pode usar a visualização materializada para configurar o processamento incremental de lotes a partir de fontes Delta. Consulte Incremental refresh para visualização materializada.
COPY INTO
fornece a sintaxe familiar do SQL para processamento incremental de lotes para arquivos de dados no armazenamento de objetos do cloud. O comportamento COPY INTO
é semelhante aos padrões suportados pelas tabelas de transmissão para armazenamento de objetos cloud, mas nem todas as configurações default são equivalentes para todos os formatos de arquivos suportados.