Funções para definir conjunto de dados
O módulo pyspark.pipelines (aqui referido como dp) implementa grande parte de sua funcionalidade principal usando decoradores. Esses decoradores aceitam uma função que define uma consulta de transmissão ou lotes e retorna um DataFrame Apache Spark . A sintaxe a seguir mostra um exemplo simples para definir um dataset pipeline :
from pyspark import pipelines as dp
@dp.table()
def function_name(): # This is the function decorated
return (<query>) # This is the query logic that defines the dataset
Esta página fornece uma visão geral das funções e consultas que definem o conjunto de dados no pipeline. Para obter uma lista completa dos decoradores disponíveis, consulte a referência do desenvolvedor de pipeline.
As funções que você usa para definir o conjunto de dados não devem incluir lógica Python arbitrária não relacionada ao dataset, incluindo chamadas a APIs de terceiros. A execução do pipeline dessas funções ocorre várias vezes durante o planejamento, a validação e as atualizações. A inclusão de lógica arbitrária pode levar a resultados inesperados.
Ler dados para iniciar uma definição dataset
As funções usadas para definir o conjunto de dados pipeline normalmente começam com uma operação spark.read ou spark.readStream . Essas operações de leitura retornam um objeto DataFrame estático ou de transmissão que você usa para definir transformações adicionais antes de retornar o DataFrame. Outros exemplos de operações do Spark que retornam um DataFrame incluem spark.table ou spark.range.
Funções nunca devem referenciar DataFrames definidos fora da função. Tentar referenciar DataFrames definidos em um escopo diferente pode resultar em comportamento inesperado. Para obter um exemplo de um padrão de metaprogramação para criar várias tabelas, consulte Criar tabelas em um loop for.
Os exemplos a seguir mostram a sintaxe básica para leitura de uso de dados lotes ou lógica de transmissão:
from pyspark import pipelines as dp
# Batch read on a table
@dp.materialized_view()
def function_name():
return spark.read.table("catalog_name.schema_name.table_name")
# Batch read on a path
@dp.materialized_view()
def function_name():
return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")
# Streaming read on a table
@dp.table()
def function_name():
return spark.readStream.table("catalog_name.schema_name.table_name")
# Streaming read on a path
@dp.table()
def function_name():
return (spark.read
.format("cloudFiles")
.option("cloudFile.format", "parquet")
.load("/Volumes/catalog_name/schema_name/volume_name/data_path")
)
Se você precisar ler dados de uma API REST externa, implemente essa conexão usando uma fonte de dados personalizada Python . Consulte Fonte de dados personalizadaPySpark.
É possível criar DataFrames arbitrários Apache Spark a partir de coleções de dados Python , incluindo DataFrames Pandas , dicionários e listas. Esses padrões podem ser úteis durante o desenvolvimento e os testes, mas a maioria das definições de dataset pipeline de produção deve começar carregando dados de arquivos, de um sistema externo ou de uma tabela ou view existente.
Encadeamento de transformações
O pipeline oferece suporte a praticamente todas as transformações DataFrame Apache Spark . Você pode incluir qualquer número de transformações na sua função de definição dataset , mas deve garantir que os métodos utilizados sempre retornem um objeto DataFrame .
Se você tiver uma transformação intermediária que direciona diversas cargas de trabalho downstream, mas não precisa materializá-la como uma tabela, use @dp.temporary_view() para adicionar uma view temporária ao seu pipeline. Você pode então referenciar essa view usando spark.read.table("temp_view_name") em várias definições de dataset posteriores. A sintaxe a seguir demonstra esse padrão:
from pyspark import pipelines as dp
@dp.temporary_view()
def a():
return spark.read.table("source").filter(...)
@dp.materialized_view()
def b():
return spark.read.table("a").groupBy(...)
@dp.materialized_view()
def c():
return spark.read.table("a").groupBy(...)
Isso garante que o pipeline tenha pleno conhecimento das transformações em sua view durante o planejamento pipeline e evita possíveis problemas relacionados à execução de código Python arbitrário fora das definições dataset .
Dentro da sua função, você pode encadear DataFrames para criar novos DataFrames sem gravar resultados incrementais como tabelas de exibição, exibição materializada ou transmissão, como no exemplo a seguir:
from pyspark import pipelines as dp
@dp.table()
def multiple_transformations():
df1 = spark.read.table("source").filter(...)
df2 = df1.groupBy(...)
return df2.filter(...)
Se todos os seus DataFrames realizarem suas leituras iniciais usando lógica de lotes, o resultado de retorno será um DataFrame estático. Se você tiver alguma consulta que seja transmitida, seu resultado de retorno será um DataFrame de transmissão.
Retornar um DataFrame
Use @dp.table para criar uma tabela de transmissão a partir dos resultados de uma leitura de transmissão. Use @dp.materialized_view para criar uma view materializada a partir dos resultados de uma leitura de lotes. A maioria dos outros decoradores funcionam tanto em DataFrames de transmissão quanto em DataFrames estáticos, enquanto alguns exigem um DataFrame de transmissão.
A função usada para definir um dataset deve retornar um DataFrame Spark . Nunca utilize métodos que salvam ou gravam em arquivos ou tabelas como parte do código do seu dataset pipeline .
Exemplos de operações do Apache Spark que nunca devem ser usadas em código de pipeline:
collect()count()toPandas()save()saveAsTable()start()toTable()
O pipeline também oferece suporte ao uso Pandas no Spark para funções de definição de dataset . Veja a API Pandas no Spark.
Use SQL em um pipeline Python
O PySpark suporta o operador spark.sql para escrever código DataFrame usando SQL. Ao usar esse padrão no código-fonte pipeline , ele é compilado em visualizações materializadas ou tabelas de transmissão.
O exemplo de código a seguir é equivalente ao uso de spark.read.table("catalog_name.schema_name.table_name") para a lógica de consulta dataset :
@dp.materialized_view
def my_table():
return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")
dlt.read e dlt.read_stream (legado)
O módulo dlt mais antigo inclui funções dlt.read() e dlt.read_stream() que foram introduzidas para dar suporte à funcionalidade no modo de publicação de pipeline legado. Esses métodos são suportados, mas o Databricks recomenda sempre usar as funções spark.read.table() e spark.readStream.table() devido ao seguinte:
- As funções
dlttêm suporte limitado para leitura de conjuntos de dados definidos fora do pipeline atual. - As funções
sparkoferecem suporte à especificação de opções, comoskipChangeCommits, para ler operações. A especificação de opções não é suportada pelas funçõesdlt. - O módulo
dltfoi substituído pelo módulopyspark.pipelines. Databricks recomenda o uso defrom pyspark import pipelines as dppara importarpyspark.pipelinespara uso ao escrever código de pipeline em Python.