Pular para o conteúdo principal

Referência da linguagem DLT Python

Este artigo contém detalhes sobre a interface de programação do DLT Python.

Para obter informações sobre o site SQL API, consulte a referência de linguagem DLT SQL.

Para obter detalhes específicos sobre a configuração do Auto Loader, consulte O que é o Auto Loader?

Antes de começar

A seguir, há considerações importantes quando o senhor implementa o pipeline com a interface DLT Python:

  • Como as funções Python table() e view() são invocadas várias vezes durante o planejamento e a execução de uma atualização pipeline, não inclua código em uma dessas funções que possa ter efeitos colaterais (por exemplo, código que modifique dados ou envie um email). Para evitar um comportamento inesperado, suas funções Python que definem o conjunto de dados devem incluir apenas o código necessário para definir a tabela ou view.
  • Para realizar operações como o envio de e-mail ou a integração com um serviço de monitoramento externo, especialmente em funções que definem conjuntos de dados, use ganchos de eventos. A implementação dessas operações nas funções que definem seu conjunto de dados causará um comportamento inesperado.
  • O Python table e view as funções devem retornar um DataFrame. Algumas funções que operam em DataFrames não retornam DataFrames e não devem ser usadas. Essas operações incluem funções como collect()``count(), toPandas(), save() e saveAsTable(). Como as transformações do DataFrame são executadas após a resolução do gráfico de fluxo de dados completo, o uso dessas operações pode ter efeitos colaterais indesejados.

Importar o módulo dlt Módulo Python

As funções DLT Python são definidas no módulo dlt. Seu pipeline implementado com o Python API deve importar esse módulo:

Python
import dlt

Criar uma tabela materializada DLT view ou de transmissão

Em Python, a DLT determina se deve atualizar uma dataset como uma view materializada ou uma tabela de transmissão com base na consulta de definição. O decorador @table pode ser usado para definir tanto a exibição materializada quanto as tabelas de transmissão.

Para definir um view materializado em Python, aplique @table a uma consulta que executa uma leitura estática em uma fonte de dados. Para definir uma tabela de transmissão, aplique @table a uma consulta que execute uma leitura de transmissão em uma fonte de dados ou use a função create_streaming_table(). Os dois tipos de dataset têm a mesma especificação de sintaxe, como segue:

Python
import dlt

@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)

Criar uma DLT view

Para definir um view em Python, aplique o decorador @view. Assim como o decorador @table, o senhor pode usar a visualização no DLT para conjuntos de dados estáticos ou de transmissão. A seguir, a sintaxe para definir a visualização com Python:

Python
import dlt

@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)

Exemplo: Definir tabelas e visualizações

Para definir uma tabela ou visualização em Python, aplique o decorador @dlt.view ou @dlt.table a uma função. Você pode utilizar o nome da função ou o parâmetro name para atribuir a tabela ou visualizar o nome. O exemplo a seguir define dois datasets diferentes: uma visualização chamada taxi_raw que usa um arquivo JSON como fonte de entrada e uma tabela chamada filtered_data que usa a taxi_raw exibição como entrada:

Python
import dlt

