Execute sua primeira carga de trabalho de Transmissão estruturada

Este artigo apresenta exemplos de código e explicação dos conceitos básicos necessários para executar suas primeiras consultas de Transmissão estruturada no Databricks. Você pode usar a Transmissão Estruturada para cargas de trabalho de processamento quase em tempo real e incrementais.

A Transmissão Estruturada é uma das várias tecnologias que alimentam as tabelas de transmissão no Delta Live Tables. A Databricks recomenda o uso de Delta Live Tables para todas as novas cargas de trabalho de ETL, ingestão e Transmissão estruturada. Consulte O que é Delta Live Tables?.

Observação

Embora as Delta Live Tables apresentem sintaxe ligeiramente modificada para declarar tabelas de transmissão, a sintaxe geral para configurar leituras e transformações de transmissão se aplica a todos os casos de uso de transmissão em Databricks. Delta Live Tables também simplifica a transmissão gerenciando informações de estado, metadados e inúmeras configurações.

Use o Auto Loader para ler dados de transmissão do armazenamento de objetos

O exemplo a seguir demonstra o carregamento de dados JSON com o Auto Loader, que utiliza cloudFiles para denotar formato e opções. A opção schemaLocation habilita a inferência e a evolução do esquema. Cole o seguinte código em uma célula do notebook do Databricks e execute a célula para criar um DataFrame de stream denominado raw_df:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Tal como outras operações de leitura em Databricks, a configuração de uma leitura de transmissão, na verdade, não carrega dados. Você deve acionar uma ação nos dados antes do início da transmissão.

Observação

Chamando display() em uma transmissão DataFrame começar a transmissão Job. Na maioria dos casos de uso de transmissão estruturada, a ação que aciona uma transmissão deve ser a gravação de dados em um sink. Consulte Considerações sobre produção para transmissão estruturada.

Executar uma transformação de transmissão

A Transmissão estruturada é compatível com a maioria das transformações disponíveis no Databricks e no Spark SQL. Você pode até mesmo carregar modelos MLflow como UDFs e fazer previsões de transmissão como uma transformação.

O exemplo de código a seguir completa uma transformação simples para enriquecer os dados JSON ingeridos com informações adicionais usando as funções do Spark SQL:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

O transformed_df resultante contém instruções de consulta para carregar e transformar cada registro à medida que ele chega à fonte de dados.

Observação

A Transmissão Estruturada trata as fontes de dados como conjuntos de dados ilimitados ou infinitos. Dessa forma, algumas transformações não são compatíveis com as cargas de trabalho de Transmissão Estruturada porque exigiriam a classificação de um número infinito de itens.

A maioria das agregações e muitas junções exigem o gerenciamento de informações de estado com marcas d'água, janelas e modo de saída. Consulte Aplicar marcas d'água para controlar os limites de processamento de dados.

Execute uma gravação em lote incremental no Delta Lake

O exemplo a seguir grava no Delta Lake com um caminho de arquivo especificado e um ponto de verificação.

Importante

É importante que você sempre defina um local de ponto de verificação exclusivo para cada gravador de transmissão configurada. O ponto de verificação fornece a identidade exclusiva da sua transmissão, rastreando todos os registros processados e as informações de estado associadas à sua consulta de transmissão.

A configuração availableNow do acionador instrui a Transmissão Estruturada a processar todos os registros não processados anteriormente do conjunto de dados de origem e, em seguida, desligar, para você poder executar com segurança o código a seguir sem se preocupar em deixar uma transmissão em execução:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

Neste exemplo, nenhum novo registro chega em nossa fonte de dados, portanto a execução repetida desse código não ingere novos registros.

Aviso

A execução da Transmissão estruturada pode impedir que a rescisão automática desligue os recursos de computação. Para evitar custos inesperados, é importante que você encerre as consultas de transmissão.

Ler dados do Delta Lake, transformar e gravar no Delta Lake

Delta Lake tem amplo suporte para trabalhar com transmissão estruturada tanto como fonte quanto como sumidouro. Veja tabela Delta transmissão de leituras e escritas.

O exemplo a seguir mostra a sintaxe de exemplo para carregar de forma incremental todos os novos registros de uma tabela Delta, uni-los a um instantâneo de outra tabela Delta e gravá-los em uma tabela Delta:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

Você deve ter permissões adequadas configuradas para ler tabelas de origem e gravar em tabelas de destino e no local do ponto de verificação especificado. Preencha todos os parâmetros indicados entre colchetes angulares (<>) usando os valores relevantes para suas fontes de dados e coletores.

Observação

O Delta Live Tables tem sintaxe totalmente declarativa para a criação de pipelines do Delta Lake e gerencia propriedades como acionadores e pontos de verificação automaticamente. Consulte O que são Delta Live Tables?

Ler dados do Kafka, transformar e gravar no Kafka

O Apache Kafka e outros barramentos de mensagens oferecem parte da menor latência disponível para grandes conjuntos de dados. Você pode usar o Databricks para aplicar transformações aos dados ingeridos do Kafka e, em seguida, gravar os dados de volta no Kafka.

Observação

A gravação de dados no armazenamento de objetos em nuvem adiciona sobrecarga de latência. Se você quiser armazenar dados de um barramento de mensagens no Delta Lake, mas precisar da menor latência possível para cargas de trabalho de transmissão, a Databricks recomenda configurar trabalhos de transmissão separados para ingerir dados no lakehouse e aplicar transformações quase em tempo real para os coletores de barramento de mensagens downstream.

O exemplo de código a seguir demonstra um padrão simples para enriquecer os dados do Kafka, juntando-os aos dados em uma tabela Delta e, em seguida, retornando ao Kafka:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Você deve ter as permissões adequadas configuradas para acessar seu serviço do Kafka. Preencha todos os parâmetros indicados entre colchetes angulares (<>) usando os valores relevantes para suas fontes de dados e coletores. Consulte Processamento de stream com Apache Kafka e Databricks.