Pular para o conteúdo principal

tabelas de transmissão

Uma tabela Delta é uma tabela Delta com suporte adicional para processamento de dados incrementais ou de transmissão. Uma tabela de transmissão pode ser alvo de um ou mais fluxos em um pipeline.

tabelas de transmissão são uma boa opção para aquisição de dados pelos seguintes motivos:

  • Cada linha de entrada é manipulada apenas uma vez, o que modela a grande maioria das cargas de trabalho de ingestão (ou seja, anexando ou inserindo linhas em uma tabela).
  • Eles podem lidar com grandes volumes de dados somente de acréscimo.

As tabelas de transmissão também são uma boa opção para transformações de transmissão de baixa latência, pois podem raciocinar sobre linhas e janelas de tempo, lidar com grandes volumes de dados e fornecer processamento de baixa latência.

O diagrama a seguir mostra como os fluxos leem de fontes de transmissão e gravam incrementalmente em uma tabela de transmissão dentro de um pipeline.

Diagrama mostrando as fontes de transmissão S3, Kafka e Pub/Sub conectadas por fluxos individuais que leem novos dados em um pipeline contendo uma tabela de transmissão.

Em cada atualização, os fluxos associados a uma tabela de transmissão leem a informação alterada numa fonte de transmissão e acrescentam nova informação a essa tabela.

As tabelas de transmissão são de propriedade e atualizadas por um único pipeline. Você define explicitamente as tabelas de transmissão no código-fonte do pipeline. As tabelas definidas por um pipeline não podem ser alteradas ou atualizadas por nenhum outro pipeline. Você pode definir vários fluxos para adicionar a uma única tabela de transmissão.

Databricks cria tabelas internas para dar suporte ao processamento de tabelas de transmissão. Essas tabelas aparecem em system.information_schema.tables mas não são visíveis no Explorador de Catálogo ou em outras páginas da interface do usuário workspace .

nota

Ao criar uma tabela de transmissão fora de um pipeline usando Databricks SQL, Databricks cria um pipeline que é usado para atualizar a tabela. Você pode visualizar o pipeline selecionando "Tarefas e pipeline" na navegação à esquerda do seu workspace. Você pode adicionar a coluna do tipo de pipeline à sua view. As tabelas de transmissão definidas em um pipeline têm um tipo de ETL. As tabelas de transmissão criadas no Databricks SQL têm um tipo de MV/ST.

Para obter mais informações sobre fluxos, consulte Carregar e processar dados incrementalmente com fluxos de pipeline declarativos LakeFlow Spark.

tabelas de transmissão para ingestão

As tabelas de transmissão são projetadas para fontes de dados somente de acréscimo e processam as entradas apenas uma vez. Isso os torna ideais para cargas de trabalho de ingestão em que os dados chegam continuamente e devem ser capturados de forma confiável, sem reprocessar os registros existentes. Databricks suporta a ingestão de dados de armazenamento cloud e barramentos de mensagens de transmissão.

Ingerir arquivos do armazenamento cloud

Você pode usar uma tabela de transmissão para ingerir novos arquivos do armazenamento cloud . Esses exemplos usam o Auto Loader para processar novos arquivos incrementalmente à medida que chegam.

Python
from pyspark import pipelines as dp

# Create a streaming table
@dp.table
def customers_bronze():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.load("/Volumes/path/to/files")
)

Para criar uma tabela de transmissão, a definição do conjunto de dados deve ser do tipo transmissão. Quando você usa a função spark.readStream em uma definição de conjunto de dados, ela retorna um dataset de transmissão.

Ingerir mensagens

Você também pode usar tabelas de transmissão para ingerir dados de barramentos de mensagens. O exemplo a seguir demonstra como criar uma tabela de transmissão que lê de um tópico do Pub/Sub.

Python
@dp.table
def pubsub_raw():
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
return (
spark.readStream
.format("pubsub")
.option("subscriptionId", "my-subscription")
.option("topicId", "my-topic")
.option("projectId", "my-project")
.options(auth_options)
.load()
)

A Databricks recomenda o uso de segredos ao fornecer opções de autorização. Consulte Configurar o acesso ao Pub/Sub para obter todas as opções de autenticação.

Para obter mais detalhes sobre como carregar dados na tabela de transmissão, consulte Carregar dados no pipeline.

O diagrama a seguir ilustra como funcionam as tabelas de transmissão somente de acréscimo.

Diagrama que mostra como funcionam as tabelas de transmissão somente de acréscimo