@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
return spark.read.table("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return spark.read.table("taxi_raw").where(...)

Exemplo: Acessar um dataset definido no mesmo pipeline

nota

Embora as funções dlt.read() e dlt.read_stream() ainda estejam disponíveis e sejam totalmente compatíveis com a interface DLT Python, a Databricks recomenda sempre usar as funções spark.read.table() e spark.readStream.table() devido ao seguinte:

  • As funções spark suportam a leitura de conjuntos de dados internos e externos, inclusive conjuntos de dados em armazenamento externo ou definidos em outro pipeline. As funções dlt suportam apenas a leitura de conjuntos de dados internos.
  • As funções spark suportam a especificação de opções, como skipChangeCommits, para operações de leitura. A especificação de opções não é suportada pelas funções dlt.

Para acessar um dataset definido no mesmo pipeline, use as funções spark.read.table() ou spark.readStream.table():

Python
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
return spark.read.table("customers_raw").where(...)
nota

Ao consultar a visualização ou as tabelas no site pipeline, o senhor pode especificar o catálogo e o esquema diretamente ou pode usar o padrão configurado no site pipeline. Neste exemplo, a tabela customersé gravada e lida no catálogo default e no esquema configurado para o seu pipeline.

Exemplo: Leia a partir de uma tabela registrada em um metastore

Para ler dados de uma tabela registrada no site Hive metastore, no argumento da função, o senhor pode qualificar o nome da tabela com o nome do banco de dados:

Python
@dlt.table
def customers():
return spark.read.table("sales.customers").where(...)

Para obter um exemplo de leitura de uma tabela do Unity Catalog, consulte Ingerir dados em um pipeline do Unity Catalog.

Exemplo: Acesse o site dataset usando spark.sql

O senhor também pode retornar um dataset usando uma expressão spark.sql em uma função de consulta. Para ler a partir de um dataset interno, o senhor pode deixar o nome sem qualificação para usar o catálogo e o esquema do default ou pode acrescentá-los previamente:

Python
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM catalog_name.schema_name.customers_cleaned WHERE city = 'Chicago'")

Excluir permanentemente registros de uma tabela materializada view ou de transmissão

Para excluir permanentemente registros de uma tabela materializada view ou de transmissão com vetores de exclusão ativados, como para GDPR compliance, operações adicionais devem ser realizadas nas tabelas subjacentes Delta do objeto. Para garantir a exclusão de registros de um site materializado view, consulte Excluir permanentemente registros de um site materializado view com vetores de exclusão ativados. Para garantir a exclusão de registros de uma tabela de transmissão, consulte Excluir permanentemente registros de uma tabela de transmissão.

Gravar em tabelas de serviço de transmissão de eventos externos ou Delta com o DLT sink. API

info

Visualização

A API DLT sink está em pré-visualização pública.

nota
  • A execução de uma atualização completa do site refresh não limpa os dados dos sinks. Todos os dados reprocessados serão anexados ao coletor e os dados existentes não serão alterados.
  • As expectativas de DLT não são compatíveis com a API sink.

Para gravar em um serviço de transmissão de eventos, como Apache Kafka ou Azure Event Hubs ou em uma tabela Delta de um DLT pipeline, use a função create_sink() incluída no módulo dlt Python. Depois de criar um coletor com a função create_sink(), você usa o coletor em um fluxo de acréscimo para gravar dados no coletor. append flow é o único tipo de fluxo suportado pela função create_sink(). Outros tipos de fluxo, como apply_changes, não são suportados.

A seguir está a sintaxe para criar um coletor com a função create_sink():

Python
create_sink(<sink_name>, <format>, <options>)

Argumentos

name Tipo: str Uma cadeia de caracteres que identifica o coletor e é usada para fazer referência e gerenciar o coletor. Os nomes dos sumidouros devem ser exclusivos do site pipeline, inclusive em todo o código-fonte, como o Notebook ou os módulos que fazem parte do site pipeline. Este parâmetro é necessário.

format Tipo: str Uma cadeia de caracteres que define o formato de saída, seja kafka ou delta. Este parâmetro é necessário.

options Tipo: dict Uma lista opcional de opções de sink, formatada como {"key": "value"}, em que key e o valor são ambos strings. Todas as opções do site Databricks Runtime compatíveis com os dissipadores Kafka e Delta são compatíveis. Para obter as opções do site Kafka, consulte Configurar o gravador de transmissão estruturada Kafka. Para opções Delta, consulte Mesa Delta como uma pia.

Exemplo: Criar um sink do Kafka com a função create_sink()

Python
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)

Exemplo: Criar um sumidouro Delta com a função create_sink() e um caminho de sistema de arquivos

O exemplo a seguir cria um sink que grava em uma tabela Delta, passando o caminho do sistema de arquivos para a tabela:

Python
create_sink(
"my_delta_sink",
"delta",
{ "path": "//path/to/my/delta/table" }
)

Exemplo: Criar um sumidouro Delta com a função create_sink() e um nome de tabela do Unity Catalog

nota

O coletor Delta é compatível com as tabelas externas e gerenciar Unity Catalog e com as tabelas gerenciar Hive metastore. Os nomes das tabelas devem ser totalmente qualificados. Por exemplo, as tabelas do Unity Catalog devem usar um identificador de três camadas: <catalog>.<schema>.<table>. Hive metastore as tabelas devem usar <schema>.<table>.

