Carregar dados com DLT
O senhor pode carregar dados de qualquer fonte de dados suportada por Apache Spark em Databricks usando DLT. O senhor pode definir o conjunto de dados (tabelas e visualizações) em DLT em relação a qualquer consulta que retorne um Spark DataFrame, incluindo a transmissão DataFrames e Pandas para Spark DataFrames. Para a tarefa de ingestão de dados, o site Databricks recomenda o uso de tabelas de transmissão para a maioria dos casos de uso. As tabelas de transmissão são boas para ingerir dados do armazenamento de objetos na nuvem usando Auto Loader ou de barramentos de mensagens como Kafka.
- Nem todas as fontes de dados têm suporte para SQL. O senhor pode misturar SQL e Python Notebook em um DLT pipeline para usar SQL para todas as operações além da ingestão.
- Para obter detalhes sobre como trabalhar com uma biblioteca que não seja pacote no DLT por meio do site default, consulte gerenciar dependências do Python para o pipeline DLT.
- Para obter informações gerais sobre ingestão em Databricks, consulte Ingerir dados em um Databricks lakehouse .
Os exemplos abaixo demonstram alguns padrões comuns.
Carregar de uma tabela existente
Carregar dados de qualquer tabela existente no Databricks. O senhor pode transformar o uso de dados em uma consulta ou carregar a tabela para processamento posterior em seu site pipeline.
O exemplo a seguir lê dados de uma tabela existente:
- Python
- SQL
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
Carregar arquivos do armazenamento de objetos na nuvem
Databricks recomenda o uso de Auto Loader com DLT para a maioria das tarefas de ingestão de dados do armazenamento de objetos na nuvem ou de arquivos em um volume Unity Catalog. O Auto Loader e o DLT foram projetados para carregar de forma incremental e idempotente dados cada vez maiores à medida que eles chegam ao armazenamento em nuvem.
Consulte O que é o Auto Loader? e Carregar dados do armazenamento de objetos.
O exemplo a seguir lê dados do armazenamento em nuvem usando o Auto Loader:
- Python
- SQL
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("s3://mybucket/analysis/*/*/*.json")
)
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT *
FROM STREAM read_files(
's3://mybucket/analysis/*/*/*.json',
format => "json"
);
Os exemplos a seguir usam Auto Loader para criar um conjunto de dados a partir de arquivos CSV em um volume Unity Catalog:
- Python
- SQL
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/retail_org/customers/",
format => "csv"
)
- Se o senhor usar Auto Loader com notificações de arquivo e executar um refresh completo para sua tabela pipeline ou de transmissão, deverá limpar manualmente o recurso. O senhor pode usar o CloudFilesResourceManager em um Notebook para realizar a limpeza.
- Para carregar arquivos com o Auto Loader em um pipeline habilitado para o Unity Catalog, o senhor deve usar locais externos. Para saber mais sobre como usar Unity Catalog com DLT, consulte Usar Unity Catalog com seu pipeline DLT.
Carregar dados de um barramento de mensagens
O senhor pode configurar o pipeline DLT para ingerir dados de barramentos de mensagens. Databricks recomenda o uso de tabelas de transmissão com execução contínua e autoescala aprimorada para fornecer a ingestão mais eficiente para o carregamento de baixa latência de barramentos de mensagens. Consulte Otimizar a utilização de clustering do pipeline DLT com autoscale.
Por exemplo, o código a seguir configura uma tabela de transmissão para ingerir dados de Kafka, usando a função read_kafka:
- Python
- SQL
import dlt
@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)
CREATE OR REFRESH STREAMING TABLE kafka_raw AS
SELECT *
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'topic1'
);
Para ingerir de outras fontes de barramento de mensagens, consulte:
Carregar dados dos Hubs de Eventos do Azure
Azure O Event Hubs é um serviço de transmissão de dados que oferece uma interface compatível com Apache Kafka . O senhor pode usar o conector de transmissão estruturada Kafka, incluído no tempo de execução do DLT, para carregar mensagens do Azure Event Hubs. Para saber mais sobre como carregar e processar mensagens dos Azure Event Hubs, consulte Use Azure Event Hubs as a DLT fonte de dados.
Carregar dados de sistemas externos
A DLT suporta o carregamento de dados de qualquer fonte de dados suportada pelo site Databricks. Consulte Conectar-se à fonte de dados. O senhor também pode carregar o uso externo de dados da Lakehouse Federation para fontes de dados compatíveis. Como o Lakehouse Federation requer o Databricks Runtime 13.3 LTS ou acima, para usar o Lakehouse Federation, o seu pipeline deve ser configurado para usar o canal de visualização.
Algumas fontes de dados não têm suporte equivalente em SQL. Se o senhor não puder usar a Lakehouse Federation com uma dessas fontes de dados, poderá usar um notebook Python para ingerir dados da fonte. O senhor pode adicionar código-fonte Python e SQL ao mesmo pipeline DLT. O exemplo a seguir declara um view materializado para acessar o estado atual dos dados em uma tabela PostgreSQL remota:
import dlt
@dlt.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Carregar conjuntos de dados pequenos ou estáticos do armazenamento de objetos na nuvem
O senhor pode carregar um conjunto de dados pequeno ou estático usando a sintaxe de carregamento Apache Spark. O DLT é compatível com todos os formatos de arquivo suportados pelo Apache Spark no Databricks. Para obter uma lista completa, consulte Opções de formato de dados.
Os exemplos a seguir demonstram o carregamento de JSON para criar tabelas DLT:
- Python
- SQL
@dlt.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
A função read_files
SQL é comum a todos os ambientes SQL na Databricks. Esse é o padrão recomendado para acesso direto a arquivos usando SQL com DLT. Para obter mais informações, consulte Opções.
Configurar uma tabela de transmissão para ignorar as alterações em uma tabela de transmissão de origem
- O sinalizador
skipChangeCommits
funciona apenas comspark.readStream
usando a funçãooption()
. Você não pode usar este sinalizador em uma funçãodlt.read_stream()
. - O senhor não pode usar o sinalizador
skipChangeCommits
quando a tabela de transmissão de origem é definida como o destino de uma função apply_changes().
Pelo site default, as tabelas de transmissão exigem fontes somente de anexos. Quando uma tabela de transmissão usa outra tabela de transmissão como fonte e a tabela de transmissão de origem requer atualizações ou exclusões, por exemplo, GDPR processamento do "direito de ser esquecido", o sinalizador skipChangeCommits
pode ser definido ao ler a tabela de transmissão de origem para ignorar essas alterações. Para obter mais informações sobre esse sinalizador, consulte Ignorar atualizações e exclusões.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
Acesse com segurança as credenciais de armazenamento com segredos em um pipeline
O senhor pode usar Databricks os segredos do site para armazenar credenciais, como chaves de acesso ou senhas. Para configurar o segredo em seu pipeline, use uma propriedade Spark na configuração de clustering das configurações pipeline. Consulte Configurar compute para um DLT pipeline.
O exemplo a seguir usa um segredo para armazenar um acesso key necessário para ler dados de entrada de um lago de dados Azure Armazenamento (ADLS) armazenamento account usando Auto Loader. O senhor pode usar esse mesmo método para configurar qualquer segredo exigido pelo seu pipeline, por exemplo, a chave AWS para acessar S3, ou a senha para um Apache Hive metastore.
Para saber mais sobre como trabalhar com o Azure data lake Storage, consulte Conectar-se ao Azure data lake Storage e Blob Storage.
O senhor deve adicionar o prefixo spark.hadoop.
à configuração spark_conf
key que define o valor do segredo.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/DLT Notebooks/DLT quickstart"
}
}
],
"name": "DLT quickstart using ADLS2"
}
Substituir
<storage-account-name>
com o nome ADLS storage account.<scope-name>
pelo nome do secret scope do Databricks.<secret-name>
pelo nome da chave que contém a chave de acesso da conta de armazenamento do Azure.
import dlt
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Substituir
<container-name>
com o nome do contêiner Azure storage account que armazena os dados de entrada.<storage-account-name>
com o nome ADLS storage account.<path-to-input-dataset>
com o caminho para a entrada dataset.