Use sinks para transmitir registros para serviço externo com pipeline declarativo LakeFlow
Visualização
A API do pipeline declarativo LakeFlow sink
está em visualização pública.
Este artigo descreve a API do pipeline declarativo LakeFlow sink
e como usá-la com fluxos para gravar registros transformados por um pipeline em um coletor de dados externo. Os coletores de dados externos incluem o gerenciador Unity Catalog e tabelas externas, além de serviços de transmissão de eventos como Apache Kafka ou Azure Event Hubs.
A API do pipeline declarativo LakeFlow sink
está disponível somente para Python.
O que são sumidouros de pipeline declarativos LakeFlow ?
Os coletores de pipeline declarativos LakeFlow são alvos para fluxos de pipeline declarativos LakeFlow . Por default os fluxos de pipeline declarativos LakeFlow emitem dados para uma tabela de transmissão ou para um destino view materializada. Ambas são tabelas Delta gerenciadas pelo Databricks . Os coletores de pipeline declarativos LakeFlow são um destino alternativo que você usa para gravar dados transformados em destinos como Apache Kafka ou Azure Event Hubs e tabelas externas gerenciadas pelo Unity Catalog. Usando coletores, agora você tem mais opções para persistir a saída do seu pipeline declarativo LakeFlow .
Quando devo usar os coletores de pipeline declarativos LakeFlow ?
Databricks recomenda o uso de coletores de pipeline declarativos LakeFlow se você precisar:
- Crie um caso de uso operacional, como detecção de fraudes, análise de tempo real e recomendações de clientes. Os casos de uso operacional normalmente leem dados de um barramento de mensagens, como um tópico do Apache Kafka, e então processam dados com baixa latência e gravam os registros processados de volta em um barramento de mensagens. Essa abordagem permite que você alcance menor latência ao não gravar ou ler dados do armazenamento cloud .
- Grave dados transformados dos fluxos do pipeline declarativo LakeFlow em tabelas gerenciadas por uma instância Delta externa, incluindo Unity Catalog gerenciado e tabelas externas.
- Execute extração reversa, transformação, carregamento (ETL) em coletores externos ao Databricks, como tópicos Apache Kafka . Essa abordagem permite que você ofereça suporte eficaz a casos de uso em que os dados precisam ser lidos ou usados fora das tabelas Unity Catalog ou de outro armazenamento gerenciado Databricks .
- É necessário gravar em um formato de dados que não é suportado diretamente pelo Databricks. A fonte de dados personalizada Python permite que você crie um coletor que grava em qualquer fonte de dados usando código Python personalizado. Consulte Fonte de dados personalizadaPySpark.
Como usar os coletores de pipeline declarativos LakeFlow ?
À medida que os dados do evento são ingeridos de uma fonte de transmissão para seu pipeline LakeFlow Declarative, você processa e refina essa funcionalidade do pipeline LakeFlow Declarative e, em seguida, usa o processamento de fluxo de acréscimo para transmitir os registros de dados transformados para um coletor de pipeline LakeFlow Declarative. Você cria esse coletor usando a função create_sink()
. Para mais detalhes sobre a função create_sink
, consulte a referência da API do sink.
Se você tiver um pipeline que cria ou processa seus dados de eventos de transmissão e prepara registros de dados para gravação, estará pronto para usar um coletor de pipeline declarativo LakeFlow .
A implementação de um pipeline declarativo LakeFlow consiste em dois passos:
- Crie o coletor do pipeline declarativo LakeFlow .
- Use um fluxo de acréscimo para gravar os registros preparados no coletor.
Crie um coletor de pipeline declarativo LakeFlow
Databricks suporta vários tipos de coletores de destino nos quais você grava seus registros processados a partir dos seus dados de transmissão:
- Pias de tabela Delta (incluindo tabelas externas e gerenciais Unity Catalog )
- Apache Kafka afunda
- Coletores do Azure Event Hubs
- Pias personalizadas escritas em Python, usando a fonte de dados personalizada Python
abaixo estão exemplos de configurações para os coletores Delta, Kafka e Azure Event Hubs, e fonte de dados personalizada Python :
- Delta sinks
- Kafka and Azure Event Hubs sinks
- Python custom data sources
Para criar um coletor Delta pelo caminho do arquivo:
dp.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
Para criar um coletor Delta por nome de tabela usando um catálogo totalmente qualificado e um caminho de esquema:
dp.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "catalog_name.schema_name.table_name" }
)
Este código funciona para os coletores do Apache Kafka e do Azure Event Hubs.
credential_name = "<service-credential>"
eh_namespace_name = "dp-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
topic_name = "dp-sink"
dp.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"databricks.serviceCredential": credential_name,
"kafka.bootstrap.servers": bootstrap_servers,
"topic": topic_name
}
)
credential_name
é uma referência a uma credencial de serviço do Unity Catalog. Para obter mais informações, consulte Usar credenciais de serviço Unity Catalog para se conectar ao serviço cloud externo.
Supondo que você tenha uma fonte de dados personalizada Python registrada como my_custom_datasource
, o código a seguir pode gravar nessa fonte de dados.
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python streaming
# data source that writes data to your system.
# Create LDP sink using my_custom_datasource
dp.create_sink(
name="custom_sink",
format="my_custom_datasource",
options={
<options-needed-for-custom-datasource>
}
)
# Create append flow to send data to RequestBin
@dp.append_flow(name="flow_to_custom_sink", target="custom_sink")
def flow_to_custom_sink():
return read_stream("my_source_data")
Para obter mais detalhes sobre o uso da função create_sink
, consulte a referência da API do sink.
Depois que seu coletor for criado, você pode começar a transmitir registros processados para ele.
Escrever em um coletor de pipeline declarativo LakeFlow com um fluxo de acréscimo
Com seu coletor criado, o próximo passo é gravar registros processados nele, especificando-o como o destino para os registros de saída de um fluxo de acréscimo. Você faz isso especificando seu coletor como o valor target
no decorador append_flow
.
- Para gerenciar Unity Catalog e tabelas externas, use o formato
delta
e especifique o caminho ou nome da tabela nas opções. Seu pipeline declarativo LakeFlow deve ser configurado para usar Unity Catalog. - Para tópicos Apache Kafka , use o formato
kafka
e especifique o nome do tópico, as informações de conexão e as informações de autenticação nas opções. Essas são as mesmas opções que um dissipador Spark transmissão estructurada Kafka suporta. Consulte Configurar o gravador de transmissão estruturada Kafka. - Para os Hubs de Eventos Azure , use o formato
kafka
e especifique o nome dos Hubs de Eventos, as informações de conexão e as informações de autenticação nas opções. Essas são as mesmas opções suportadas em um coletor Spark Transmissão Estruturada Event Hubs que usa a interface Kafka . Consulte Autenticação da entidade de serviço com o Microsoft Entra ID e os Hubs de Eventos do Azure.
Abaixo estão exemplos de como configurar fluxos para gravar em coletores Delta, Kafka e Azure Event Hubs com registros processados pelo seu pipeline LakeFlow Declarative.
- Delta sink
- Kafka and Azure Event Hubs sinks
@dp.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
@dp.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
O parâmetro value
é obrigatório para um coletor do Azure Event Hubs. Parâmetros adicionais como key
, partition
, headers
e topic
são opcionais.
Para obter mais detalhes sobre o decorador append_flow
, consulte Usando vários fluxos para gravar em um único destino.
Limitações
-
Somente a API Python é suportada. SQL não é suportado.
-
Somente consultas de transmissão são suportadas. Consultas de lotes não são suportadas.
-
Somente
append_flow
pode ser usado para gravar em coletores. Outros fluxos, comocreate_auto_cdc_flow
, não são suportados, e você não pode usar um coletor em uma definição de dataset de pipeline declarativo LakeFlow . Por exemplo, o seguinte não é suportado:Python@table("from_sink_table")
def fromSink():
return read_stream("my_sink") -
Para sumidouros Delta, o nome da tabela deve ser totalmente qualificado. Especificamente, para tabelas externas Unity Catalog , o nome da tabela deve estar no formato
<catalog>.<schema>.<table>
. Para o Hive metastore, ele deve estar no formato<schema>.<table>
. -
Executar uma refresh completa não limpa os dados de resultados computados anteriormente nos coletores. Isso significa que todos os dados reprocessados serão anexados ao coletor e os dados existentes não serão alterados.
-
As expectativas de pipeline declarativas LakeFlow não são suportadas.