O exemplo a seguir cria um sink que grava em uma tabela Delta passando o nome de uma tabela no Unity Catalog:

Python
create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)

Exemplo: Usar um fluxo de acréscimo para gravar em um sumidouro Delta

O exemplo a seguir cria um coletor que grava em uma tabela Delta e, em seguida, cria um fluxo de acréscimo para gravar nesse coletor:

Python
create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

@append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>

Exemplo: Usar um fluxo de acréscimo para gravar em um sink do Kafka

O exemplo a seguir cria um sink que grava em um tópico do Kafka e, em seguida, cria um fluxo de append para gravar nesse sink:

Python
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)

@append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))

O esquema do DataFrame gravado em Kafka deve incluir as colunas especificadas em Configure the Kafka transmissão estructurada writer.

Criar uma tabela para ser usada como destino das operações de transmissão

Use a função create_streaming_table() para criar uma tabela de destino para os registros de saída das operações de transmissão, inclusive os registros de saída apply_changes(), apply_changes_from_snapshot() e @append_flow.

nota

As funções create_target_table() e create_streaming_live_table() estão obsoletas. A Databricks recomenda atualizar o código existente para usar a create_streaming_table() função.

Python
create_streaming_table(
name = "<table-name>",
comment = "<comment>",
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)

Argumentos

name Tipo: str O nome da tabela. Este parâmetro é necessário.

comment Tipo: str Uma descrição opcional para a tabela.

spark_conf Tipo: dict Uma lista opcional de configurações do Spark para a execução desta consulta.

table_properties Tipo: dict Uma lista opcional de propriedades da tabela.

partition_cols Tipo: array Uma lista opcional de uma ou mais colunas para usar no particionamento da tabela.

cluster_by Tipo: array Opcionalmente, ative o clustering líquido na tabela e defina as colunas a serem usadas como chave clustering. Consulte Usar clustering líquido para tabelas Delta.

path Tipo: str Um local de armazenamento opcional para dados da tabela. Se não for definido, o sistema usará como padrão o local de armazenamento pipeline.

schema Tipo: str ou StructType Uma definição de esquema opcional para a tabela. Os esquemas podem ser definidos como SQL DDL strings ou com um Python StructType.

expect_all expect_all_or_drop expect_all_or_fail Tipo: dict Restrições opcionais de qualidade de dados para a tabela. Veja várias expectativas.

row_filter (Pré-visualização pública) Tipo: str Uma cláusula opcional de filtro de linha para a tabela. Consulte Publicar tabelas com filtros de linha e máscaras de coluna.

Controle como as tabelas são materializadas

As tabelas também oferecem controle adicional de sua materialização:

nota

Para tabelas com menos de 1 TB de tamanho, a Databricks recomenda deixar a DLT controlar a organização dos dados. Você não deve especificar colunas de partição, a menos que espere que sua tabela cresça além de um terabyte.

Exemplo: Especificar um esquema e colunas de clustering

Opcionalmente, o senhor pode especificar um esquema de tabela usando uma Python StructType ou uma SQL DDL strings. Quando especificada com uma cadeia de caracteres DDL, a definição pode incluir colunas geradas.

O exemplo a seguir cria uma tabela chamada sales com um esquema especificado usando Python StructType:

Python
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)

@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")

O exemplo a seguir especifica o esquema de uma tabela usando strings DDL, define uma coluna gerada e define colunas clustering:

Python
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")

Por default, a DLT infere o esquema a partir da definição table se o usuário não especificar um esquema.

Exemplo: especificar colunas de partição

O exemplo a seguir especifica o esquema para uma tabela usando uma string DDL, define uma coluna gerada e define uma coluna de partição:

Python
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")

Exemplo: Definir restrições de tabela

info

Visualização

As restrições de tabela estão na Pré-visualização pública.

Ao especificar um esquema, o senhor pode definir chaves primárias e estrangeiras. As restrições são informativas e não são aplicadas. Consulte a cláusula CONSTRAINT na referência da linguagem SQL.

O exemplo a seguir define uma tabela com uma restrição primária e estrangeira key:

Python
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")

Exemplo: definir um filtro de linha e uma máscara de coluna

info

Visualização

Os filtros de linha e as máscaras de coluna estão na Pré-visualização pública.

Para criar uma tabela materializada view ou de transmissão com um filtro de linha e uma máscara de coluna, use a cláusula ROW FILTER e a cláusula MASK. O exemplo a seguir demonstra como definir uma tabela materializada view e uma tabela de transmissão com um filtro de linha e uma máscara de coluna:

Python
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")

Para obter mais informações sobre filtros de linha e máscaras de coluna, consulte Publicar tabelas com filtros de linha e máscaras de coluna.

Configurar uma tabela de transmissão para ignorar as alterações em uma tabela de transmissão de origem

nota
  • O sinalizador skipChangeCommits funciona apenas com spark.readStream usando a função option() . Você não pode usar este sinalizador em uma função dlt.read_stream() .
  • O senhor não pode usar o sinalizador skipChangeCommits quando a tabela de transmissão de origem é definida como o destino de uma função apply_changes().

Em default, as tabelas de transmissão exigem fontes somente de anexos. Quando uma tabela de transmissão usa outra tabela de transmissão como fonte e a tabela de transmissão de origem requer atualizações ou exclusões, por exemplo, GDPR processamento do "direito de ser esquecido", o sinalizador skipChangeCommits pode ser definido ao ler a tabela de transmissão de origem para ignorar essas alterações. Para obter mais informações sobre esse sinalizador, consulte Ignorar atualizações e exclusões.

Python
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")

Propriedades do Python DLT

As tabelas a seguir descrevem as opções e propriedades que o senhor pode especificar ao definir tabelas e visualizações com o DLT:

@tabela ou @visualizar

name Tipo: str Um nome opcional para a tabela ou visualização. Se não estiver definida, o nome da função será usado como a tabela ou o nome de visualização.

comment Tipo: str Uma descrição opcional para a tabela.

spark_conf Tipo: dict Uma lista opcional de configurações do Spark para a execução desta consulta.

table_properties Tipo: dict Uma lista opcional de propriedades da tabela.

path Tipo: str Um local de armazenamento opcional para dados da tabela. Se não for definido, o sistema usará como padrão o local de armazenamento pipeline.

partition_cols Tipo: a collection of str Uma coleção opcional, por exemplo, list de uma ou mais colunas para usar para particionar a tabela.

cluster_by Tipo: array Opcionalmente, ative o clustering líquido na tabela e defina as colunas a serem usadas como chave clustering. Consulte Usar clustering líquido para tabelas Delta.

schema Tipo: str ou StructType Uma definição de esquema opcional para a tabela. Os esquemas podem ser definidos como SQL DDL strings ou com Python StructType.

temporary Tipo: bool Crie uma tabela, mas não publique metadados para a tabela. A palavra-chave temporary instrui a DLT a criar uma tabela que está disponível para o pipeline, mas que não deve ser acessada fora do pipeline. Para reduzir o tempo de processamento, uma tabela temporária persiste durante o tempo de vida do pipeline que a cria, e não apenas em uma única atualização. O padrão é 'falso'.

row_filter (Pré-visualização pública) Tipo: str Uma cláusula opcional de filtro de linha para a tabela. Consulte Publicar tabelas com filtros de linha e máscaras de coluna.

Definição de tabela ou exibição

def <function-name>() Uma função do Python que define o dataset. Se o parâmetro name não estiver configurado, então <function-name> será utilizado como o nome do dataset de destino.

