Funções para definir conjunto de dados
O módulo pyspark.pipelines
(aqui conhecido como dp
) implementa grande parte de sua funcionalidade principal usando decoradores. Esses decoradores aceitam uma função que define uma consulta transmissão ou lotes e retorna um Apache Spark DataFrame. A sintaxe a seguir mostra um exemplo simples para definir um dataset de pipeline declarativo LakeFlow :
from pyspark import pipelines as dp
@dp.table()
def function_name(): # This is the function decorated
return (<query>) # This is the query logic that defines the dataset
Esta página fornece uma visão geral das funções e consultas que definem o conjunto de dados no pipeline declarativo LakeFlow . Para obter uma lista completa de decoradores disponíveis, consulte Referência do desenvolvedor de pipeline declarativoLakeFlow.
As funções que você usa para definir o conjunto de dados não devem incluir lógica Python arbitrária não relacionada ao dataset, incluindo chamadas para APIs de terceiros. O pipeline declarativo LakeFlow executa essas funções várias vezes durante o planejamento, a validação e as atualizações. Incluir lógica arbitrária pode levar a resultados inesperados.
Ler dados para iniciar uma definição dataset
As funções usadas para definir o conjunto de dados do pipeline declarativo LakeFlow geralmente começam com spark.read
ou spark.readStream
operações. Essas operações de leitura retornam um objeto DataFrame estático ou de transmissão que você usa para definir transformações adicionais antes de retornar o DataFrame. Outros exemplos de operações spark que retornam um DataFrame incluem spark.table
ou spark.range
.
Funções nunca devem referenciar DataFrames definidos fora da função. Tentar referenciar DataFrames definidos em um escopo diferente pode resultar em comportamento inesperado. Para obter um exemplo de um padrão de metaprogramação para criar várias tabelas, consulte Criar tabelas em um loop for
.
Os exemplos a seguir mostram a sintaxe básica para leitura de uso de dados lotes ou lógica de transmissão:
from pyspark import pipelines as dp
# Batch read on a table
@dp.materialized_view()
def function_name():
return spark.read.table("catalog_name.schema_name.table_name")
# Batch read on a path
@dp.materialized_view()
def function_name():
return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")
# Streaming read on a table
@dp.table()
def function_name():
return spark.readStream.table("catalog_name.schema_name.table_name")
# Streaming read on a path
@dp.table()
def function_name():
return (spark.read
.format("cloudFiles")
.option("cloudFile.format", "parquet")
.load("/Volumes/catalog_name/schema_name/volume_name/data_path")
)
Se você precisar ler dados de uma API REST externa, implemente essa conexão usando uma fonte de dados personalizada Python . Consulte Fonte de dados personalizadaPySpark.
É possível criar DataFrames arbitrários do Apache Spark a partir de coleções de dados do Python, incluindo DataFrames, dicionários e listas do Pandas. Esses padrões podem ser úteis durante o desenvolvimento e os testes, mas a maioria das definições dataset do pipeline declarativo LakeFlow de produção deve começar carregando dados de arquivos, de um sistema externo ou de uma tabela ou view existente.
Encadeamento de transformações
O pipeline declarativo LakeFlow suporta quase todas as transformações Apache Spark DataFrame . Você pode incluir qualquer número de transformações na sua função de definição dataset , mas deve garantir que os métodos usados sempre retornem um objeto DataFrame .
Se você tiver uma transformação intermediária que direciona diversas cargas de trabalho downstream, mas não precisa materializá-la como uma tabela, use @dp.temporary_view()
para adicionar uma view temporária ao seu pipeline. Você pode então referenciar essa view usando spark.read.table("temp_view_name")
em várias definições de dataset posteriores. A sintaxe a seguir demonstra esse padrão:
from pyspark import pipelines as dp
@dp.temporary_view()
def a():
return spark.read.table("source").filter(...)
@dp.materialized_view()
def b():
return spark.read.table("a").groupBy(...)
@dp.materialized_view()
def c():
return spark.read.table("a").groupBy(...)
Isso garante que o pipeline declarativo LakeFlow tenha total conhecimento das transformações em sua view durante o planejamento pipeline e evita possíveis problemas relacionados à execução de código Python arbitrário fora das definições dataset .
Dentro da sua função, você pode encadear DataFrames para criar novos DataFrames sem gravar resultados incrementais como tabelas de exibição, exibição materializada ou transmissão, como no exemplo a seguir:
from pyspark import pipelines as dp
@dp.table()
def multiple_transformations():
df1 = spark.read.table("source").filter(...)
df2 = df1.groupBy(...)
return df2.filter(...)
Se todos os seus DataFrames realizarem suas leituras iniciais usando lógica de lotes, o resultado de retorno será um DataFrame estático. Se você tiver alguma consulta que seja transmitida, seu resultado de retorno será um DataFrame de transmissão.
Retornar um DataFrame
Use @dp.table
para criar uma tabela de transmissão a partir dos resultados de uma leitura de transmissão. Use @dp.materialized_view
para criar uma view materializada a partir dos resultados de uma leitura de lotes. A maioria dos outros decoradores funcionam tanto em DataFrames de transmissão quanto em DataFrames estáticos, enquanto alguns exigem um DataFrame de transmissão.
A função usada para definir um dataset deve retornar um Spark DataFrame. Nunca use métodos que salvam ou gravam em arquivos ou tabelas como parte do código dataset do pipeline declarativo LakeFlow .
Exemplos de operações Apache Spark que nunca devem ser usadas no código do pipeline declarativo LakeFlow :
collect()
count()
toPandas()
save()
saveAsTable()
start()
toTable()
O pipeline declarativo LakeFlow também oferece suporte ao uso Pandas no Spark para funções de definição de dataset . Veja a API do Pandas no Spark.
Use SQL em um pipeline Python
O PySpark suporta o operador spark.sql
para escrever código DataFrame usando SQL. Quando você usa esse padrão no código-fonte do pipeline declarativo LakeFlow , ele é compilado em tabelas de exibição materializada ou transmissão.
O exemplo de código a seguir é equivalente ao uso de spark.read.table("catalog_name.schema_name.table_name")
para a lógica de consulta dataset :
@dp.materialized_view
def my_table():
return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")
dlt.read
e dlt.read_stream
(legado)
O módulo dlt
mais antigo inclui funções dlt.read()
e dlt.read_stream()
que foram introduzidas para dar suporte à funcionalidade no modo de publicação de pipeline legado. Esses métodos são suportados, mas o Databricks recomenda sempre usar as funções spark.read.table()
e spark.readStream.table()
devido ao seguinte:
- As funções
dlt
têm suporte limitado para leitura de conjuntos de dados definidos fora do pipeline atual. - As funções
spark
oferecem suporte à especificação de opções, comoskipChangeCommits
, para ler operações. A especificação de opções não é suportada pelas funçõesdlt
. - O módulo
dlt
foi substituído pelo módulopyspark.pipelines
. Databricks recomenda usarfrom pyspark import pipelines as dp
para importarpyspark.pipelines
para uso ao escrever código de pipeline declarativo LakeFlow em Python.