Pular para o conteúdo principal

Funções para definir o conjunto de dados

O módulo dlt implementa grande parte de sua funcionalidade principal usando decoradores. Esses decoradores aceitam uma função que define uma consulta de transmissão ou de lotes e retorna um Apache Spark DataFrame. A sintaxe a seguir mostra um exemplo simples de definição de um pipeline declarativo LakeFlow dataset:

import dlt

@dlt.table()
def function_name(): # This is the function decorated
return (<query>) # This is the query logic that defines the dataset

Esta página fornece uma visão geral das funções e consultas que definem o conjunto de dados no pipeline declarativo LakeFlow. Para obter uma lista completa dos decoradores disponíveis, consulte LakeFlow Declarative pipeline developer reference.

As funções que o senhor usa para definir o conjunto de dados não devem incluir lógica Python arbitrária não relacionada ao dataset, incluindo chamadas para APIs de terceiros. LakeFlow O pipeline declarativo executa essas funções várias vezes durante o planejamento, a validação e as atualizações. Incluir lógica arbitrária pode levar a resultados inesperados.

Ler dados para iniciar uma definição de dataset

As funções usadas para definir o conjunto de dados do pipeline declarativo LakeFlow geralmente começam com uma operação spark.read ou spark.readStream. Essas operações de leitura retornam um objeto estático ou de transmissão DataFrame que o senhor usa para definir transformações adicionais antes de retornar o DataFrame. Outros exemplos de operações spark que retornam um DataFrame incluem spark.table, ou spark.range.

As funções nunca devem fazer referência a DataFrames definidos fora da função. A tentativa de fazer referência a DataFrames definidos em um escopo diferente pode resultar em um comportamento inesperado. Para ver um exemplo de um padrão de metaprogramação para criar várias tabelas, consulte Criar tabelas em um loop for.

Os exemplos a seguir mostram a sintaxe básica para leitura de uso de dados lotes ou lógica de transmissão:

Python
import dlt

# Batch read on a table
@dlt.table()
def function_name():
return spark.read.table("catalog_name.schema_name.table_name")

# Batch read on a path
@dlt.table()
def function_name():
return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")


# Streaming read on a table
@dlt.table()
def function_name():
return spark.readStream.table("catalog_name.schema_name.table_name")

# Streaming read on a path
@dlt.table()
def function_name():
return (spark.read
.format("cloudFiles")
.option("cloudFile.format", "parquet")
.load("/Volumes/catalog_name/schema_name/volume_name/data_path")
)

Se o senhor precisar ler dados de um site externo REST API, implemente essa conexão usando uma fonte de dados personalizada Python. Consulte PySpark custom fonte de dados.

nota

É possível criar Apache Spark DataFrames arbitrários a partir de Python coleções de dados, incluindo Pandas DataFrames, ditados e listas. Esses padrões podem ser úteis durante o desenvolvimento e os testes, mas a maioria das LakeFlow definições de pipeline declarativo de dataset produção deve começar carregando dados de arquivos, de um sistema externo ou de uma tabela existente ou.view

Transformações em cadeia

LakeFlow O pipeline declarativo é compatível com quase todas as Apache Spark DataFrame transformações. O senhor pode incluir qualquer número de transformações na sua função de definição dataset, mas deve garantir que os métodos utilizados sempre retornem um objeto DataFrame.

Se o senhor tiver uma transformação intermediária que gere várias cargas de trabalho downstream, mas não precisar materializá-la como uma tabela, use @dlt.view() para adicionar um view temporário ao seu pipeline. Em seguida, o senhor pode fazer referência a esse view usando spark.read.table("temp_view_name") em várias definições downstream de dataset. A sintaxe a seguir demonstra esse padrão:

Python
import dlt

@dlt.view()
def a():
return spark.read.table("source").filter(...)

@dlt.table()
def b():
return spark.read.table("b").groupBy(...)

@dlt.table()
def c():
return spark.read.table("c").groupBy(...)

Isso garante que o pipeline LakeFlow Declarative tenha total conhecimento das transformações em seu view durante o planejamento pipeline e evita possíveis problemas relacionados a códigos arbitrários Python executados fora das definições dataset.

Em sua função, o senhor pode encadear DataFrames para criar um novo DataFrames sem gravar resultados incrementais como visualização, visualização materializada ou tabelas de transmissão, como no exemplo a seguir:

Python
import dlt

@dlt.table()
def multiple_transformations():
df1 = spark.read.table("source").filter(...)
df2 = df1.groupBy(...)
return df2.filter(...)

Se todos os seus DataFrames realizarem suas leituras iniciais usando a lógica de lotes, seu resultado de retorno será um DataFrame estático. Se o senhor tiver alguma consulta que seja transmissão, o resultado do retorno será uma transmissão DataFrame.

Retorna um DataFrame

Para o decorador @dlt.table(), retornar um DataFrame estático significa que o senhor está definindo um view materializado. O retorno de uma transmissão DataFrame significa que o senhor está definindo uma tabela de transmissão. A maioria dos decoradores funciona tanto na transmissão quanto na estática DataFrames, enquanto outros exigem uma transmissão DataFrame.

A função usada para definir um dataset deve retornar um Spark DataFrame. Nunca use métodos que salvem ou gravem em arquivos ou tabelas como parte do seu código LakeFlow Declarative pipeline dataset.

Exemplos de Apache Spark operações que nunca devem ser usadas em LakeFlow código de pipeline declarativo:

  • collect()
  • count()
  • toPandas()
  • save()
  • saveAsTable()
  • start()
  • toTable()
nota

LakeFlow O pipeline declarativo também suporta o uso de Pandas em Spark para funções de definição de dataset. Consulte API do Pandas no Spark.

Usar SQL em um pipeline Python

O PySpark suporta o operador spark.sql para escrever código DataFrame usando SQL. Quando o senhor usa esse padrão no código-fonte do pipeline declarativo LakeFlow, ele é compilado em uma visualização materializada ou em tabelas de transmissão.

O exemplo de código a seguir é equivalente a usar spark.read.table("catalog_name.schema_name.table_name") para a lógica de consulta dataset:

Python
@dlt.table
def my_table():
return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")

dlt.read e dlt.read_stream (legado)

O módulo dlt inclui as funções dlt.read() e dlt.read_stream() que foram introduzidas para dar suporte à funcionalidade no modo de publicação do pipeline legado. Esses métodos são compatíveis, mas a Databricks recomenda sempre usar as funções spark.read.table() e spark.readStream.table() devido ao seguinte:

  • As funções do site dlt têm suporte limitado para a leitura de conjuntos de dados definidos fora do site pipeline atual.
  • As funções spark suportam a especificação de opções, como skipChangeCommits, para operações de leitura. A especificação de opções não é suportada pelas funções dlt.