query Uma declaração Spark SQL que retorna um dataset Spark ou DataFrame Koalas. Use dlt.read() ou spark.read.table() para realizar uma leitura completa de um dataset definido no mesmo pipeline. Para ler um site externo dataset, use a função spark.read.table(). O senhor não pode usar dlt.read() para ler um conjunto de dados externo. Como spark.read.table() pode ser usado para ler um conjunto de dados interno, um conjunto de dados definido fora do site pipeline atual, e permite que o usuário especifique opções de leitura de dados, o site Databricks recomenda usá-lo em vez da função dlt.read(). Quando o senhor define um dataset em um pipeline, por default ele usará o catálogo e o esquema definidos na configuração pipeline. O senhor pode usar a função spark.read.table() para ler a partir de um dataset definido no pipeline sem nenhuma qualificação. Por exemplo, para ler em um dataset chamado customers: spark.read.table("customers") Você também pode usar a função spark.read.table() para ler uma tabela registrada no metastore qualificando opcionalmente o nome da tabela com o nome do banco de dados: spark.read.table("sales.customers") Use dlt.read_stream() ou spark.readStream.table() para realizar uma leitura de transmissão de um dataset definido no mesmo pipeline. Para realizar uma leitura de transmissão de um site externo dataset, use o spark.readStream.table() função. Como spark.readStream.table() pode ser usado para ler um conjunto de dados interno, um conjunto de dados definido fora do site pipeline atual, e permite que o usuário especifique opções de leitura de dados, o site Databricks recomenda usá-lo em vez da função dlt.read_stream(). Para definir uma consulta em uma função DLT table usando a sintaxe SQL, use a função spark.sql. Veja o exemplo: Acesse o site dataset usando spark.sql. Para definir uma consulta em uma função DLT table usando Python, use a sintaxe do PySpark.

Expectativas

@expect("description", "constraint") Declare uma restrição de qualidade de dados identificada por description. Se uma linha violar a expectativa, inclua a linha no destino dataset.

@expect_or_drop("description", "constraint") Declare uma restrição de qualidade de dados identificada por description. Se uma linha violar a expectativa, elimine a linha do destino dataset.

@expect_or_fail("description", "constraint") Declare uma restrição de qualidade de dados identificada por description. Se uma linha violar a expectativa, interrompa imediatamente a execução.

@expect_all(expectations) Declare uma ou mais restrições de qualidade de dados. expectations é um dicionário Python, em que key é a descrição da expectativa e o valor é a restrição da expectativa. Se uma linha violar qualquer uma das expectativas, inclua a linha no destino dataset.

@expect_all_or_drop(expectations) Declare uma ou mais restrições de qualidade de dados. expectations é um dicionário Python, em que key é a descrição da expectativa e o valor é a restrição da expectativa. Se uma linha violar qualquer uma das expectativas, elimine a linha do destino dataset.

@expect_all_or_fail(expectations) Declare uma ou mais restrições de qualidade de dados. expectations é um dicionário Python, em que key é a descrição da expectativa e o valor é a restrição da expectativa. Se uma linha violar alguma das expectativas, interrompa imediatamente a execução.

captura de dados de alterações (CDC) de um feed de alterações com Python em DLT

Use a função apply_changes() no site Python API para usar a funcionalidade DLT de captura de dados de alterações (CDC) (CDC) para processar dados de origem de um feed de dados de alterações (CDF).

important

O senhor deve declarar uma tabela de transmissão de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de destino apply_changes(), você deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados dos campos sequence_by.

Para criar a tabela de destino necessária, o senhor pode usar a função create_streaming_table() na interface DLT Python.

Python
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
nota

Para o processamento de APPLY CHANGES, o comportamento do default para eventos INSERT e UPDATE é fazer upsert de eventos CDC da origem: atualizar todas as linhas da tabela de destino que correspondam ao(s) key(s) especificado(s) ou inserir uma nova linha quando não houver um registro correspondente na tabela de destino. O tratamento de eventos DELETE pode ser especificado com a condição APPLY AS DELETE WHEN.

Para saber mais sobre o processamento de CDC com um feed de alterações, consulte APLICAR ALTERAÇÕES APIs: Simplificar a captura de dados de alterações (CDC) com DLT. Para ver um exemplo de uso da função apply_changes(), consulte Exemplo: Processamento de SCD tipo 1 e SCD tipo 2 com dados de origem CDF.

important

O senhor deve declarar uma tabela de transmissão de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de destino apply_changes, você deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados do campo sequence_by.

Consulte o site APPLY CHANGES APIs: Simplificar a captura de dados de alterações (CDC) com DLT.

Argumentos

target Tipo: str O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table () para criar a tabela de destino antes de executar a função apply_changes(). Este parâmetro é necessário.

source Tipo: str A fonte de dados que contém os registros do CDC. Este parâmetro é necessário.

