Use sinks para transmitir registros para um serviço externo com LakeFlow Declarative pipeline
Visualização
O pipeline declarativo LakeFlow sink
API está em visualização pública.
Este artigo descreve o LakeFlow Declarative pipeline sink
API e como usá-lo com fluxos para gravar registros transformados por um pipeline em um coletor de dados externo. Os coletores de dados externos incluem Unity Catalog gerenciar e tabelas externas, além de serviços de transmissão de eventos, como Apache Kafka ou Azure Event Hubs.
O pipeline declarativo LakeFlow sink
API está disponível somente para Python.
O que são LakeFlow Declarative pipeline sinks?
LakeFlow Os sinks do pipeline declarativo são alvos para os fluxos do pipeline declarativo LakeFlow. Por default LakeFlow Os fluxos de pipeline declarativos emitem dados para uma tabela de transmissão ou para um destino materializado view. Ambas são tabelas Databricks gerenciar Delta. LakeFlow Os sinks de pipeline declarativo são um destino alternativo que o senhor usa para gravar dados transformados em destinos como o serviço de transmissão de eventos, como Apache Kafka ou Azure Event Hubs, e tabelas externas gerenciadas por Unity Catalog. Com o uso de sinks, o senhor agora tem mais opções para manter a saída do pipeline declarativo LakeFlow.
Quando devo usar o LakeFlow Declarative pipeline sinks?
Databricks recomenda o uso do LakeFlow Declarative pipeline sinks se o senhor precisar:
- Desenvolva um caso de uso operacional, como detecção de fraude, análise de tempo real e recomendações de clientes. Os casos de uso operacional normalmente leem dados de um barramento de mensagens, como um tópico do Apache Kafka, e depois processam os dados com baixa latência e gravam os registros processados de volta em um barramento de mensagens. Essa abordagem permite que você obtenha menor latência ao não escrever ou ler no armazenamento em nuvem.
- Escreva dados transformados de seus fluxos de pipeline declarativo LakeFlow em tabelas gerenciadas por uma instância externa Delta, incluindo Unity Catalog gerenciar e tabelas externas.
- Execute o processo reverso extrair, transformar, carregar (ETL) em coletores externos a Databricks, como os tópicos Apache Kafka . Essa abordagem permite que o senhor ofereça suporte eficaz a casos de uso em que os dados precisam ser lidos ou usados fora das tabelas Unity Catalog ou de outro armazenamento Databricks-gerenciar.
Como faço para usar o LakeFlow Declarative pipeline sinks?
À medida que os dados do evento são ingeridos de uma fonte de transmissão para o pipeline LakeFlow Declarative, o usuário processa e refina essa funcionalidade do pipeline LakeFlow Declarative e, em seguida, usa o processamento de fluxo de acréscimo para transmitir os registros de dados transformados para um sink do pipeline LakeFlow Declarative. Você cria esse coletor usando a função create_sink()
. Para obter mais detalhes sobre a função create_sink
, consulte a referência da API do sink.
Se o senhor tem um pipeline que cria ou processa os dados do evento de transmissão e prepara os registros de dados para gravação, então está pronto para usar um sink de pipeline declarativo LakeFlow.
A implementação de um coletor de pipeline declarativo LakeFlow consiste em duas etapas:
- Crie o sink do pipeline declarativo LakeFlow.
- Use um fluxo de acréscimo para gravar os registros preparados no coletor.
Criar um coletor de pipeline declarativo LakeFlow
Databricks suporta três tipos de sumidouros de destino nos quais o usuário grava os registros processados a partir dos dados de transmissão:
- Delta pias de mesa (incluindo Unity Catalog gerenciar e mesas externas)
- Sumidouros do Apache Kafka
- Sumidouros dos Hubs de Eventos do Azure
abaixo são exemplos de configurações para Delta, Kafka e Azure Hubs de eventos:
- Delta sinks
- Kafka and Azure Event Hubs sinks
Para criar um Delta sink por caminho de arquivo:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
Para criar um Delta sink por nome de tabela usando um catálogo totalmente qualificado e um caminho de esquema:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "catalog_name.schema_name.table_name" }
)
Esse código funciona para os sinks do Apache Kafka e do Azure Event Hubs.
topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
+ f' required username="$ConnectionString" password="{connection_string}";'
dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
"topic": topic_name
}
)
Para obter mais detalhes sobre o uso da função create_sink
, consulte a referência da API do sink.
Após a criação do sink, o senhor pode começar a transmitir os registros processados para o sink.
Gravar em um coletor de pipeline declarativo LakeFlow com um fluxo de acréscimo
Com seu coletor criado, a próxima etapa é gravar registros processados nele especificando-o como o destino para a saída de registros por um fluxo de acréscimo. Você faz isso especificando seu coletor como o valor target
no decorador append_flow
.
- Para gerenciar Unity Catalog e tabelas externas, use o formato
delta
e especifique o caminho ou o nome da tabela nas opções. Seu pipeline declarativo LakeFlow deve ser configurado para usar Unity Catalog. - Para os tópicos do Apache Kafka , use o formato
kafka
e especifique o nome do tópico, as informações de conexão e as informações de autenticação nas opções. Essas são as mesmas opções que a Spark transmissão estructurada Kafka suporta para a pia. Consulte Configurar o gravador Kafka transmissão estructurada. - Para Azure Event Hubs, use o formato
kafka
e especifique o nome do Event Hubs, as informações de conexão e as informações de autenticação nas opções. Essas são as mesmas opções suportadas em um sink de Hubs de Eventos de transmissão estruturada Spark que usa a interface Kafka. Veja a autenticação de entidade de serviço com Microsoft Entra ID e Azure Event Hubs.
abaixo são exemplos de como configurar fluxos para gravar em Delta, Kafka e Azure Event Hubs sinks com registros processados pelo pipeline declarativo LakeFlow.
- Delta sink
- Kafka and Azure Event Hubs sinks
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
O parâmetro value
é obrigatório para um coletor do Azure Event Hubs. Parâmetros adicionais, como key
, partition
, headers
e topic
, são opcionais.
Para obter mais detalhes sobre o decorador append_flow
, consulte Como usar vários fluxos para gravar em um único destino.
Limitações
-
Somente a API Python é compatível. Não há suporte para SQL.
-
Somente as consultas de transmissão são suportadas. não há suporte para consultas de lotes.
-
Somente
append_flow
pode ser usado para gravar em coletores. Outros fluxos, comocreate_auto_cdc_flow
, não são compatíveis, e o senhor não pode usar um sink em uma definição de LakeFlow Declarative pipeline dataset. Por exemplo, o seguinte não é suportado:Python@table("from_sink_table")
def fromSink():
return read_stream("my_sink") -
Para Delta sinks, o nome da tabela deve ser totalmente qualificado. Especificamente, para Unity Catalog gerenciar tabelas externas, o nome da tabela deve ter o formato
<catalog>.<schema>.<table>
. Para o Hive metastore, ele deve estar no formato<schema>.<table>
. -
A execução de uma atualização completa do site refresh não limpa os dados de resultados de computação anteriores nos sinks. Isso significa que todos os dados reprocessados são anexados ao coletor e os dados existentes não são alterados.
-
LakeFlow Não há suporte para expectativas declarativas de pipeline.