Use sinks para transmitir registros para serviços externos com DLT
Visualização
A API DLT sink
está em pré-visualização pública.
Este artigo descreve o DLT sink
API e como usá-lo com fluxos para gravar registros transformados por um pipeline em um coletor de dados externo. Os coletores de dados externos incluem Unity Catalog gerenciar e tabelas externas, além de serviços de transmissão de eventos, como Apache Kafka ou Azure Event Hubs.
A API do DLT sink
está disponível apenas para Python.
O que são coletores DLT?
Os sinks de DLT são alvos para fluxos de DLT. Em default DLT os fluxos emitem dados para uma tabela de transmissão ou para um destino materializado view. Ambas são tabelas Databricks gerenciar Delta. DLT Os sinks são um destino alternativo que o senhor usa para gravar dados transformados em destinos como o serviço de transmissão de eventos, como Apache Kafka ou Azure Event Hubs, e tabelas externas gerenciadas por Unity Catalog. Usando sinks, o senhor agora tem mais opções para manter a saída do pipeline DLT.
Quando devo usar coletores DLT?
A Databricks recomenda o uso de dissipadores DLT se o senhor precisar:
- Desenvolva um caso de uso operacional, como detecção de fraude, análise de tempo real e recomendações de clientes. Os casos de uso operacional normalmente leem dados de um barramento de mensagens, como um tópico do Apache Kafka, e depois processam os dados com baixa latência e gravam os registros processados de volta em um barramento de mensagens. Essa abordagem permite que você obtenha menor latência ao não escrever ou ler no armazenamento em nuvem.
- Gravar dados transformados de seus fluxos DLT em tabelas gerenciadas por uma instância externa Delta, incluindo Unity Catalog gerenciar e tabelas externas.
- Execute o processo reverso extrair, transformar, carregar (ETL) em coletores externos a Databricks, como os tópicos Apache Kafka . Essa abordagem permite que o senhor ofereça suporte eficaz a casos de uso em que os dados precisam ser lidos ou usados fora das tabelas Unity Catalog ou de outro armazenamento Databricks-gerenciar.
Como faço para usar coletores DLT?
À medida que os dados do evento são ingeridos de uma fonte de transmissão para o seu site DLT pipeline, o senhor processa e refina essa funcionalidade de uso de dados DLT e, em seguida, usa o processamento de fluxo de acréscimo para transmitir os registros de dados transformados para um sink DLT. Você cria esse coletor usando a função create_sink()
. Para obter mais detalhes sobre a função create_sink
, consulte a referência da API do sink.
Se o senhor tem um DLT pipeline que cria ou processa os dados do evento de transmissão e prepara os registros de dados para gravação, então está pronto para usar um sink DLT.
A implementação de um sumidouro DLT consiste em duas etapas:
- Criar o coletor DLT.
- Use um fluxo de acréscimo para gravar os registros preparados no coletor.
Criar um coletor DLT
Databricks suporta três tipos de sumidouros de destino nos quais o usuário grava os registros processados a partir dos dados de transmissão:
- Delta pias de mesa (incluindo Unity Catalog gerenciar e mesas externas)
- Sumidouros do Apache Kafka
- Sumidouros dos Hubs de Eventos do Azure
abaixo são exemplos de configurações para Delta, Kafka e Azure Hubs de eventos:
- Delta sinks
- Kafka and Azure Event Hubs sinks
To create a Delta sink by file path:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
To create a Delta sink by table name using a fully qualified catalog and schema path:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "catalog_name.schema_name.table_name" }
)
This code works for both Apache Kafka and Azure Event Hubs sinks.
topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
+ f' required username="$ConnectionString" password="{connection_string}";'
dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
"topic": topic_name
}
)
Para obter mais detalhes sobre o uso da função create_sink
, consulte a referência da API do sink.
Após a criação do sink, o senhor pode começar a transmitir os registros processados para o sink.
Escreva em um coletor DLT com um fluxo de acréscimo
Com seu coletor criado, a próxima etapa é gravar registros processados nele especificando-o como o destino para a saída de registros por um fluxo de acréscimo. Você faz isso especificando seu coletor como o valor target
no decorador append_flow
.
- Para Unity Catalog gerenciar e tabelas externas, use o formato
delta
e especifique o caminho ou o nome da tabela nas opções. Seu pipeline DLT deve ser configurado para usar Unity Catalog. - Para os tópicos do Apache Kafka , use o formato
kafka
e especifique o nome do tópico, as informações de conexão e as informações de autenticação nas opções. Essas são as mesmas opções que a Spark transmissão estructurada Kafka suporta para a pia. Consulte Configurar o gravador Kafka transmissão estructurada. - Para Azure Event Hubs, use o formato
kafka
e especifique o nome do Event Hubs, as informações de conexão e as informações de autenticação nas opções. Essas são as mesmas opções suportadas em um sink de Hubs de Eventos de transmissão estruturada Spark que usa a interface Kafka. Veja a autenticação de entidade de serviço com Microsoft Entra ID e Azure Event Hubs.
Abaixo estão exemplos de como configurar fluxos para gravar em Delta, Kafka e Azure Event Hubs sinks com registros processados pelo seu DLT pipeline.
- Delta sink
- Kafka and Azure Event Hubs sinks
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
The value
parameter is mandatory for an Azure Event Hubs sink. Additional parameters such as key
, partition
, headers
, and topic
are optional.
Para obter mais detalhes sobre o decorador append_flow
, consulte Como usar vários fluxos para gravar em um único destino.
Limitações
-
Somente a API Python é compatível. Não há suporte para SQL.
-
Somente as consultas de transmissão são suportadas. não há suporte para consultas de lotes.
-
Somente
append_flow
pode ser usado para gravar em coletores. Outros fluxos, comoapply_changes
, não são compatíveis e o senhor não pode usar um sink em uma definição de DLT dataset. Por exemplo, o seguinte não é suportado:Python@table("from_sink_table")
def fromSink():
return read_stream("my_sink") -
Para Delta sinks, o nome da tabela deve ser totalmente qualificado. Especificamente, para Unity Catalog gerenciar tabelas externas, o nome da tabela deve ter o formato
<catalog>.<schema>.<table>
. Para o Hive metastore, ele deve estar no formato<schema>.<table>
. -
A execução de uma atualização completa do site refresh não limpa os dados de resultados de computação anteriores nos sinks. Isso significa que todos os dados reprocessados são anexados ao coletor e os dados existentes não são alterados.
-
As expectativas de DLT não são suportadas.