Pular para o conteúdo principal

Carregar dados no pipeline

Você pode carregar dados de qualquer fonte de dados compatível com o Apache Spark no Databricks usando um pipeline. Você pode definir conjuntos de dados — tabelas e visualizações — no pipeline declarativo LakeFlow Spark para qualquer consulta que retorne um DataFrame Spark , incluindo DataFrames de transmissão e DataFrames Pandas para Spark . Para a tarefa de aquisição de dados, Databricks recomenda o uso de tabelas de transmissão para a maioria dos casos de uso. As tabelas de transmissão são úteis para ingerir dados de armazenamento de objetos cloud usando Auto Loader ou de barramentos de mensagens como Kafka.

Nem todas as fontes de dados possuem suporte SQL para ingestão. No entanto, você pode misturar fontes SQL e Python no mesmo pipeline para usar Python onde for necessário. Para obter detalhes sobre como trabalhar com bibliotecas não incluídas no pipeline declarativo LakeFlow Spark por default, consulte Gerenciar dependências Python para o pipeline. Para informações gerais sobre ingestão no Databricks, consulte Conectores padrão no LakeFlow Connect.

Os exemplos a seguir demonstram alguns padrões comuns de carregamento de dados.

Carregar de uma tabela existente

Carregue dados de qualquer tabela existente no Databricks. Você pode transformar o uso de dados em uma consulta ou carregar a tabela para processamento posterior em seu pipeline.

Python
@dp.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)

Carregar arquivos do armazenamento de objetos cloud

Databricks recomenda o uso Auto Loader no pipeline para a maioria das tarefas de ingestão de dados do armazenamento de objetos cloud ou de arquivos em um volume Unity Catalog . Auto Loader e o pipeline são projetados para carregar de forma incremental e idempotente dados em constante crescimento à medida que chegam ao armazenamento cloud . Consulte O que é o Auto Loader? e Carregar dados do armazenamento de objetos.

O exemplo a seguir lê dados do armazenamento cloud usando Auto Loader.

Python
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("gs://mybucket/analysis/*/*/*.json")
)

Os exemplos a seguir usam Auto Loader para criar conjuntos de dados a partir de arquivos CSV em um volume Unity Catalog .

Python
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
nota
  • Se você usar Auto Loader com notificações de arquivo e executar uma refresh completa para seu pipeline ou tabela de transmissão, será necessário limpar seu recurso manualmente. Você pode usar o CloudFilesResourceManager em um Notebook para realizar a limpeza.
  • Para carregar arquivos com Auto Loader em um pipeline com Unity Catalog ativado, você deve usar locais externos. Para saber mais sobre como usar Unity Catalog com o pipeline, consulte Usar Unity Catalog com o pipeline.

Autenticar no armazenamento cloud

Auto Loader utiliza locais externos Unity Catalog para autenticar no armazenamento cloud . Você deve configurar um local externo para o caminho de armazenamento do qual deseja ler e conceder o privilégio READ FILES ao usuário que executa.

Para ingerir dados do Google Cloud Storage (GCS), configure um local externo com uma credencial de armazenamento que faça referência a um bucket do GCS. Para obter mais informações, consulte Conectar-se a um local externo Google Cloud Storage (GCS).

Se você precisar usar o modo de notificação de arquivos para Auto Loader com GCS, forneça as credenciais de um serviço Databricks ou de uma conta do Google nas opções Auto Loader . Consulte as opções específicas do Google para obter informações sobre as opções de autenticação específicas do Google.

O exemplo a seguir usa uma credencial de serviço para ativar o modo de notificação de arquivos para uma fonte do GCS.

Python
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.useNotifications", "true")
.option("cloudFiles.projectId", "my-gcp-project")
.option("databricks.serviceCredential", "my-service-credential")
.load("gs://my-bucket/path/to/files")
)

No modo de listagem de diretórios (default), a autenticação é feita através do local externo e nenhuma credencial adicional é necessária no código.

Carregar dados de um barramento de mensagens

Você pode configurar o pipeline para ingerir dados de barramentos de mensagens. Databricks recomenda o uso de tabelas de transmissão com execução contínua e dimensionamento automático aprimorado para fornecer a ingestão mais eficiente para carregamento de baixa latência a partir de barramentos de mensagens. Para obter mais informações, consulte Otimizar a utilização do cluster do pipeline declarativo LakeFlow Spark com escalonamento automático.

Por exemplo, o código a seguir configura uma tabela de transmissão para ingerir dados do Kafka usando a função read_kafka .

Python
from pyspark import pipelines as dp

@dp.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)

Para ingerir de outras fontes de barramento de mensagens, consulte:

Carregar dados do Azure Event Hubs

Azure Event Hubs é um serviço de transmissão de dados que fornece uma interface compatível Apache Kafka . Você pode usar o conector Kafka de transmissão estruturada, incluído no ambiente de execução do pipeline declarativo LakeFlow Spark , para carregar mensagens do Azure Event Hubs. Para saber mais sobre como carregar e processar mensagens do Azure Event Hubs, consulte Usar Azure Event Hubs como fonte de dados pipeline.