Uma linha que já foi adicionada a uma tabela de transmissão não será consultada novamente em atualizações posteriores do pipeline. Se você modificar a consulta (por exemplo, de SELECT LOWER (name) para SELECT UPPER (name)), as linhas existentes não serão atualizadas para maiúsculas, mas as novas linhas serão. Você pode acionar uma refresh completa para buscar novamente todos os dados anteriores da tabela de origem e atualizar todas as linhas da tabela de transmissão.

tabelas de transmissão e transmissão de baixa latência

tabelas de transmissão são projetadas para transmissão de baixa latência em estado limitado. As tabelas de transmissão usam gerenciamento de ponto de verificação, o que as torna adequadas para transmissão de baixa latência. No entanto, eles esperam transmissões que sejam naturalmente delimitadas ou delimitadas com uma marca d'água.

Uma transmissão naturalmente delimitada é produzida por uma fonte de dados que tem início e fim bem definidos. Um exemplo de transmissão naturalmente limitada é a leitura de dados de um diretório de arquivos onde nenhum arquivo novo é adicionado após a inserção de um lote inicial de arquivos. A transmissão é considerada limitada porque o número de arquivos é finito e a transmissão termina depois que todos os arquivos forem processados.

Você também pode usar uma marca d'água para delimitar uma transmissão. Uma marca d'água em transmissão estruturada é um mecanismo que ajuda a lidar com dados atrasados, especificando quanto tempo o sistema deve esperar por eventos atrasados antes de considerar a janela de tempo como completa. Uma transmissão ilimitada e sem marca d'água pode causar falha no pipeline devido à pressão na memória.

Para obter mais informações sobre o processamento de transmissões com estado, consulte Otimizar o processamento com estado usando marcas d'água.

transmissão-Junção instantânea

transmissão-Snapshot join conecta um dataset de transmissão a uma tabela de dimensões que é capturada no início da transmissão. Como a tabela de dimensões é tratada como fixa naquele momento, quaisquer alterações feitas nela após o início da transmissão não são refletidas na join. Isso é aceitável quando pequenas discrepâncias não importam — por exemplo, quando o número de transações é muitas ordens de grandeza maior do que o número de clientes.

O exemplo de código a seguir une uma tabela de dimensão com duas linhas chamadas customers com um conjunto de dados sempre crescente, transactions. Ele materializa uma join entre esses dois conjuntos de dados em uma tabela chamada sales_report. Se um processo externo atualizar a tabela de clientes adicionando uma nova linha (customer_id=3, name=Zoya), esta nova linha não estará presente na join porque a tabela de dimensão estática foi capturada quando as transmissões foram iniciadas.

Python
from pyspark import pipelines as dp

@dp.temporary_view
# assume this table contains an append-only stream of rows about transactions
# (customer_id=1, value=100)
# (customer_id=2, value=150)
# (customer_id=3, value=299)
# ... <and so on> ...
def v_transactions():
return spark.readStream.table("transactions")

# assume this table contains only these two rows about customers
# (customer_id=1, name=Bilal)
# (customer_id=2, name=Olga)
@dp.temporary_view
def v_customers():
return spark.read.table("customers")

@dp.table
def sales_report():
facts = spark.readStream.table("v_transactions")
dims = spark.read.table("v_customers")

return facts.join(dims, on="customer_id", how="inner")

limitações da tabela de transmissão

As tabelas de transmissão têm as seguintes limitações:

  • Evolução limitada: você pode alterar a consulta sem recalcular todo o conjunto de dados. Sem uma refresh completa, uma tabela de transmissão vê cada linha apenas uma vez, portanto, consultas diferentes processarão linhas diferentes. Por exemplo, se você adicionar UPPER() a um campo na consulta, somente as linhas processadas após a alteração estarão em maiúsculas. Isso significa que você precisa estar ciente de todas as versões anteriores da consulta que estão sendo executadas em seu conjunto de dados. Para reprocessar linhas existentes que foram processadas antes da alteração, é necessário realizar uma refresh completa.
  • Gerenciamento de estado: as tabelas de transmissão têm baixa latência e exigem transmissões que sejam naturalmente limitadas ou limitadas com uma marca d'água. Para obter mais informações, consulte Otimizar o processamento com estado usando marcas d'água.
  • A junção não recalcula: as junções em tabelas de transação não são recalculadas quando as dimensões mudam. Essa característica pode ser útil em cenários de "ação rápida, porém incorreta". Se você deseja que sua view esteja sempre correta, talvez queira usar uma view materializada. As visões materializadas estão sempre corretas porque recalculam automaticamente a junção quando as dimensões mudam. Para obter mais informações, consulte Visualização materializada.