Consultar dados de transmissão

Você pode usar o Databricks para query a fonte de dados usando transmissão estructurada. O Databricks fornece amplo suporte para cargas de trabalho de transmissão em Python e Scala e oferece suporte à maioria das funcionalidades de transmissão estruturada com SQL.

Os exemplos a seguir demonstram o uso de um coletor de memória para inspeção manual de dados de transmissão durante o desenvolvimento interativo no Notebook. Devido aos limites de saída de linha na IU Notebook , talvez você não observe todos os dados lidos pela transmissão query. Em cargas de trabalho de produção, você só deve acionar query de transmissão gravando-as em uma tabela de destino ou sistema externo.

Observação

O suporte SQL para query interativa em dados de transmissão é limitado ao Notebook executado em compute para todos os fins. Você também pode usar SQL ao declarar tabelas de transmissão em Databricks SQL ou Delta Live Tables. Consulte Carregar tabelas de uso de dados transmissão em Databricks SQL e O que são Delta Live Tables?.

Consulta de dados de sistemas de transmissão

A Databricks fornece leitores de dados de transmissão para os seguintes sistemas de transmissão:

  • Kafka

  • Kinesis

  • PubSub

  • Pulsar

Você deve fornecer detalhes de configuração ao inicializar query nesses sistemas, que variam dependendo do ambiente configurado e do sistema escolhido para leitura. Consulte Configurar transmissão de fonte de dados.

Cargas de trabalho comuns que envolvem sistemas de transmissão incluem ingestão de dados para o lakehouse e processamento de transmissão para drenar dados para sistemas externos. Para obter mais informações sobre cargas de trabalho de transmissão, consulte transmissão em Databricks.

Os exemplos a seguir demonstram uma transmissão interativa lida do Kafka:

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);

Consultar uma tabela como uma transmissão lida

Databricks cria todas as tabelas usando Delta Lake por default. Quando o senhor executa uma consulta de transmissão em uma tabela Delta, a consulta obtém automaticamente novos registros quando uma versão da tabela é confirmada. Pelo site default, as consultas de transmissão esperam que as tabelas de origem contenham apenas registros anexados. Se o senhor precisar trabalhar com dados de transmissão que contenham atualizações e exclusões, o site Databricks recomenda o uso de Delta Live Tables e APPLY CHANGES INTO. Consulte APLICAR ALTERAÇÕES em API: Simplificar a captura de dados de alterações (CDC) em Delta Live Tables.

Os exemplos a seguir demonstram a realização de uma leitura de transmissão interativa de uma tabela:

display(spark.readStream.table("table_name"))
SELECT * FROM STREAM table_name

Consulte dados no armazenamento de objetos clouds com o Auto Loader

Você pode transmitir dados do armazenamento de objetos clouds usando o Auto Loader, o conector de dados clouds do Databricks. Você pode usar o conector com arquivos armazenados em volumes do Unity Catalog ou em outros locais de armazenamento de objetos clouds . Databricks recomenda o uso de volumes para gerenciar o acesso aos dados no armazenamento de objetos clouds . Consulte Conectar-se à fonte de dados.

O Databricks otimiza esse conector para transmissão de ingestão de dados em armazenamento de objetos clouds que são armazenados em formatos populares estruturados, semiestruturados e não estruturados. A Databricks recomenda armazenar os dados ingeridos em um formato quase bruto para maximizar a taxa de transferência e minimizar a possível perda de dados devido a registros corrompidos ou alterações de esquema.

Para obter mais recomendações sobre a ingestão de dados do armazenamento de objetos em nuvens, consulte Ingestão de dados em um Databricks lakehouse.

Os exemplos a seguir demonstram uma transmissão interativa lida de um diretório de arquivos JSON em um volume:

display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))
SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')