Pular para o conteúdo principal

Carregar dados com o pipeline declarativo LakeFlow

Você pode carregar dados de qualquer fonte de dados suportada pelo Apache Spark no Databricks usando o pipeline declarativo LakeFlow . Você pode definir conjuntos de dados (tabelas e visualizações) no pipeline declarativo LakeFlow em relação a qualquer consulta que retorne um Spark DataFrame, incluindo DataFrames de transmissão e Pandas para Spark DataFrames. Para a tarefa de aquisição de dados, Databricks recomenda o uso de tabelas de transmissão para a maioria dos casos de uso. As tabelas de transmissão são boas para ingerir dados do armazenamento de objetos cloud usando Auto Loader ou de barramentos de mensagens como Kafka.

nota
  • Nem todas as fontes de dados têm suporte SQL para ingestão. Você pode misturar fontes SQL e Python no pipeline declarativo LakeFlow para usar Python onde for necessário, e SQL para outras operações no mesmo pipeline.
  • Para obter detalhes sobre como trabalhar com biblioteca não pacote no pipeline declarativo LakeFlow por default, consulte gerenciar dependências Python para o pipeline declarativo LakeFlow.
  • Para obter informações gerais sobre ingestão no Databricks, consulte Conectores padrão no LakeFlow Connect.

Os exemplos abaixo demonstram alguns padrões comuns.

Carregar de uma tabela existente

Carregue dados de qualquer tabela existente no Databricks. Você pode transformar o uso de dados em uma consulta ou carregar a tabela para processamento posterior em seu pipeline.

O exemplo a seguir lê dados de uma tabela existente:

Python
@dp.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)

Carregar arquivos do armazenamento de objetos cloud

Databricks recomenda usar Auto Loader com o pipeline declarativo LakeFlow para a maioria das tarefas de extração de dados do armazenamento de objetos cloud ou de arquivos em um volume Unity Catalog . Auto Loader e o pipeline declarativo LakeFlow foram projetados para carregar de forma incremental e idempotente dados cada vez maiores conforme eles chegam ao armazenamento cloud .

Veja O que é o Auto Loader? e Carregar dados do armazenamento de objetos.

O exemplo a seguir lê dados do armazenamento cloud usando Auto Loader:

Python
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("s3://mybucket/analysis/*/*/*.json")
)

Os exemplos a seguir usam Auto Loader para criar conjuntos de dados a partir de arquivos CSV em um volume Unity Catalog :

Python
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
nota
  • Se você usar Auto Loader com notificações de arquivo e executar uma refresh completa para seu pipeline ou tabela de transmissão, será necessário limpar seu recurso manualmente. Você pode usar o CloudFilesResourceManager em um Notebook para realizar a limpeza.
  • Para carregar arquivos com o Auto Loader em um pipeline habilitado para o Unity Catalog, você deve usar locais externos. Para saber mais sobre como usar Unity Catalog com o pipeline declarativo LakeFlow , consulte Usar Unity Catalog com seu pipeline declarativo LakeFlow.

Carregar dados de um barramento de mensagens

Você pode configurar o pipeline declarativo LakeFlow para ingerir dados de barramentos de mensagens. Databricks recomenda o uso de tabelas de transmissão com execução contínua e dimensionamento automático aprimorado para fornecer a ingestão mais eficiente para carregamento de baixa latência de barramentos de mensagens. Consulte Otimizar a utilização cluster do pipeline declarativo LakeFlow com dimensionamento automático.

Por exemplo, o código a seguir configura uma tabela de transmissão para ingerir dados do Kafka, usando a função read_kafka :

Python
from pyspark import pipelines as dp

@dp.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)

Para ingerir de outras fontes de barramento de mensagens, consulte:

Carregar dados dos Hubs de Eventos do Azure

Azure Event Hubs é um serviço de transmissão de dados que fornece uma interface compatível com Apache Kafka . Você pode usar o conector Kafka de transmissão estruturada, incluído no tempo de execução do pipeline declarativo LakeFlow , para carregar mensagens dos Hubs de Eventos Azure . Para saber mais sobre como carregar e processar mensagens dos Hubs de Eventos Azure , consulte Usar os Hubs de Eventos Azure como um pipeline declarativo LakeFlow.

Carregar dados de sistemas externos

O pipeline declarativo LakeFlow oferece suporte ao carregamento de dados de qualquer fonte de dados suportada pelo Databricks. Consulte Conectar-se à fonte de dados e serviço externo. Você também pode carregar o uso de dados externo da Lakehouse Federation para fontes de dados suportadas. Como o Lakehouse Federation requer Databricks Runtime 13.3 LTS ou superior, para usar o Lakehouse Federation, seu pipeline deve ser configurado para usar o canal de visualização.

