Consultar dados de transmissão
O senhor pode usar o site Databricks para consultar a transmissão fonte de dados usando a transmissão estructurada. Databricks Oferece amplo suporte para cargas de trabalho de transmissão em Python e Scala, e suporta a maior parte da funcionalidade de transmissão estruturada com SQL.
Os exemplos a seguir demonstram o uso de um coletor de memória para inspeção manual dos dados de transmissão durante o desenvolvimento interativo no Notebook. Devido aos limites de saída de linha na UI do Notebook, o senhor pode não observar todos os dados lidos pelas consultas de transmissão. Nas cargas de trabalho de produção, o senhor só deve acionar as consultas de transmissão gravando-as em uma tabela de destino ou em um sistema externo.
SQL O suporte para consultas interativas em dados de transmissão é limitado ao Notebook em execução anexado ao site compute. O senhor também pode usar SQL ao declarar tabelas de transmissão com DLT. Consulte O que é DLT? .
Consultar dados de sistemas de transmissão
Databricks fornece leitores de dados de transmissão para os seguintes sistemas de transmissão:
- Kafka
- Kinesis
- PubSub
- Pulsar
Você deve fornecer detalhes de configuração ao inicializar consultas nesses sistemas, que variam de acordo com o ambiente configurado e o sistema do qual você escolhe ler. Consulte Configurar transmissão fonte de dados.
As cargas de trabalho comuns que envolvem sistemas de transmissão incluem a ingestão de dados no site lakehouse e o processamento de transmissão para enviar dados a sistemas externos. Para obter mais informações sobre as cargas de trabalho de transmissão, consulte transmissão em Databricks.
Os exemplos a seguir demonstram uma transmissão interativa lida em Kafka:
- Python
- SQL
display(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'latest'
);
Consultar uma tabela como uma leitura de transmissão
Databricks cria todas as tabelas usando Delta Lake por default. Quando o senhor executa uma consulta de transmissão em uma tabela Delta, a consulta obtém automaticamente novos registros quando uma versão da tabela é confirmada. Em default, as consultas de transmissão esperam que as tabelas de origem contenham apenas registros anexados. Se o senhor precisar trabalhar com dados de transmissão que contenham atualizações e exclusões, o site Databricks recomenda o uso de DLT e APPLY CHANGES INTO
. Consulte o site APPLY CHANGES APIs: Simplificar a captura de dados de alterações (CDC) com DLT.
Os exemplos a seguir demonstram a execução de uma leitura de transmissão interativa de uma tabela:
- Python
- SQL
display(spark.readStream.table("table_name"))
SELECT * FROM STREAM table_name
Consultar dados no armazenamento de objetos na nuvem com o Auto Loader
O senhor pode transmitir dados do armazenamento de objetos na nuvem usando Auto Loader, o conector de dados na nuvem Databricks. O senhor pode usar o conector com arquivos armazenados em volumes do Unity Catalog ou em outros locais de armazenamento de objetos na nuvem. A Databricks recomenda o uso de volumes para gerenciar o acesso aos dados no armazenamento de objetos na nuvem. Consulte Conectar-se à fonte de dados.
Databricks otimiza esse conector para a ingestão de transmissão de dados no armazenamento de objetos na nuvem que são armazenados em formatos populares estruturados, semiestruturados e não estruturados. Databricks recomenda o armazenamento de dados ingeridos em um formato quase bruto para maximizar a taxa de transferência e minimizar a possível perda de dados devido a registros corrompidos ou alterações no esquema.
Para obter mais recomendações sobre a ingestão de dados do armazenamento de objetos na nuvem, consulte Ingestão de dados em um Databricks lakehouse.
Os exemplos a seguir demonstram uma leitura de transmissão interativa de um diretório de arquivos JSON em um volume:
- Python
- SQL
display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))
SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')