Pular para o conteúdo principal

Utilizando sumidouros em um pipeline

info

Visualização

A API sink está em Pré-visualização Pública.

Esta página descreve a API LakeFlow Spark Declarative pipeline sink e como usá-la com fluxos para gravar registros transformados por um pipeline em um coletor de dados externo. Os destinos de dados externos incluem Unity Catalog , tabelas externas e serviços de transmissão de eventos como Apache Kafka ou Azure Event Hubs.

nota

O que são pias?

Os sumidouros são destinos para os fluxos em um pipeline. Por default, os fluxos pipeline emitem dados para uma tabela de transmissão ou para uma view materializada. Ambas são tabelas Delta Databricks . Os sinks são um destino alternativo que você usa para gravar dados transformados em destinos como serviços de transmissão de eventos, como Apache Kafka ou Azure Event Hubs, e tabelas externas gerenciadas pelo Unity Catalog. Ao usar sinks, você agora tem mais opções para persistir a saída do seu pipeline.

Quando devo usar as pias?

A Databricks recomenda o uso de sinks se você precisar:

  • Desenvolva um caso de uso operacional, como detecção de fraudes, análise de tempo real e recomendações ao cliente. Os casos de uso operacionais normalmente leem dados de um barramento de mensagens, como um tópico do Apache Kafka, e então processam os dados com baixa latência e gravam os registros processados de volta no barramento de mensagens. Essa abordagem permite obter menor latência, evitando operações de leitura ou gravação no armazenamento cloud .
  • Grave os dados transformados dos seus fluxos em tabelas gerenciadas por uma instância Delta externa, incluindo Unity Catalog e tabelas externas.
  • Realizar a extração, transformação e carregamento reversos (ETL) em destinos externos ao Databricks, como tópicos Apache Kafka . Essa abordagem permite que você dê suporte eficaz a casos de uso em que os dados precisam ser lidos ou usados fora das tabelas Unity Catalog ou de outros armazenamentos Databricks .
  • Preciso escrever em um formato de dados que não é suportado diretamente pelo Databricks. Fontes de dados personalizadas Python permitem criar um coletor de dados que grava em qualquer fonte de dados usando código Python personalizado. Consulte Fonte de dados personalizadaPySpark.

Como usar as pias?

À medida que os dados de eventos são ingeridos de uma fonte de transmissão em seu pipeline, você processa e refina esses dados em transformações em seu pipeline. Em seguida, você usa o processamento de fluxo de anexação para transmitir os registros de dados transformados para um destino. Você cria este coletor usando a função create_sink() . Para obter mais detalhes sobre a função create_sink , consulte a referência da API de destino.

Se você possui um pipeline que cria ou processa seus dados de eventos de transmissão e prepara registros de dados para gravação, então você está pronto para usar um coletor (sink).

A implementação de uma pia consiste em duas etapas:

  1. Crie a pia.
  2. Utilize um fluxo de anexação para gravar os registros preparados no destino.

Crie uma pia

Databricks suporta vários tipos de destinos nos quais você grava os registros processados a partir dos seus dados de transmissão:

  • Destinos de tabelas Delta (incluindo gerenciamento Unity Catalog e tabelas externas)
  • Apache Kafka sinks
  • Destinos do Azure Event Hubs
  • Sumidouros personalizados escritos em Python, usando a fonte de dados personalizada Python

Abaixo estão exemplos de configurações para coletores Delta, Kafka e Azure Event Hubs, e uma fonte de dados personalizada Python :

Para criar um coletor Delta por caminho de arquivo:

Python
dp.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)

Para criar um coletor Delta por nome de tabela usando um caminho de catálogo e esquema totalmente qualificado:

Python
dp.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "catalog_name.schema_name.table_name" }
)

Para obter mais detalhes sobre como usar a função create_sink , consulte a referência API de sink e PySpark custom fonte de dados.

Após a criação do seu ponto de coleta, você pode começar a transmitir os registros processados para ele.

Escreva em um coletor com um fluxo de acréscimo

Com o seu coletor criado, o próximo passo é gravar os registros processados nele, especificando-o como o destino para os registros emitidos por um fluxo de acréscimo. Você faz isso especificando seu sink como o valor target no decorador append_flow .

  • Para gerenciar Unity Catalog e tabelas externas, use o formato delta e especifique o caminho ou o nome da tabela nas opções. Seu pipeline deve ser configurado para usar Unity Catalog.
  • Para tópicos Apache Kafka , use o formato kafka e especifique o nome do tópico, as informações de conexão e as informações de autenticação nas opções. Essas são as mesmas opções que um coletor Kafka Spark suporta. Consulte Configurar o gravador de transmissão estruturada Kafka.
  • Para o Azure Event Hubs, use o formato kafka e especifique o nome do Event Hubs, as informações de conexão e as informações de autenticação nas opções. Essas são as mesmas opções suportadas em um coletor de Event Hubs estruturado Spark que usa a interface Kafka . Consulte Autenticação de entidade de serviço com Microsoft Entra ID e Azure Event Hubs.

Abaixo estão exemplos de como configurar fluxos para gravar em coletores Delta, Kafka e Azure Event Hubs com registros processados pelo seu pipeline.

Python
@dp.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)

Para obter mais detalhes sobre o decorador append_flow , consulte Usando vários fluxos para escrever em um único destino.

Limitações

  • Apenas a API Python é suportada. SQL não é suportado.

  • Somente consultas de transmissão são suportadas. Consultas em lotes não são suportadas.

  • Somente append_flow pode ser usado para escrever em sinks. Outros fluxos, como create_auto_cdc_flow, não são suportados e você não pode usar um coletor em uma definição de dataset pipeline . Por exemplo, o seguinte não é suportado:

    Python
    @table("from_sink_table")
    def fromSink():
    return read_stream("my_sink")
  • Para sumidouros Delta, o nome da tabela deve ser totalmente qualificado. Especificamente, para o Unity Catalog gerenciar tabelas externas, o nome da tabela deve estar no formato <catalog>.<schema>.<table>. Para o Hive metastore, deve estar no formato <schema>.<table>.

  • Executar uma refresh completa não limpa os dados de resultados de cálculos anteriores nos coletores. Isso significa que quaisquer dados reprocessados são anexados ao destino, e os dados existentes não são alterados.

  • As expectativas de pipeline não são suportadas.

recurso