Utilize destinos em pipelines
Use a API sink do Lakeflow Spark Declarative Pipelines com fluxos para gravar registros transformados por um pipeline em um coletor de dados externo. Os destinos de dados externos incluem tabelas gerenciadas e externas do Unity Catalog, e serviços de transmissão de eventos como Apache Kafka ou Azure Event Hubs. Você também pode usar coletores de dados para gravar em fontes de dados personalizadas escrevendo código Python para essa fonte de dados.
Para uma visão geral dos conceitos de destino e quando usá-los, consulte Destinos em Lakeflow Spark Declarative Pipelines.
- A API
sinkestá disponível apenas para Python. - Você pode criar um coletor personalizado com a API ForEachBatch. Consulte Usar ForEachBatch para gravar em destinos de dados arbitrários no pipeline.
Sink fluxo de trabalho
À medida que os dados de eventos são ingeridos de uma fonte de transmissão em seu pipeline, você processa e refina esses dados em transformações em seu pipeline. Em seguida, você usa o processamento de fluxo de anexação para transmitir os registros de dados transformados para um destino. Você cria este coletor usando a função create_sink() . Para obter mais detalhes sobre a função create_sink , consulte a referência da API de destino.
Se você possui um pipeline que cria ou processa seus dados de eventos de transmissão e prepara registros de dados para gravação, então você está pronto para usar um coletor (sink).
A implementação de uma pia consiste em duas etapas:
- Crie a pia.
- Utilize um fluxo de acréscimo ou um fluxo de atualização para gravar os registros preparados no destino.
Crie uma pia
Databricks suporta vários tipos de destinos nos quais você grava os registros processados a partir dos seus dados de transmissão:
- Destinos de tabelas Delta (incluindo gerenciamento Unity Catalog e tabelas externas)
- Apache Kafka sinks
- Destinos do Azure Event Hubs
- Sumidouros personalizados escritos em Python, usando a fonte de dados personalizada Python
Abaixo estão exemplos de configurações para coletores Delta, Kafka e Azure Event Hubs, e uma fonte de dados personalizada Python :
- Delta sinks
- Kafka and Azure Event Hubs sinks
- Python custom data sources
Para criar um coletor Delta por caminho de arquivo:
dp.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
Para criar um coletor Delta por nome de tabela usando um caminho de catálogo e esquema totalmente qualificado:
dp.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "catalog_name.schema_name.table_name" }
)
Este código funciona tanto para coletores Apache Kafka quanto para coletores Azure Event Hubs.
credential_name = "<service-credential>"
eh_namespace_name = "dp-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
topic_name = "dp-sink"
dp.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"databricks.serviceCredential": credential_name,
"kafka.bootstrap.servers": bootstrap_servers,
"topic": topic_name
}
)
O credential_name é uma referência a uma credencial de serviço do Unity Catalog. Para obter mais informações, consulte Usar credenciais de serviço Unity Catalog para conectar-se ao serviço cloud externo.
Supondo que você tenha uma fonte de dados personalizada Python registrada como my_custom_datasource, então o código a seguir pode escrever nessa fonte de dados.
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python streaming
# data source that writes data to your system.
# Create Lakeflow SDP sink using my_custom_datasource
dp.create_sink(
name="custom_sink",
format="my_custom_datasource",
options={
<options-needed-for-custom-datasource>
}
)
# Create append flow to send data to RequestBin
@dp.append_flow(name="flow_to_custom_sink", target="custom_sink")
def flow_to_custom_sink():
return read_stream("my_source_data")
Para obter detalhes sobre como criar fonte de dados personalizada em Python, consulte Fonte de dados personalizadaPySpark.
Para obter mais detalhes sobre como usar a função create_sink , consulte a referência da API de destino.
Após a criação do seu ponto de coleta, você pode começar a transmitir os registros processados para ele.
Escreva em um coletor com um fluxo de acréscimo
Com o seu coletor criado, o próximo passo é gravar os registros processados nele, especificando-o como o destino para os registros emitidos por um fluxo de acréscimo. Você faz isso especificando seu sink como o valor target no decorador append_flow .
- Para gerenciar Unity Catalog e tabelas externas, use o formato
deltae especifique o caminho ou o nome da tabela nas opções. Seu pipeline deve ser configurado para usar Unity Catalog. - Para tópicos Apache Kafka , use o formato
kafkae especifique o nome do tópico, as informações de conexão e as informações de autenticação nas opções. Essas são as mesmas opções que um coletor Kafka Spark suporta. Consulte Configurar o gravador de transmissão estruturada Kafka. - Para o Azure Event Hubs, use o formato
kafkae especifique o nome do Event Hubs, as informações de conexão e as informações de autenticação nas opções. Essas são as mesmas opções suportadas em um coletor de Event Hubs estruturado Spark que usa a interface Kafka . Consulte Autenticação.
Abaixo estão exemplos de como configurar fluxos para gravar em coletores Delta, Kafka e Azure Event Hubs com registros processados pelo seu pipeline.
- Delta sink
- Kafka and Azure Event Hubs sinks
@dp.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
@dp.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
O parâmetro value é obrigatório para um coletor do Azure Event Hubs. Parâmetros adicionais como key, partition, headers e topic são opcionais.
Para obter mais detalhes sobre o decorador append_flow , consulte default flows e append flows.
Limitações
-
Apenas a API Python é suportada. SQL não é suportado.
-
Somente consultas de transmissão são suportadas. Consultas em lotes não são suportadas.
-
Somente
append_floweupdate_flowpodem ser usados para gravar em coletores. Outros fluxos, comocreate_auto_cdc_flow, não são compatíveis, e não é possível usar um destino em uma definição de dataset de pipeline. Por exemplo, o seguinte não é aceito:Python@table("from_sink_table")
def fromSink():
return read_stream("my_sink") -
Para sumidouros Delta, o nome da tabela deve ser totalmente qualificado. Especificamente, para o Unity Catalog gerenciar tabelas externas, o nome da tabela deve estar no formato
<catalog>.<schema>.<table>. Para o Hive metastore, deve estar no formato<schema>.<table>. -
A execução de uma atualização de refresh completa não limpa os dados de resultados de compute computados anteriormente nos sinks. Isso significa que todos os dados reprocessados são anexados ao coletor e os dados existentes não são alterados.
-
As expectativas de pipeline não são suportadas.
-
O controle de saída sem servidor suporta apenas conectores de destino Kafka e Delta Lake . Veja O que é controle de saída serverless ?