create_sink
Visualização
A API do DLT create_sink
está em pré-visualização pública.
A função create_sink()
grava em um serviço de transmissão de eventos, como Apache Kafka ou Azure Event Hubs, ou em uma tabela Delta de um DLT pipeline. Depois de criar um coletor com a função create_sink()
, você usa o coletor em um fluxo de acréscimo para gravar dados no coletor. append flow é o único tipo de fluxo suportado pela função create_sink()
. Outros tipos de fluxo, como apply_changes
, não são suportados.
O coletor Delta é compatível com as tabelas externas e gerenciar Unity Catalog e com as tabelas gerenciar Hive metastore. Os nomes das tabelas devem ser totalmente qualificados. Por exemplo, as tabelas do Unity Catalog devem usar um identificador de três camadas: <catalog>.<schema>.<table>
. Hive metastore As tabelas devem usar <schema>.<table>
.
- A execução de uma atualização completa do site refresh não limpa os dados dos sinks. Todos os dados reprocessados serão anexados ao coletor e os dados existentes não serão alterados.
- As expectativas de DLT não são compatíveis com a API
sink
.
Sintaxe
import dlt
dlt.create_sink(name=<sink_name>, format=<format>, options=<options>)
Parâmetros
Parâmetro | Tipo | Descrição |
---|---|---|
|
| Obrigatório. Uma cadeia de caracteres que identifica o coletor e é usada para fazer referência e gerenciar o coletor. Os nomes dos sumidouros devem ser exclusivos do site pipeline, inclusive em todo o código-fonte, como o Notebook ou os módulos que fazem parte do site pipeline. |
|
| Obrigatório. Uma cadeia de caracteres que define o formato de saída, |
|
| Uma lista de opções de sink, formatada como
|
Exemplos
import dlt
# Create a Kafka sink
dlt.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Create an external Delta table sink with a file path
dlt.create_sink(
"my_delta_sink",
"delta",
{ "path": "/path/to/my/delta/table" }
)
# Create a Delta table sink using a table name
dlt.create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)