Carregar dados no pipeline
Você pode carregar dados de qualquer fonte de dados compatível com Apache Spark no Databricks usando um pipeline. Você pode definir um conjunto de dados (tabelas e visualizações) no pipeline declarativo LakeFlow Spark para qualquer consulta que retorne um DataFrame Spark , incluindo DataFrames de transmissão e DataFrames Pandas para Spark . Para a tarefa de aquisição de dados, Databricks recomenda o uso de tabelas de transmissão para a maioria dos casos de uso. As tabelas de transmissão são boas para ingerir dados de armazenamento de objetos cloud usando Auto Loader ou de barramentos de mensagens como Kafka.
- Nem todas as fontes de dados possuem suporte SQL para ingestão. Você pode combinar fontes SQL e Python em um pipeline para usar Python onde for necessário e SQL para outras operações no mesmo pipeline.
- Para obter detalhes sobre como trabalhar com bibliotecas não incluídas no pipeline declarativo LakeFlow Spark por default, consulte Gerenciar dependências Python para o pipeline.
- Para obter informações gerais sobre ingestão no Databricks, consulte Conectores padrão no LakeFlow Connect.
Os exemplos abaixo demonstram alguns padrões comuns.
Carregar de uma tabela existente
Carregue dados de qualquer tabela existente no Databricks. Você pode transformar o uso de dados em uma consulta ou carregar a tabela para processamento posterior em seu pipeline.
O exemplo a seguir lê dados de uma tabela existente:
- Python
- SQL
@dp.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
Carregar arquivos do armazenamento de objetos cloud
Databricks recomenda o uso Auto Loader no pipeline para a maioria das tarefas de ingestão de dados do armazenamento de objetos cloud ou de arquivos em um volume Unity Catalog . Auto Loader e o pipeline são projetados para carregar de forma incremental e idempotente dados em constante crescimento à medida que chegam ao armazenamento cloud .
Veja O que é o Auto Loader? e Carregar dados do armazenamento de objetos.
O exemplo a seguir lê dados do armazenamento cloud usando Auto Loader:
- Python
- SQL
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("s3://mybucket/analysis/*/*/*.json")
)
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT *
FROM STREAM read_files(
's3://mybucket/analysis/*/*/*.json',
format => "json"
);
Os exemplos a seguir usam Auto Loader para criar conjuntos de dados a partir de arquivos CSV em um volume Unity Catalog :
- Python
- SQL
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/retail_org/customers/",
format => "csv"
)
- Se você usar Auto Loader com notificações de arquivo e executar uma refresh completa para seu pipeline ou tabela de transmissão, será necessário limpar seu recurso manualmente. Você pode usar o CloudFilesResourceManager em um Notebook para realizar a limpeza.
- Para carregar arquivos com Auto Loader em um pipeline com Unity Catalog ativado, você deve usar locais externos. Para saber mais sobre como usar Unity Catalog com o pipeline, consulte Usar Unity Catalog com o pipeline.
Carregar dados de um barramento de mensagens
Você pode configurar o pipeline para ingerir dados de barramentos de mensagens. Databricks recomenda o uso de tabelas de transmissão com execução contínua e dimensionamento automático aprimorado para fornecer a ingestão mais eficiente para carregamento de baixa latência a partir de barramentos de mensagens. Consulte Otimizar a utilização cluster do pipeline declarativo LakeFlow Spark com escalonamento automático.
Por exemplo, o código a seguir configura uma tabela de transmissão para ingerir dados do Kafka, usando a função read_kafka :
- Python
- SQL
from pyspark import pipelines as dp
@dp.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)
CREATE OR REFRESH STREAMING TABLE kafka_raw AS
SELECT *
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'topic1'
);
Para ingerir de outras fontes de barramento de mensagens, consulte:
- Kinesis: leitura_cinesis
- Tópico Pub/Sub: read_pubsub
- Pulsar: leitura_pulsar
Carregar dados dos Hubs de Eventos do Azure
Azure Event Hubs é um serviço de transmissão de dados que fornece uma interface compatível Apache Kafka . Você pode usar o conector Kafka de transmissão estruturada, incluído no ambiente de execução do pipeline declarativo LakeFlow Spark , para carregar mensagens do Azure Event Hubs. Para saber mais sobre como carregar e processar mensagens do Azure Event Hubs, consulte Usar Azure Event Hubs como fonte de dados pipeline.
Carregar dados de sistemas externos
O pipeline declarativo LakeFlow Spark suporta o carregamento de dados de qualquer fonte de dados compatível com Databricks. Consulte Conectar-se à fonte de dados e serviço externo. Você também pode carregar o uso de dados externo da Lakehouse Federation para fontes de dados suportadas. Como o Lakehouse Federation requer Databricks Runtime 13.3 LTS ou superior, para usar o Lakehouse Federation, seu pipeline deve ser configurado para usar o canal de pré-visualização.
Algumas fontes de dados não têm suporte equivalente em SQL. Se você não puder usar o lakehouse Federation com uma dessas fontes de dados, você pode usar Python para ingerir dados da fonte. Você pode adicionar arquivos de origem Python e SQL ao mesmo pipeline. O exemplo a seguir declara uma view materializada para acessar o estado atual dos dados em uma tabela remota PostgreSQL :
import dp
@dp.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Carregar conjuntos de dados pequenos ou estáticos do armazenamento de objetos cloud
Você pode carregar conjuntos de dados pequenos ou estáticos usando a sintaxe de carregamento Apache Spark . O pipeline declarativo LakeFlow Spark suporta todos os formatos de arquivo suportados pelo Apache Spark no Databricks. Para obter uma lista completa, consulte Opções de formato de dados.
Os exemplos a seguir demonstram o carregamento de JSON para criar uma tabela:
- Python
- SQL
@dp.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
A função SQL read_files é comum a todos os ambientes SQL no Databricks. Este é o padrão recomendado para acesso direto a arquivos usando SQL em um pipeline. Para mais informações, consulte Opções.
Carregar dados de uma fonte de dados personalizada Python
Fontes de dados personalizadas Python permitem que você carregue dados em formatos personalizados. Você pode escrever código para ler e gravar em uma fonte de dados externa específica ou aproveitar o código Python existente em seus sistemas para ler dados de seus próprios sistemas internos. Para obter mais detalhes sobre o desenvolvimento de fonte de dados Python , consulte Fonte de dados personalizadaPySpark.
Para usar uma fonte de dados personalizada Python para carregar dados em um pipeline, registre-a com um nome de formato, como my_custom_datasource, e então leia dela:
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.
# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
return spark.read.format("my_custom_datasource").load()
# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
return spark.readStream.format("my_custom_datasource").load()
Configure uma tabela de transmissão para ignorar alterações em uma tabela de transmissão de origem
- O sinalizador
skipChangeCommitsfunciona apenas comspark.readStreamusando a funçãooption(). Você não pode usar este sinalizador em uma funçãodp.read_stream(). - Você não pode usar o sinalizador
skipChangeCommitsquando a tabela de transmissão de origem estiver definida como o destino de uma função create_auto_cdc_flow() .
Por default, as tabelas de transmissão exigem fontes somente de anexação. Quando uma tabela de transmissão usa outra tabela de transmissão como fonte, e a tabela de transmissão de origem requer atualizações ou exclusões, por exemplo, processamento de “direito de ser esquecido” GDPR , o sinalizador skipChangeCommits pode ser definido ao ler a tabela de transmissão de origem para ignorar essas alterações. Para mais informações sobre esse sinalizador, consulte Ignorar atualizações e exclusões.
@dp.table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
Acesse com segurança credenciais de armazenamento com segredos em um pipeline
Você pode usar segredos Databricks para armazenar credenciais como chaves de acesso ou senhas. Para configurar o segredo em seu pipeline, use uma propriedade do Spark nas configurações do cluster do pipeline. Consulte Configurar compute clássica para pipeline.
O exemplo a seguir usa um segredo para armazenar uma key acesso necessária para ler dados de entrada de uma account de armazenamento Azure Data Lake Storage (ADLS) usando Auto Loader. Você pode usar esse mesmo método para configurar qualquer segredo exigido pelo seu pipeline, por exemplo, a chave AWS para acessar S3 ou a senha para um Hive metastore Apache Hive.
Para saber mais sobre como trabalhar com Azure Data Lake Storage, consulte Conectar-se ao Azure Data Lake Storage e ao Blob Storage.
Você deve adicionar o prefixo spark.hadoop. à key configuração spark_conf que define o valor secreto.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
}
}
],
"name": ":re[LDP] quickstart using ADLS2"
}
Substituir
<storage-account-name>com o nome account de armazenamento ADLS .<scope-name>pelo nome do secret scope do Databricks.<secret-name>pelo nome da chave que contém a chave de acesso da conta de armazenamento do Azure.
from pyspark import pipelines as dp
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Substituir
<container-name>com o nome do contêiner account armazenamento Azure que armazena os dados de entrada.<storage-account-name>com o nome account de armazenamento ADLS .<path-to-input-dataset>com o caminho para o dataset de entrada.