execução sua primeira transmissão carga de trabalho estruturada
Este artigo apresenta exemplos de código e explicação dos conceitos básicos necessários para executar suas primeiras consultas de Transmissão estruturada no Databricks. Você pode usar a Transmissão Estruturada para cargas de trabalho de processamento quase em tempo real e incrementais.
A transmissão estruturada é uma das várias tecnologias que alimentam as tabelas de transmissão na DLT. Databricks recomenda o uso da DLT para todas as novas cargas de trabalho de ETL, ingestão e transmissão estruturada. Consulte O que é DLT? .
Embora a DLT forneça uma sintaxe ligeiramente modificada para declarar tabelas de transmissão, a sintaxe geral para configurar leituras e transformações de transmissão se aplica a todos os casos de uso de transmissão em Databricks. A DLT também simplifica a transmissão ao gerenciar informações de estado, metadados e várias configurações.
Use o site Auto Loader para ler os dados de transmissão do armazenamento de objetos
O exemplo a seguir demonstra o carregamento de dados JSON com o Auto Loader, que utiliza cloudFiles
para denotar formato e opções. A opção schemaLocation
habilita a inferência e a evolução do esquema. Cole o seguinte código em uma célula do notebook do Databricks e execute a célula para criar um DataFrame de stream denominado raw_df
:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
Tal como outras operações de leitura em Databricks, a configuração de uma leitura de transmissão, na verdade, não carrega dados. Você deve acionar uma ação nos dados antes do início da transmissão.
Chamando display()
em uma transmissão DataFrame começar a transmissão Job. Na maioria dos casos de uso de transmissão estruturada, a ação que aciona uma transmissão deve ser a gravação de dados em um sink. Consulte Considerações sobre produção para transmissão estruturada.
Realizar a transmissão transformações
A Transmissão estruturada é compatível com a maioria das transformações disponíveis no Databricks e no Spark SQL. Você pode até mesmo carregar modelos MLflow como UDFs e fazer previsões de transmissão como uma transformação.
O exemplo de código a seguir completa uma transformação simples para enriquecer os dados JSON ingeridos com informações adicionais usando as funções do Spark SQL:
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
O transformed_df
resultante contém instruções de consulta para carregar e transformar cada registro à medida que ele chega à fonte de dados.
A Transmissão Estruturada trata as fontes de dados como conjuntos de dados ilimitados ou infinitos. Dessa forma, algumas transformações não são compatíveis com as cargas de trabalho de Transmissão Estruturada porque exigiriam a classificação de um número infinito de itens.
A maioria das agregações e muitas junções exigem o gerenciamento de informações de estado com marcas d'água, janelas e modo de saída. Consulte Aplicar marcas d'água para controlar o limite de processamento de dados.
Executar uma gravação incremental de lotes para Delta Lake
O exemplo a seguir grava no Delta Lake com um caminho de arquivo especificado e um ponto de verificação.
É importante que você sempre defina um local de ponto de verificação exclusivo para cada gravador de transmissão configurada. O ponto de verificação fornece a identidade exclusiva da sua transmissão, rastreando todos os registros processados e as informações de estado associadas à sua consulta de transmissão.
A configuração availableNow
do acionador instrui a Transmissão Estruturada a processar todos os registros não processados anteriormente do conjunto de dados de origem e, em seguida, desligar, para você poder executar com segurança o código a seguir sem se preocupar em deixar uma transmissão em execução:
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
Neste exemplo, nenhum novo registro chega em nossa fonte de dados, portanto a execução repetida desse código não ingere novos registros.
A execução da Transmissão estruturada pode impedir que a rescisão automática desligue os recursos de computação. Para evitar custos inesperados, é importante que você encerre as consultas de transmissão.
Ler dados do Delta Lake, transformar e gravar no Delta Lake
Delta Lake tem amplo suporte para trabalhar com a transmissão estruturada como fonte e sumidouro. Consulte Delta tabela de leituras e gravações de transmissão.
O exemplo a seguir mostra a sintaxe de exemplo para carregar de forma incremental todos os novos registros de uma tabela Delta, uni-los a um instantâneo de outra tabela Delta e gravá-los em uma tabela Delta:
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
Você deve ter permissões adequadas configuradas para ler tabelas de origem e gravar em tabelas de destino e no local do ponto de verificação especificado. Preencha todos os parâmetros indicados entre colchetes angulares (<>
) usando os valores relevantes para suas fontes de dados e coletores.
O DLT oferece uma sintaxe totalmente declarativa para criar o pipeline Delta Lake e gerenciar propriedades como acionadores e pontos de verificação automaticamente. Consulte O que é DLT? .
Ler dados do Kafka, transformar e gravar no Kafka
O Apache Kafka e outros barramentos de mensagens oferecem parte da menor latência disponível para grandes conjuntos de dados. Você pode usar o Databricks para aplicar transformações aos dados ingeridos do Kafka e, em seguida, gravar os dados de volta no Kafka.
A gravação de dados no armazenamento de objetos em nuvem adiciona sobrecarga de latência. Se você quiser armazenar dados de um barramento de mensagens no Delta Lake, mas precisar da menor latência possível para cargas de trabalho de transmissão, a Databricks recomenda configurar trabalhos de transmissão separados para ingerir dados no lakehouse e aplicar transformações quase em tempo real para os coletores de barramento de mensagens downstream.
O exemplo de código a seguir demonstra um padrão simples para enriquecer os dados do Kafka, juntando-os aos dados em uma tabela Delta e, em seguida, retornando ao Kafka:
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
O senhor deve ter as permissões adequadas configuradas para acessar o serviço Kafka. Preencha todos os parâmetros indicados com colchetes angulares (<>
) usando os valores relevantes para sua fonte de dados e coletores. Veja o processamento da transmissão com Apache Kafka e Databricks.