Inscreva-se no Google Pub/Sub
Use o conector integrado para assinar o Google Pub/Sub. Esse conector fornece semântica de processamento exatamente uma vez para registros do assinante.
O Pub/Sub poderia publicar linhas duplicadas, ou as linhas poderiam chegar ao assinante fora de ordem. Você deve escrever código para lidar com linhas duplicadas e fora de ordem.
Configurar uma transmissão do Pub/Sub
O exemplo de código a seguir mostra como configurar uma leitura de transmissão estructurada do Pub/Sub e autenticar com chaves privadas.
- Python
- Scala
- SQL
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Para obter mais opções de configuração, consulte Configurar opções para leitura de transmissão Pub/Sub.
Configurar o acesso ao Pub/Sub
Suas credenciais devem ter as seguintes funções:
Papéis | Obrigatório ou opcional | Como o papel é utilizado |
|---|---|---|
| Obrigatório | Verifica se existe inscrição e obtém inscrição. |
| Obrigatório | Busca dados de uma inscrição. |
| Opcional | Permite a criação de uma inscrição caso não exista e permite o uso de |
A Databricks recomenda o uso de segredos ao usar chaves. As seguintes opções são obrigatórias para autorizar uma conexão.
clientEmailclientIdprivateKeyprivateKeyId
Entenda o esquema Pub/Sub
O esquema para a transmissão corresponde às linhas que são buscadas do Pub/Sub, conforme descrito na tabela a seguir:
campo | Tipo |
|---|---|
|
|
|
|
|
|
|
|
Configurar opções de leitura de transmissão Pub/Sub
Algumas opções de configuração do Pub/Sub utilizam o conceito de buscas em vez de microlotes . Este é um detalhe de implementação interno, e as opções funcionam de forma semelhante a outros conectores de transmissão estructurada, exceto que as linhas são buscadas e então processadas.
Para a lista completa de opções, consulte Pub/Sub.
Utilize o processamento de lotes incrementais com o Pub/Sub.
Você pode usar Trigger.AvailableNow para consumir linhas disponíveis das fontes Pub/Sub como um lote incremental.
O Databricks registra o carimbo de data/hora ao iniciar uma leitura com a configuração Trigger.AvailableNow. Linhas processadas pelo lote incluem todos os dados previamente buscados e quaisquer linhas recém-publicadas com um timestamp menor que o timestamp inicial registrado. Para obter mais informações, consulte AvailableNow: Processamento de lotes incremental.
Monitore as transmissões do Pub/Sub
As métricas de progresso da Transmissão estructurada relatam o número de linhas buscadas e prontas para processar, o tamanho das linhas buscadas e prontas para processar, e o número de duplicatas vistas desde o início da transmissão.
O seguinte é um exemplo de métricas do Pub/Sub:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Limitações
O Pub/Sub não oferece suporte à execução especulativa com spark.speculation.