criar_pia
Visualização
A API do pipeline declarativo LakeFlow create_sink está em visualização pública.
A função create_sink() grava em um serviço de transmissão de eventos, como Apache Kafka ou Azure Event Hubs, ou em uma tabela Delta de um pipeline declarativo. Depois de criar um coletor com a função create_sink() , você usa o coletor em um fluxo de acréscimo para gravar dados no coletor. append flow é o único tipo de fluxo suportado pela função create_sink() . Outros tipos de fluxo, como create_auto_cdc_flow, não são suportados.
O coletor Delta oferece suporte a tabelas externas e de gerenciamento Unity Catalog e tabelas de gerenciamento Hive metastore . Os nomes das tabelas devem ser totalmente qualificados. Por exemplo, as tabelas Unity Catalog devem usar um identificador de três camadas: <catalog>.<schema>.<table>. As tabelas Hive metastore devem usar <schema>.<table>.
- Executar uma refresh completa não limpa os dados dos coletores. Todos os dados reprocessados serão anexados ao coletor e os dados existentes não serão alterados.
- As expectativas de pipeline declarativas LakeFlow não são suportadas com a API sink.
Sintaxe
from pyspark import pipelines as dp
dp.create_sink(name=<sink_name>, format=<format>, options=<options>)
Parâmetros
| Parâmetro | Tipo | Descrição | 
|---|---|---|
| 
 | 
 | Obrigatório. Uma string que identifica o coletor e é usada para referenciar e gerenciar o coletor. Os nomes dos coletores devem ser exclusivos para o pipeline, inclusive em todos os arquivos de código-fonte que fazem parte do pipeline. | 
| 
 | 
 | Obrigatório. Uma string que define o formato de saída,  | 
| 
 | 
 | Uma lista de opções de coletor, formatadas como  
 | 
Exemplos
from pyspark import pipelines as dp
# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)
# Create an external Delta table sink with a file path
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "path": "/path/to/my/delta/table" }
)
# Create a Delta table sink using a table name
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)