Pular para o conteúdo principal

Funções para definir o conjunto de dados

O módulo dlt implementa grande parte de sua funcionalidade principal usando decoradores. Esses decoradores aceitam uma função que define uma consulta de transmissão ou de lotes e retorna um Apache Spark DataFrame. A sintaxe a seguir mostra um exemplo simples de definição de um DLT dataset:

import dlt

@dlt.table()
def function_name(): # This is the function decorated
return (<query>) # This is the query logic that defines the dataset

Esta página fornece uma visão geral das funções e consultas que definem o conjunto de dados em DLT. Para obter uma lista completa dos decoradores disponíveis, consulte a referência do desenvolvedor DLT.

As funções que o senhor usa para definir o conjunto de dados não devem incluir lógica Python arbitrária não relacionada ao dataset, incluindo chamadas para APIs de terceiros. DLT executar essas funções várias vezes durante o planejamento, a validação e as atualizações. Incluir lógica arbitrária pode levar a resultados inesperados.

Ler dados para iniciar uma definição de dataset

As funções usadas para definir o conjunto de dados DLT normalmente começam com uma operação spark.read ou spark.readStream. Essas operações de leitura retornam um objeto estático ou de transmissão DataFrame que o senhor usa para definir transformações adicionais antes de retornar o DataFrame. Outros exemplos de operações spark que retornam um DataFrame incluem spark.table, ou spark.range.

As funções nunca devem fazer referência a DataFrames definidos fora da função. A tentativa de fazer referência a DataFrames definidos em um escopo diferente pode resultar em um comportamento inesperado. Para ver um exemplo de um padrão de metaprogramação para criar várias tabelas, consulte Criar tabelas em um loop for.

Os exemplos a seguir mostram a sintaxe básica para leitura de uso de dados lotes ou lógica de transmissão:

Python
import dlt

# Batch read on a table
@dlt.table()
def function_name():
return spark.read.table("catalog_name.schema_name.table_name")

# Batch read on a path
@dlt.table()
def function_name():
return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")


# Streaming read on a table
@dlt.table()
def function_name():
return spark.readStream.table("catalog_name.schema_name.table_name")

# Streaming read on a path
@dlt.table()
def function_name():
return (spark.read
.format("cloudFiles")
.option("cloudFile.format", "parquet")
.load("/Volumes/catalog_name/schema_name/volume_name/data_path")
)

Se o senhor precisar ler dados de um site externo REST API, implemente essa conexão usando uma fonte de dados personalizada Python. Consulte PySpark custom fonte de dados.

nota

É possível criar Apache Spark DataFrames arbitrários a partir de Python coleções de dados, incluindo Pandas DataFrames, ditados e listas. Esses padrões podem ser úteis durante o desenvolvimento e os testes, mas a maioria das definições de produção DLT dataset deve começar carregando dados de arquivos, de um sistema externo ou de uma tabela existente ou view.

Transformações em cadeia

O DLT é compatível com quase todas as transformações do Apache Spark DataFrame. O senhor pode incluir qualquer número de transformações na sua função de definição dataset, mas deve garantir que os métodos utilizados sempre retornem um objeto DataFrame.

Se o senhor tiver uma transformação intermediária que gere várias cargas de trabalho downstream, mas não precisar materializá-la como uma tabela, use @dlt.view() para adicionar um view temporário ao seu pipeline. Em seguida, o senhor pode fazer referência a esse view usando spark.read.table("temp_view_name") em várias definições downstream de dataset. A sintaxe a seguir demonstra esse padrão:

Python
import dlt

@dlt.view()
def a():
return spark.read.table("source").filter(...)

@dlt.table()
def b():
return spark.read.table("b").groupBy(...)

@dlt.table()
def c():
return spark.read.table("c").groupBy(...)

Isso garante que o DLT tenha total conhecimento das transformações em seu view durante o pipeline planejamento e evita possíveis problemas relacionados a códigos arbitrários Python executados fora das definições do dataset.

Em sua função, o senhor pode encadear DataFrames para criar um novo DataFrames sem gravar resultados incrementais como visualização, visualização materializada ou tabelas de transmissão, como no exemplo a seguir:

Python
import dlt

@dlt.table()
def multiple_transformations():
df1 = spark.read.table("source").filter(...)
df2 = df1.groupBy(...)
return df2.filter(...)

Se todos os seus DataFrames realizarem suas leituras iniciais usando a lógica de lotes, seu resultado de retorno será um DataFrame estático. Se o senhor tiver alguma consulta que seja transmissão, o resultado do retorno será uma transmissão DataFrame.

Retorna um DataFrame

Para o decorador @dlt.table(), retornar um DataFrame estático significa que o senhor está definindo um view materializado. O retorno de uma transmissão DataFrame significa que o senhor está definindo uma tabela de transmissão. A maioria dos decoradores funciona tanto na transmissão quanto na estática DataFrames, enquanto outros exigem uma transmissão DataFrame.

A função usada para definir um dataset deve retornar um Spark DataFrame. Nunca use métodos que salvem ou gravem em arquivos ou tabelas como parte de seu código DLT dataset .

Exemplos de operações do Apache Spark que nunca devem ser usadas no código DLT:

  • collect()
  • count()
  • toPandas()
  • save()
  • saveAsTable()
  • start()
  • toTable()
nota

DLT também suporta o uso de Pandas em Spark para funções de definição de dataset. Consulte API do Pandas no Spark.

Usar SQL em um pipeline Python

O PySpark suporta o operador spark.sql para escrever código DataFrame usando SQL. Quando o senhor usa esse padrão no código-fonte DLT, ele é compilado em tabelas de exibição materializada ou de transmissão.

O exemplo de código a seguir é equivalente a usar spark.read.table("catalog_name.schema_name.table_name") para a lógica de consulta dataset:

Python
@dlt.table
def my_table():
return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")

dlt.read e dlt.read_stream (legado)

O módulo dlt inclui as funções dlt.read() e dlt.read_stream() que foram introduzidas para dar suporte à funcionalidade no modo de publicação do pipeline legado. Esses métodos são compatíveis, mas a Databricks recomenda sempre usar as funções spark.read.table() e spark.readStream.table() devido ao seguinte:

  • As funções do site dlt têm suporte limitado para a leitura de conjuntos de dados definidos fora do site pipeline atual.
  • As funções spark suportam a especificação de opções, como skipChangeCommits, para operações de leitura. A especificação de opções não é suportada pelas funções dlt.