Referência da linguagem DLT Python
Este artigo contém detalhes sobre a interface de programação do DLT Python.
Para obter informações sobre o site SQL API, consulte a referência de linguagem DLT SQL.
Para obter detalhes específicos sobre a configuração do Auto Loader, consulte O que é o Auto Loader?
Antes de começar
A seguir, há considerações importantes quando o senhor implementa o pipeline com a interface DLT Python:
- Como as funções Python
table()
eview()
são invocadas várias vezes durante o planejamento e a execução de uma atualização pipeline, não inclua código em uma dessas funções que possa ter efeitos colaterais (por exemplo, código que modifique dados ou envie um email). Para evitar um comportamento inesperado, suas funções Python que definem o conjunto de dados devem incluir apenas o código necessário para definir a tabela ou view. - Para realizar operações como o envio de e-mail ou a integração com um serviço de monitoramento externo, especialmente em funções que definem conjuntos de dados, use ganchos de eventos. A implementação dessas operações nas funções que definem seu conjunto de dados causará um comportamento inesperado.
- O Python
table
eview
as funções devem retornar um DataFrame. Algumas funções que operam em DataFrames não retornam DataFrames e não devem ser usadas. Essas operações incluem funções comocollect()``count()
,toPandas()
,save()
esaveAsTable()
. Como as transformações do DataFrame são executadas após a resolução do gráfico de fluxo de dados completo, o uso dessas operações pode ter efeitos colaterais indesejados.
Importar o módulo dlt
Módulo Python
As funções DLT Python são definidas no módulo dlt
. Seu pipeline implementado com o Python API deve importar esse módulo:
import dlt
Criar uma tabela materializada DLT view ou de transmissão
Em Python, a DLT determina se deve atualizar uma dataset como uma view materializada ou uma tabela de transmissão com base na consulta de definição. O decorador @table
pode ser usado para definir tanto a exibição materializada quanto as tabelas de transmissão.
Para definir um view materializado em Python, aplique @table
a uma consulta que executa uma leitura estática em uma fonte de dados. Para definir uma tabela de transmissão, aplique @table
a uma consulta que execute uma leitura de transmissão em uma fonte de dados ou use a função create_streaming_table(). Os dois tipos de dataset têm a mesma especificação de sintaxe, como segue:
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Criar uma DLT view
Para definir um view em Python, aplique o decorador @view
. Assim como o decorador @table
, o senhor pode usar a visualização no DLT para conjuntos de dados estáticos ou de transmissão. A seguir, a sintaxe para definir a visualização com Python:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Exemplo: Definir tabelas e visualizações
Para definir uma tabela ou visualização em Python, aplique o decorador @dlt.view
ou @dlt.table
a uma função. Você pode utilizar o nome da função ou o parâmetro name
para atribuir a tabela ou visualizar o nome. O exemplo a seguir define dois datasets diferentes: uma visualização chamada taxi_raw
que usa um arquivo JSON como fonte de entrada e uma tabela chamada filtered_data
que usa a taxi_raw
exibição como entrada:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return spark.read.table("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return spark.read.table("taxi_raw").where(...)
Exemplo: Acessar um dataset definido no mesmo pipeline
Embora as funções dlt.read()
e dlt.read_stream()
ainda estejam disponíveis e sejam totalmente compatíveis com a interface DLT Python, a Databricks recomenda sempre usar as funções spark.read.table()
e spark.readStream.table()
devido ao seguinte:
- As funções
spark
suportam a leitura de conjuntos de dados internos e externos, inclusive conjuntos de dados em armazenamento externo ou definidos em outro pipeline. As funçõesdlt
suportam apenas a leitura de conjuntos de dados internos. - As funções
spark
suportam a especificação de opções, comoskipChangeCommits
, para operações de leitura. A especificação de opções não é suportada pelas funçõesdlt
.
Para acessar um dataset definido no mesmo pipeline, use as funções spark.read.table()
ou spark.readStream.table()
:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return spark.read.table("customers_raw").where(...)
Ao consultar a visualização ou as tabelas no site pipeline, o senhor pode especificar o catálogo e o esquema diretamente ou pode usar o padrão configurado no site pipeline. Neste exemplo, a tabela customers
é gravada e lida no catálogo default e no esquema configurado para o seu pipeline.
Exemplo: Leia a partir de uma tabela registrada em um metastore
Para ler dados de uma tabela registrada no site Hive metastore, no argumento da função, o senhor pode qualificar o nome da tabela com o nome do banco de dados:
@dlt.table
def customers():
return spark.read.table("sales.customers").where(...)
Para obter um exemplo de leitura de uma tabela do Unity Catalog, consulte Ingerir dados em um pipeline do Unity Catalog.
Exemplo: Acesse o site dataset usando spark.sql
O senhor também pode retornar um dataset usando uma expressão spark.sql
em uma função de consulta. Para ler a partir de um dataset interno, o senhor pode deixar o nome sem qualificação para usar o catálogo e o esquema do default ou pode acrescentá-los previamente:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM catalog_name.schema_name.customers_cleaned WHERE city = 'Chicago'")
Excluir permanentemente registros de uma tabela materializada view ou de transmissão
Para excluir permanentemente registros de uma tabela materializada view ou de transmissão com vetores de exclusão ativados, como para GDPR compliance, operações adicionais devem ser realizadas nas tabelas subjacentes Delta do objeto. Para garantir a exclusão de registros de um site materializado view, consulte Excluir permanentemente registros de um site materializado view com vetores de exclusão ativados. Para garantir a exclusão de registros de uma tabela de transmissão, consulte Excluir permanentemente registros de uma tabela de transmissão.
Gravar em tabelas de serviço de transmissão de eventos externos ou Delta com o DLT sink
. API
Visualização
A API DLT sink
está em pré-visualização pública.
- A execução de uma atualização completa do site refresh não limpa os dados dos sinks. Todos os dados reprocessados serão anexados ao coletor e os dados existentes não serão alterados.
- As expectativas de DLT não são compatíveis com a API
sink
.
Para gravar em um serviço de transmissão de eventos, como Apache Kafka ou Azure Event Hubs ou em uma tabela Delta de um DLT pipeline, use a função create_sink()
incluída no módulo dlt
Python. Depois de criar um coletor com a função create_sink()
, você usa o coletor em um fluxo de acréscimo para gravar dados no coletor. append flow é o único tipo de fluxo suportado pela função create_sink()
. Outros tipos de fluxo, como apply_changes
, não são suportados.
A seguir está a sintaxe para criar um coletor com a função create_sink()
:
create_sink(<sink_name>, <format>, <options>)
Argumentos |
---|
|
|
|
Exemplo: Criar um sink do Kafka com a função create_sink()
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
Exemplo: Criar um sumidouro Delta com a função create_sink()
e um caminho de sistema de arquivos
O exemplo a seguir cria um sink que grava em uma tabela Delta, passando o caminho do sistema de arquivos para a tabela:
create_sink(
"my_delta_sink",
"delta",
{ "path": "//path/to/my/delta/table" }
)
Exemplo: Criar um sumidouro Delta com a função create_sink()
e um nome de tabela do Unity Catalog
O coletor Delta é compatível com as tabelas externas e gerenciar Unity Catalog e com as tabelas gerenciar Hive metastore. Os nomes das tabelas devem ser totalmente qualificados. Por exemplo, as tabelas do Unity Catalog devem usar um identificador de três camadas: <catalog>.<schema>.<table>
. Hive metastore as tabelas devem usar <schema>.<table>
.
O exemplo a seguir cria um sink que grava em uma tabela Delta passando o nome de uma tabela no Unity Catalog:
create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)
Exemplo: Usar um fluxo de acréscimo para gravar em um sumidouro Delta
O exemplo a seguir cria um coletor que grava em uma tabela Delta e, em seguida, cria um fluxo de acréscimo para gravar nesse coletor:
create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
@append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
Exemplo: Usar um fluxo de acréscimo para gravar em um sink do Kafka
O exemplo a seguir cria um sink que grava em um tópico do Kafka e, em seguida, cria um fluxo de append para gravar nesse sink:
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
@append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))
O esquema do DataFrame gravado em Kafka deve incluir as colunas especificadas em Configure the Kafka transmissão estructurada writer.
Criar uma tabela para ser usada como destino das operações de transmissão
Use a função create_streaming_table()
para criar uma tabela de destino para os registros de saída das operações de transmissão, inclusive os registros de saída apply_changes(), apply_changes_from_snapshot() e @append_flow.
As funções create_target_table()
e create_streaming_live_table()
estão obsoletas. A Databricks recomenda atualizar o código existente para usar a create_streaming_table()
função.
create_streaming_table(
name = "<table-name>",
comment = "<comment>",
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
Argumentos |
---|
|
|
|
|
|
|
|
|
|
|
Controle como as tabelas são materializadas
As tabelas também oferecem controle adicional de sua materialização:
- Especifique como fazer o clustering de tabelas usando
cluster_by
. O senhor pode usar o clustering líquido para acelerar as consultas. Consulte Usar clustering líquido para tabelas Delta. - Especifique como as tabelas são particionadas usando
partition_cols
. - O senhor pode definir as propriedades da tabela ao definir um view ou uma tabela. Consulte as propriedades da tabela DLT.
- Defina um local de armazenamento para os dados da tabela usando a configuração
path
. Por padrão, os dados da tabela são armazenados no local de armazenamento do pipeline sepath
não estiver definido. - Você pode usar colunas geradas em sua definição de esquema. Consulte Exemplo: Especificar um esquema e colunas de clustering.
Para tabelas com menos de 1 TB de tamanho, a Databricks recomenda deixar a DLT controlar a organização dos dados. Você não deve especificar colunas de partição, a menos que espere que sua tabela cresça além de um terabyte.
Exemplo: Especificar um esquema e colunas de clustering
Opcionalmente, o senhor pode especificar um esquema de tabela usando uma Python StructType
ou uma SQL DDL strings. Quando especificada com uma cadeia de caracteres DDL, a definição pode incluir colunas geradas.
O exemplo a seguir cria uma tabela chamada sales
com um esquema especificado usando Python StructType
:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
O exemplo a seguir especifica o esquema de uma tabela usando strings DDL, define uma coluna gerada e define colunas clustering:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
Por default, a DLT infere o esquema a partir da definição table
se o usuário não especificar um esquema.
Exemplo: especificar colunas de partição
O exemplo a seguir especifica o esquema para uma tabela usando uma string DDL, define uma coluna gerada e define uma coluna de partição:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
Exemplo: Definir restrições de tabela
Visualização
As restrições de tabela estão na Pré-visualização pública.
Ao especificar um esquema, o senhor pode definir chaves primárias e estrangeiras. As restrições são informativas e não são aplicadas. Consulte a cláusula CONSTRAINT na referência da linguagem SQL.
O exemplo a seguir define uma tabela com uma restrição primária e estrangeira key:
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
Exemplo: definir um filtro de linha e uma máscara de coluna
Visualização
Os filtros de linha e as máscaras de coluna estão na Pré-visualização pública.
Para criar uma tabela materializada view ou de transmissão com um filtro de linha e uma máscara de coluna, use a cláusula ROW FILTER e a cláusula MASK. O exemplo a seguir demonstra como definir uma tabela materializada view e uma tabela de transmissão com um filtro de linha e uma máscara de coluna:
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
Para obter mais informações sobre filtros de linha e máscaras de coluna, consulte Publicar tabelas com filtros de linha e máscaras de coluna.
Configurar uma tabela de transmissão para ignorar as alterações em uma tabela de transmissão de origem
- O sinalizador
skipChangeCommits
funciona apenas comspark.readStream
usando a funçãooption()
. Você não pode usar este sinalizador em uma funçãodlt.read_stream()
. - O senhor não pode usar o sinalizador
skipChangeCommits
quando a tabela de transmissão de origem é definida como o destino de uma função apply_changes().
Em default, as tabelas de transmissão exigem fontes somente de anexos. Quando uma tabela de transmissão usa outra tabela de transmissão como fonte e a tabela de transmissão de origem requer atualizações ou exclusões, por exemplo, GDPR processamento do "direito de ser esquecido", o sinalizador skipChangeCommits
pode ser definido ao ler a tabela de transmissão de origem para ignorar essas alterações. Para obter mais informações sobre esse sinalizador, consulte Ignorar atualizações e exclusões.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
Propriedades do Python DLT
As tabelas a seguir descrevem as opções e propriedades que o senhor pode especificar ao definir tabelas e visualizações com o DLT:
@tabela ou @visualizar |
---|
|
|
|
|
|
|
|
|
|
|
Definição de tabela ou exibição |
---|
|
|
Expectativas |
---|
|
|
|
|
|
|
captura de dados de alterações (CDC) de um feed de alterações com Python em DLT
Use a função apply_changes()
no site Python API para usar a funcionalidade DLT de captura de dados de alterações (CDC) (CDC) para processar dados de origem de um feed de dados de alterações (CDF).
O senhor deve declarar uma tabela de transmissão de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de destino apply_changes()
, você deve incluir as colunas __START_AT
e __END_AT
com o mesmo tipo de dados dos campos sequence_by
.
Para criar a tabela de destino necessária, o senhor pode usar a função create_streaming_table() na interface DLT Python.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Para o processamento de APPLY CHANGES
, o comportamento do default para eventos INSERT
e UPDATE
é fazer upsert de eventos CDC da origem: atualizar todas as linhas da tabela de destino que correspondam ao(s) key(s) especificado(s) ou inserir uma nova linha quando não houver um registro correspondente na tabela de destino. O tratamento de eventos DELETE
pode ser especificado com a condição APPLY AS DELETE WHEN
.
Para saber mais sobre o processamento de CDC com um feed de alterações, consulte APLICAR ALTERAÇÕES APIs: Simplificar a captura de dados de alterações (CDC) com DLT. Para ver um exemplo de uso da função apply_changes()
, consulte Exemplo: Processamento de SCD tipo 1 e SCD tipo 2 com dados de origem CDF.
O senhor deve declarar uma tabela de transmissão de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de destino apply_changes
, você deve incluir as colunas __START_AT
e __END_AT
com o mesmo tipo de dados do campo sequence_by
.
Consulte o site APPLY CHANGES APIs: Simplificar a captura de dados de alterações (CDC) com DLT.
Argumentos |
---|
|
|
|
|
|
|
|
|
|
|
captura de dados de alterações (CDC) do banco de dados Snapshot com Python em DLT
Visualização
A API APPLY CHANGES FROM SNAPSHOT
está em pré-visualização pública.
Use a função apply_changes_from_snapshot()
no site Python API para usar a funcionalidade DLT de captura de dados de alterações (CDC) (CDC) para processar os dados de origem do banco de dados Snapshot.
Você deve declarar uma tabela de transmissão de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de destino do apply_changes_from_snapshot()
, você também deve incluir as colunas __START_AT
e __END_AT
com o mesmo tipo de dados que o campo sequence_by
.
Para criar a tabela de destino necessária, o senhor pode usar a função create_streaming_table() na interface DLT Python.
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
Para o processamento APPLY CHANGES FROM SNAPSHOT
, o comportamento do default é inserir uma nova linha quando não houver um registro correspondente com o mesmo key(s) no destino. Se existir um registro correspondente, ele será atualizado somente se algum dos valores na linha tiver sido alterado. As linhas com chave presentes no destino, mas não mais presentes na origem, são excluídas.
Para saber mais sobre o processamento do CDC com o Snapshot, consulte APLICAR ALTERAÇÕES APIs: Simplificar a captura de dados de alterações (CDC) com DLT. Para obter exemplos de uso da função apply_changes_from_snapshot()
, consulte os exemplos de ingestão periódica de Snapshot e ingestão histórica de Snapshot.
Argumentos |
---|
|
|
|
|
|
Implemente o argumento source
A função apply_changes_from_snapshot()
inclui o argumento source
. Para processar o Snapshot histórico, espera-se que o argumento source
seja uma função lambda Python que retorne dois valores para a função apply_changes_from_snapshot()
: um Python DataFrame contendo os dados do Snapshot a serem processados e uma versão do Snapshot.
A seguir está a assinatura da função lambda:
lambda Any => Optional[(DataFrame, Any)]
- O argumento para a função lambda é a versão do Snapshot processada mais recentemente.
- O valor de retorno da função lambda é
None
ou uma tupla de dois valores: O primeiro valor da tupla é um DataFrame que contém o Snapshot a ser processado. O segundo valor da tupla é a versão do Snapshot que representa a ordem lógica do Snapshot.
Um exemplo que implementa e chama a função lambda:
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
O tempo de execução da DLT executa as seguintes etapas sempre que o pipeline que contém a função apply_changes_from_snapshot()
é acionado:
- executar a função
next_snapshot_and_version
para carregar o próximo Snapshot DataFrame e a versão correspondente do Snapshot. - Se nenhum DataFrame retornar, a execução será encerrada e a atualização do pipeline será marcada como concluída.
- Detecta as alterações no novo Snapshot e as aplica de forma incremental à tabela de destino.
- Retorna à etapa 1 para carregar o próximo Snapshot e sua versão.
Limitações
A interface DLT Python tem a seguinte limitação:
A função pivot()
não é suportada. As pivot
operações em Spark exigem o carregamento ávido de dados de entrada para compute o esquema de saída. Esse recurso não é suportado no DLT.