keys Tipo: list A coluna ou combinação de colunas que identificam exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos de CDC se aplicam a registros específicos na tabela de destino. Você pode especificar: - Uma lista de strings: ["userId", "orderId"] - Uma lista de funções Spark SQL col(): [col("userId"), col("orderId"] Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). Este parâmetro é necessário.

sequence_by Tipo: str ou col() O nome da coluna que especifica a ordem lógica dos eventos CDC nos dados de origem. O DLT usa esse sequenciamento para lidar com eventos de alteração que chegam fora de ordem. Você pode especificar: - Uma corda: "sequenceNum" - Uma função Spark SQL col(): col("sequenceNum") Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). A coluna especificada deve ser um tipo de dados classificável. Este parâmetro é necessário.

ignore_null_updates Tipo: bool Permitir a ingestão de atualizações que contenham um subconjunto da coluna de destino. Quando um evento CDC corresponde a uma linha existente e ignore_null_updates é True, as colunas com null mantêm seus valores existentes no destino. Isso também se aplica a colunas aninhadas com um valor de null. Quando ignore_null_updates é False, os valores existentes são substituídos pelos valores null. Este parâmetro é opcional. O padrão é False.

apply_as_deletes Tipo: str ou expr() Especifica quando um evento CDC deve ser tratado como DELETE em vez de upsert. Para lidar com dados fora de ordem, a linha excluída é temporariamente retida como uma lápide na tabela subjacente Delta e é criado um view no metastore que filtra essas lápides. O intervalo de retenção pode ser configurado com o pipelines.cdc.tombstoneGCThresholdInSeconds propriedade da tabela. Você pode especificar: - Uma corda: "Operation = 'DELETE'" - Uma função Spark SQL expr(): expr("Operation = 'DELETE'") Este parâmetro é opcional.

apply_as_truncates Tipo: str ou expr() Especifica quando um evento de CDC deve ser tratado como uma tabela completa TRUNCATE. Como esta cláusula aciona uma operação completa de truncamento da tabela de destino, ela deve ser usada apenas para casos de uso específicos que exijam essa funcionalidade. O parâmetro apply_as_truncates é compatível apenas com o SCD tipo 1. O SCD tipo 2 não oferece suporte a operações de truncamento. Você pode especificar: - Uma corda: "Operation = 'TRUNCATE'" - Uma função Spark SQL expr(): expr("Operation = 'TRUNCATE'") Este parâmetro é opcional.

column_list except_column_list Tipo: list Um subconjunto de colunas a incluir na tabela de destino. Use column_list para especificar a lista completa de colunas a incluir. Use except_column_list para especificar as colunas a serem excluídas. Você pode declarar o valor como uma lista de strings ou como funções Spark SQL col(): - column_list = ["userId", "name", "city"]. - column_list = [col("userId"), col("name"), col("city")] - except_column_list = ["operation", "sequenceNum"] - except_column_list = [col("operation"), col("sequenceNum") Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). Este parâmetro é opcional. O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento column_list ou except_column_list é passado para a função.

stored_as_scd_type Tipo: str ou int Se os registros devem ser armazenados como SCD tipo 1 ou SCD tipo 2. Defina como 1 para SCD tipo 1 ou 2 para SCD tipo 2. Esta cláusula é opcional. O padrão é SCD tipo 1.

track_history_column_list track_history_except_column_list Tipo: list Um subconjunto de colunas de saída a ser rastreado para o histórico na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Uso track_history_except_column_list para especificar as colunas a serem excluídas do acompanhamento. O senhor pode declarar qualquer um dos valores como uma lista de strings ou como funções Spark SQL col(): - track_history_column_list = ["userId", "name", "city"]. - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). Este parâmetro é opcional. O endereço default é para incluir todas as colunas na tabela de destino quando não houver track_history_column_list ou track_history_except_column_list o argumento é passado para a função.

captura de dados de alterações (CDC) do banco de dados Snapshot com Python em DLT

info

Visualização

A API APPLY CHANGES FROM SNAPSHOT está em pré-visualização pública.

Use a função apply_changes_from_snapshot() no site Python API para usar a funcionalidade DLT de captura de dados de alterações (CDC) (CDC) para processar os dados de origem do banco de dados Snapshot.