Carregar dados de sistemas externos

O pipeline declarativo LakeFlow Spark suporta o carregamento de dados de qualquer fonte de dados compatível com Databricks. Consulte Conectar-se à fonte de dados e serviço externo. Você também pode carregar o uso de dados externo da Lakehouse Federation para fontes de dados suportadas. Como o Lakehouse Federation requer Databricks Runtime 13.3 LTS ou superior, para usar o Lakehouse Federation, configure seu pipeline para usar o canal de pré-visualização.

Algumas fontes de dados não possuem suporte SQL equivalente. Se você não puder usar o Lakehouse Federation com uma dessas fontes de dados, poderá usar Python para ingerir dados da fonte. Você pode adicionar arquivos de origem Python e SQL ao mesmo pipeline. O exemplo a seguir declara uma view materializada para acessar o estado atual dos dados em uma tabela PostgreSQL remota.

Python
import dp

@dp.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)

Carregar conjuntos de dados pequenos ou estáticos de armazenamento de objetos cloud

Você pode carregar conjuntos de dados pequenos ou estáticos usando a sintaxe de carregamento Apache Spark . O pipeline declarativo LakeFlow Spark suporta todos os formatos de arquivo suportados pelo Apache Spark no Databricks. Para obter uma lista completa, consulte Opções de formato de dados.

Os exemplos a seguir demonstram o carregamento de JSON para criar uma tabela.

Python
@dp.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
nota

A função SQL read_files é comum a todos os ambientes SQL no Databricks. Este é o padrão recomendado para acesso direto a arquivos usando SQL em um pipeline. Para mais informações, consulte Opções.

Carregar dados de uma fonte de dados personalizada Python

Fontes de dados personalizadas Python permitem carregar dados em formatos personalizados. Você pode escrever código para ler e gravar em uma fonte de dados externa específica ou usar seu código Python existente para ler dados de seus próprios sistemas internos. Para obter mais detalhes sobre o desenvolvimento de fonte de dados Python , consulte Fonte de dados personalizadaPySpark.

O exemplo a seguir registra uma fonte de dados personalizada com o nome de formato my_custom_datasource e lê dela nos modos lotes e transmissão.

Python
from pyspark import pipelines as dp

# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.

# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
return spark.read.format("my_custom_datasource").load()

# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
return spark.readStream.format("my_custom_datasource").load()

Configure uma tabela de transmissão para ignorar alterações em uma tabela de transmissão de origem

Por default, as tabelas de transmissão exigem fontes somente de acréscimo. Se a sua tabela de transmissão de origem exigir atualizações ou exclusões — por exemplo, para processamento do “direito ao esquecimento” GDPR — use o sinalizador skipChangeCommits para ignorar essas alterações. Esta flag funciona apenas com spark.readStream usando a função option() e não pode ser usada quando a tabela de transmissão de origem é o destino de uma função create_auto_cdc_flow() . Para obter mais informações, consulte Tratar alterações nas tabelas Delta de origem.

Python
@dp.table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")

Acesse com segurança credenciais de armazenamento com segredos em um pipeline

Você pode usar segredos Databricks para armazenar credenciais como chaves de acesso ou senhas. Para configurar o segredo em seu pipeline, use uma propriedade do Spark nas configurações do cluster do pipeline. Consulte Configurar compute clássica para pipeline.

O exemplo a seguir usa um segredo para armazenar uma key de acesso necessária para ler dados de entrada de uma account de armazenamento Azure Data Lake Storage usando Auto Loader. Você pode usar esse mesmo método para configurar qualquer segredo exigido pelo seu pipeline, por exemplo, uma chave AWS para acessar S3 ou a senha de um metastore Apache Hive metastore.

Para saber mais sobre como trabalhar com Azure Data Lake Storage, consulte Conectar-se ao Azure Data Lake Storage e ao Blob Storage.

nota

Você deve adicionar o prefixo spark.hadoop. à key configuração spark_conf que define o valor secreto.

JSON
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path>",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
}
}
],
"name": ":re[LDP] quickstart using ADLS2"
}

Neste exemplo de código, substitua os seguintes valores.

Espaço reservado

Substitua por

<container-name>

O nome do contêiner account de armazenamento Azure .

<storage-account-name>

O nome account de armazenamento ADLS .

<path>

O caminho para os dados de saída e metadados do pipeline.

<scope-name>

O nome do escopo secreto Databricks .

<secret-name>

O nome da key que contém a key de acesso account de armazenamento Azure .

Python
from pyspark import pipelines as dp

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)

Neste exemplo de código, substitua os seguintes valores.

Espaço reservado

Substitua por

<container-name>

O nome do contêiner account de armazenamento Azure que armazena os dados de entrada.

<storage-account-name>

O nome account de armazenamento ADLS .

<path-to-input-dataset>

O caminho para o dataset de entrada.