Pular para o conteúdo principal

Carregar dados com DLT

O senhor pode carregar dados de qualquer fonte de dados suportada por Apache Spark em Databricks usando DLT. O senhor pode definir o conjunto de dados (tabelas e visualizações) em DLT em relação a qualquer consulta que retorne um Spark DataFrame, incluindo a transmissão DataFrames e Pandas para Spark DataFrames. Para a tarefa de ingestão de dados, o site Databricks recomenda o uso de tabelas de transmissão para a maioria dos casos de uso. As tabelas de transmissão são boas para ingerir dados do armazenamento de objetos na nuvem usando Auto Loader ou de barramentos de mensagens como Kafka.

nota

Os exemplos abaixo demonstram alguns padrões comuns.

Carregar de uma tabela existente

Carregar dados de qualquer tabela existente no Databricks. O senhor pode transformar o uso de dados em uma consulta ou carregar a tabela para processamento posterior em seu site pipeline.

O exemplo a seguir lê dados de uma tabela existente:

Python
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)

Carregar arquivos do armazenamento de objetos na nuvem

Databricks recomenda o uso de Auto Loader com DLT para a maioria das tarefas de ingestão de dados do armazenamento de objetos na nuvem ou de arquivos em um volume Unity Catalog. O Auto Loader e o DLT foram projetados para carregar de forma incremental e idempotente dados cada vez maiores à medida que eles chegam ao armazenamento em nuvem.

Consulte O que é o Auto Loader? e Carregar dados do armazenamento de objetos.

O exemplo a seguir lê dados do armazenamento em nuvem usando o Auto Loader:

Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("s3://mybucket/analysis/*/*/*.json")
)

Os exemplos a seguir usam Auto Loader para criar um conjunto de dados a partir de arquivos CSV em um volume Unity Catalog:

Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
nota
  • Se o senhor usar Auto Loader com notificações de arquivo e executar um refresh completo para sua tabela pipeline ou de transmissão, deverá limpar manualmente o recurso. O senhor pode usar o CloudFilesResourceManager em um Notebook para realizar a limpeza.
  • Para carregar arquivos com o Auto Loader em um pipeline habilitado para o Unity Catalog, o senhor deve usar locais externos. Para saber mais sobre como usar Unity Catalog com DLT, consulte Usar Unity Catalog com seu pipeline DLT.

Carregar dados de um barramento de mensagens

O senhor pode configurar o pipeline DLT para ingerir dados de barramentos de mensagens. Databricks recomenda o uso de tabelas de transmissão com execução contínua e autoescala aprimorada para fornecer a ingestão mais eficiente para o carregamento de baixa latência de barramentos de mensagens. Consulte Otimizar a utilização de clustering do pipeline DLT com autoscale.

Por exemplo, o código a seguir configura uma tabela de transmissão para ingerir dados de Kafka, usando a função read_kafka:

Python
import dlt

@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)

Para ingerir de outras fontes de barramento de mensagens, consulte:

Carregar dados dos Hubs de Eventos do Azure

Azure O Event Hubs é um serviço de transmissão de dados que oferece uma interface compatível com Apache Kafka . O senhor pode usar o conector de transmissão estruturada Kafka, incluído no tempo de execução do DLT, para carregar mensagens do Azure Event Hubs. Para saber mais sobre como carregar e processar mensagens dos Azure Event Hubs, consulte Use Azure Event Hubs as a DLT fonte de dados.

Carregar dados de sistemas externos

A DLT suporta o carregamento de dados de qualquer fonte de dados suportada pelo site Databricks. Consulte Conectar-se à fonte de dados. O senhor também pode carregar o uso externo de dados da Lakehouse Federation para fontes de dados compatíveis. Como o Lakehouse Federation requer o Databricks Runtime 13.3 LTS ou acima, para usar o Lakehouse Federation, o seu pipeline deve ser configurado para usar o canal de visualização.

Algumas fontes de dados não têm suporte equivalente em SQL. Se o senhor não puder usar a Lakehouse Federation com uma dessas fontes de dados, poderá usar um notebook Python para ingerir dados da fonte. O senhor pode adicionar código-fonte Python e SQL ao mesmo pipeline DLT. O exemplo a seguir declara um view materializado para acessar o estado atual dos dados em uma tabela PostgreSQL remota:

Python
import dlt

@dlt.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)

Carregar conjuntos de dados pequenos ou estáticos do armazenamento de objetos na nuvem

O senhor pode carregar um conjunto de dados pequeno ou estático usando a sintaxe de carregamento Apache Spark. O DLT é compatível com todos os formatos de arquivo suportados pelo Apache Spark no Databricks. Para obter uma lista completa, consulte Opções de formato de dados.

Os exemplos a seguir demonstram o carregamento de JSON para criar tabelas DLT:

Python
@dlt.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
nota

A função read_files SQL é comum a todos os ambientes SQL na Databricks. Esse é o padrão recomendado para acesso direto a arquivos usando SQL com DLT. Para obter mais informações, consulte Opções.

Configurar uma tabela de transmissão para ignorar as alterações em uma tabela de transmissão de origem

nota
  • O sinalizador skipChangeCommits funciona apenas com spark.readStream usando a função option() . Você não pode usar este sinalizador em uma função dlt.read_stream() .
  • O senhor não pode usar o sinalizador skipChangeCommits quando a tabela de transmissão de origem é definida como o destino de uma função apply_changes().

Pelo site default, as tabelas de transmissão exigem fontes somente de anexos. Quando uma tabela de transmissão usa outra tabela de transmissão como fonte e a tabela de transmissão de origem requer atualizações ou exclusões, por exemplo, GDPR processamento do "direito de ser esquecido", o sinalizador skipChangeCommits pode ser definido ao ler a tabela de transmissão de origem para ignorar essas alterações. Para obter mais informações sobre esse sinalizador, consulte Ignorar atualizações e exclusões.

Python
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")

Acesse com segurança as credenciais de armazenamento com segredos em um pipeline

O senhor pode usar Databricks os segredos do site para armazenar credenciais, como chaves de acesso ou senhas. Para configurar o segredo em seu pipeline, use uma propriedade Spark na configuração de clustering das configurações pipeline. Consulte Configurar compute para um DLT pipeline.

O exemplo a seguir usa um segredo para armazenar um acesso key necessário para ler dados de entrada de um lago de dados Azure Armazenamento (ADLS) armazenamento account usando Auto Loader. O senhor pode usar esse mesmo método para configurar qualquer segredo exigido pelo seu pipeline, por exemplo, a chave AWS para acessar S3, ou a senha para um Apache Hive metastore.

Para saber mais sobre como trabalhar com o Azure data lake Storage, consulte Conectar-se ao Azure data lake Storage e Blob Storage.

nota

O senhor deve adicionar o prefixo spark.hadoop. à configuração spark_conf key que define o valor do segredo.

JSON
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/DLT Notebooks/DLT quickstart"
}
}
],
"name": "DLT quickstart using ADLS2"
}

Substituir

  • <storage-account-name> com o nome ADLS storage account.
  • <scope-name> pelo nome do secret scope do Databricks.
  • <secret-name> pelo nome da chave que contém a chave de acesso da conta de armazenamento do Azure.
Python
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)

Substituir

  • <container-name> com o nome do contêiner Azure storage account que armazena os dados de entrada.
  • <storage-account-name> com o nome ADLS storage account.
  • <path-to-input-dataset> com o caminho para a entrada dataset.