Algumas fontes de dados não têm suporte equivalente em SQL. Se você não puder usar o lakehouse Federation com uma dessas fontes de dados, você pode usar Python para ingerir dados da fonte. Você pode adicionar arquivos de origem Python e SQL ao mesmo pipeline. O exemplo a seguir declara uma view materializada para acessar o estado atual dos dados em uma tabela remota PostgreSQL :

Python
import dp

@dp.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)

Carregar conjuntos de dados pequenos ou estáticos do armazenamento de objetos cloud

Você pode carregar conjuntos de dados pequenos ou estáticos usando a sintaxe de carregamento Apache Spark . O pipeline declarativo LakeFlow oferece suporte a todos os formatos de arquivo suportados pelo Apache Spark no Databricks. Para uma lista completa, consulte Opções de formato de dados.

Os exemplos a seguir demonstram o carregamento de JSON para criar tabelas de pipeline declarativas LakeFlow :

Python
@dp.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
nota

A função SQL read_files é comum a todos os ambientes SQL no Databricks. É o padrão recomendado para acesso direto a arquivos usando SQL com pipeline declarativo LakeFlow . Para mais informações, consulte Opções.

Carregar dados de uma fonte de dados personalizada Python

Fontes de dados personalizadas Python permitem que você carregue dados em formatos personalizados. Você pode escrever código para ler e gravar em uma fonte de dados externa específica ou aproveitar o código Python existente em seus sistemas para ler dados de seus próprios sistemas internos. Para obter mais detalhes sobre o desenvolvimento de fonte de dados Python , consulte Fonte de dados personalizadaPySpark.

Para usar uma fonte de dados personalizada Python para carregar dados no pipeline declarativo LakeFlow , registre-a com um nome de formato, como my_custom_datasource e, em seguida, leia a partir dela:

Python
from pyspark import pipelines as dp

# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.

# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
return spark.read.format("my_custom_datasource").load()

# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
return spark.readStream.format("my_custom_datasource").load()

Configure uma tabela de transmissão para ignorar alterações em uma tabela de transmissão de origem

nota
  • O sinalizador skipChangeCommits funciona apenas com spark.readStream usando a função option() . Você não pode usar este sinalizador em uma função dp.read_stream() .
  • Você não pode usar o sinalizador skipChangeCommits quando a tabela de transmissão de origem estiver definida como o destino de uma função create_auto_cdc_flow() .

Por default, as tabelas de transmissão exigem fontes somente de anexação. Quando uma tabela de transmissão usa outra tabela de transmissão como fonte, e a tabela de transmissão de origem requer atualizações ou exclusões, por exemplo, processamento de “direito de ser esquecido” GDPR , o sinalizador skipChangeCommits pode ser definido ao ler a tabela de transmissão de origem para ignorar essas alterações. Para mais informações sobre esse sinalizador, consulte Ignorar atualizações e exclusões.

Python
@dp.table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")

Acesse com segurança credenciais de armazenamento com segredos em um pipeline

Você pode usar os segredos Databricks para armazenar credenciais, como chaves de acesso ou senhas. Para configurar o segredo no seu pipeline, use uma propriedade do Spark na configuração do cluster de configurações do pipeline. Consulte Configurar compute clássica para o pipeline declarativo LakeFlow.

O exemplo a seguir usa um segredo para armazenar uma key acesso necessária para ler dados de entrada de uma account de armazenamento Azure Data Lake Storage (ADLS) usando Auto Loader. Você pode usar esse mesmo método para configurar qualquer segredo exigido pelo seu pipeline, por exemplo, a chave AWS para acessar S3 ou a senha para um Hive metastore Apache Hive.

Para saber mais sobre como trabalhar com Azure Data Lake Storage, consulte Conectar-se ao Azure Data Lake Storage e ao Blob Storage.

nota

Você deve adicionar o prefixo spark.hadoop. à key configuração spark_conf que define o valor secreto.

JSON
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
}
}
],
"name": ":re[LDP] quickstart using ADLS2"
}

Substituir

  • <storage-account-name> com o nome account de armazenamento ADLS .
  • <scope-name> pelo nome do secret scope do Databricks.
  • <secret-name> pelo nome da chave que contém a chave de acesso da conta de armazenamento do Azure.
Python
from pyspark import pipelines as dp

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)

Substituir

  • <container-name> com o nome do contêiner account armazenamento Azure que armazena os dados de entrada.
  • <storage-account-name> com o nome account de armazenamento ADLS .
  • <path-to-input-dataset> com o caminho para o dataset de entrada.