Consultar dados de transmissão
Você pode usar o Databricks para query a fonte de dados usando transmissão estructurada. O Databricks fornece amplo suporte para cargas de trabalho de transmissão em Python e Scala e oferece suporte à maioria das funcionalidades de transmissão estruturada com SQL.
Os exemplos a seguir demonstram o uso de um coletor de memória para inspeção manual de dados de transmissão durante o desenvolvimento interativo no Notebook. Devido aos limites de saída de linha na IU Notebook , talvez você não observe todos os dados lidos pela transmissão query. Em cargas de trabalho de produção, você só deve acionar query de transmissão gravando-as em uma tabela de destino ou sistema externo.
Observação
SQL O suporte para consultas interativas em dados de transmissão é limitado ao Notebook em execução no site compute. O senhor também pode usar SQL quando declarar tabelas de transmissão em Databricks SQL ou Delta Live Tables. Consulte as tabelas Load use de dados transmission em Databricks SQL e What is Delta Live Tables?
Consulta de dados de sistemas de transmissão
A Databricks fornece leitores de dados de transmissão para os seguintes sistemas de transmissão:
Kafka
Kinesis
PubSub
Pulsar
Você deve fornecer detalhes de configuração ao inicializar query nesses sistemas, que variam dependendo do ambiente configurado e do sistema escolhido para leitura. Consulte Configurar transmissão de fonte de dados.
Cargas de trabalho comuns que envolvem sistemas de transmissão incluem ingestão de dados para o lakehouse e processamento de transmissão para drenar dados para sistemas externos. Para obter mais informações sobre cargas de trabalho de transmissão, consulte transmissão em Databricks.
Os exemplos a seguir demonstram uma transmissão interativa lida do Kafka:
display(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'latest'
);
Consultar uma tabela como uma transmissão lida
Databricks cria todas as tabelas usando Delta Lake por default. Quando o senhor executa uma consulta de transmissão em uma tabela Delta, a consulta obtém automaticamente novos registros quando uma versão da tabela é confirmada. Pelo site default, as consultas de transmissão esperam que as tabelas de origem contenham apenas registros anexados. Se o senhor precisar trabalhar com dados de transmissão que contenham atualizações e exclusões, o site Databricks recomenda o uso de Delta Live Tables e APPLY CHANGES INTO
. Consulte a seção APPLY CHANGES APIs: Simplificar a captura de dados de alterações (CDC) com Delta Live Tables.
Os exemplos a seguir demonstram a realização de uma leitura de transmissão interativa de uma tabela:
display(spark.readStream.table("table_name"))
SELECT * FROM STREAM table_name
Consulte dados no armazenamento de objetos clouds com o Auto Loader
Você pode transmitir dados do armazenamento de objetos clouds usando o Auto Loader, o conector de dados clouds do Databricks. Você pode usar o conector com arquivos armazenados em volumes do Unity Catalog ou em outros locais de armazenamento de objetos clouds . Databricks recomenda o uso de volumes para gerenciar o acesso aos dados no armazenamento de objetos clouds . Consulte Conectar-se à fonte de dados.
O Databricks otimiza esse conector para transmissão de ingestão de dados em armazenamento de objetos clouds que são armazenados em formatos populares estruturados, semiestruturados e não estruturados. A Databricks recomenda armazenar os dados ingeridos em um formato quase bruto para maximizar a taxa de transferência e minimizar a possível perda de dados devido a registros corrompidos ou alterações de esquema.
Para obter mais recomendações sobre a ingestão de dados do armazenamento de objetos em nuvens, consulte Ingestão de dados em um Databricks lakehouse.
Os exemplos a seguir demonstram uma transmissão interativa lida de um diretório de arquivos JSON em um volume:
display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))
SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')