important

Você deve declarar uma tabela de transmissão de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de destino do apply_changes_from_snapshot(), você também deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados que o campo sequence_by .

Para criar a tabela de destino necessária, o senhor pode usar a função create_streaming_table() na interface DLT Python.

Python
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
nota

Para o processamento APPLY CHANGES FROM SNAPSHOT, o comportamento do default é inserir uma nova linha quando não houver um registro correspondente com o mesmo key(s) no destino. Se existir um registro correspondente, ele será atualizado somente se algum dos valores na linha tiver sido alterado. As linhas com chave presentes no destino, mas não mais presentes na origem, são excluídas.

Para saber mais sobre o processamento do CDC com o Snapshot, consulte APLICAR ALTERAÇÕES APIs: Simplificar a captura de dados de alterações (CDC) com DLT. Para obter exemplos de uso da função apply_changes_from_snapshot(), consulte os exemplos de ingestão periódica de Snapshot e ingestão histórica de Snapshot.

Argumentos

target Tipo: str O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table () para criar a tabela de destino antes de executar a função apply_changes(). Este parâmetro é necessário.

source Tipo: str ou lambda function O nome de uma tabela ou view para o Snapshot periodicamente ou uma função lambda Python que retorna o Snapshot DataFrame a ser processado e a versão do Snapshot. Consulte Implementar o argumento source. Este parâmetro é necessário.

keys Tipo: list A coluna ou combinação de colunas que identificam exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos de CDC se aplicam a registros específicos na tabela de destino. Você pode especificar: - Uma lista de strings: ["userId", "orderId"] - Uma lista de funções Spark SQL col(): [col("userId"), col("orderId"] Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). Este parâmetro é necessário.

stored_as_scd_type Tipo: str ou int Se os registros devem ser armazenados como SCD tipo 1 ou SCD tipo 2. Defina como 1 para SCD tipo 1 ou 2 para SCD tipo 2. Esta cláusula é opcional. O padrão é SCD tipo 1.

track_history_column_list track_history_except_column_list Tipo: list Um subconjunto de colunas de saída a ser rastreado para o histórico na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Uso track_history_except_column_list para especificar as colunas a serem excluídas do acompanhamento. O senhor pode declarar qualquer um dos valores como uma lista de strings ou como funções Spark SQL col(): - track_history_column_list = ["userId", "name", "city"]. - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). Este parâmetro é opcional. O endereço default é para incluir todas as colunas na tabela de destino quando não houver track_history_column_list ou track_history_except_column_list o argumento é passado para a função.

Implemente o argumento source

A função apply_changes_from_snapshot() inclui o argumento source. Para processar o Snapshot histórico, espera-se que o argumento source seja uma função lambda Python que retorne dois valores para a função apply_changes_from_snapshot(): um Python DataFrame contendo os dados do Snapshot a serem processados e uma versão do Snapshot.

A seguir está a assinatura da função lambda:

Python
lambda Any => Optional[(DataFrame, Any)]
  • O argumento para a função lambda é a versão do Snapshot processada mais recentemente.
  • O valor de retorno da função lambda é None ou uma tupla de dois valores: O primeiro valor da tupla é um DataFrame que contém o Snapshot a ser processado. O segundo valor da tupla é a versão do Snapshot que representa a ordem lógica do Snapshot.

Um exemplo que implementa e chama a função lambda:

Python
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None

apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)

O tempo de execução da DLT executa as seguintes etapas sempre que o pipeline que contém a função apply_changes_from_snapshot() é acionado:

  1. executar a função next_snapshot_and_version para carregar o próximo Snapshot DataFrame e a versão correspondente do Snapshot.
  2. Se nenhum DataFrame retornar, a execução será encerrada e a atualização do pipeline será marcada como concluída.
  3. Detecta as alterações no novo Snapshot e as aplica de forma incremental à tabela de destino.
  4. Retorna à etapa 1 para carregar o próximo Snapshot e sua versão.

Limitações

A interface DLT Python tem a seguinte limitação:

A função pivot() não é suportada. As pivot operações em Spark exigem o carregamento ávido de dados de entrada para compute o esquema de saída. Esse recurso não é suportado no DLT.