Inscreva-se no Google Pub/Sub
Use o conector integrado para se inscrever no Google Pub/Sub. Este conector fornece semântica de processamento "exatamente uma vez" para registros do assinante.
O modelo Pub/Sub pode publicar registros duplicados ou os registros podem chegar ao assinante fora de ordem. Escreva um código para lidar com registros duplicados e fora de ordem.
Configurar uma transmissão do Pub/Sub
O exemplo de código a seguir demonstra a sintaxe básica para configurar uma transmissão estruturada lida do Pub/Sub.
- Python
- SQL
- Scala
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
Para obter mais opções de configuração, consulte Configurar opções para leitura de transmissão Pub/Sub.
Configurar o acesso ao Pub/Sub
As credenciais que você configurar devem ter as seguintes funções.
Papéis | Obrigatório ou opcional | Como o papel é utilizado |
|---|---|---|
| Obrigatório | Verifica se existe inscrição e obtém inscrição. |
| Obrigatório | Busca dados de uma inscrição. |
| Opcional | Permite a criação de uma inscrição caso não exista e permite o uso de |
A Databricks recomenda o uso de segredos ao fornecer opções de autorização. As seguintes opções são necessárias para autorizar uma conexão:
clientEmailclientIdprivateKeyprivateKeyId
Entenda o esquema Pub/Sub
O esquema da transmissão corresponde aos registros obtidos do Pub/Sub, conforme descrito na tabela a seguir.
campo | Tipo |
|---|---|
|
|
|
|
|
|
|
|
Configurar opções de leitura de transmissão Pub/Sub
A tabela a seguir descreve as opções compatíveis com o Pub/Sub. Todas as opções são configuradas como parte de uma transmissão estruturada lida usando a sintaxe do site .option("<optionName>", "<optionValue>").
Algumas opções de configuração do Pub/Sub usam o conceito de fetches em vez de micro-lotes . Isso reflete detalhes internos de implementação e as opções funcionam de forma semelhante aos corolários em outros conectores de transmissão estruturada, exceto que os registros são obtidos e depois processados.
Opção | Valor padrão | Descrição |
|---|---|---|
| Definido como a metade do número de executores presentes na inicialização da transmissão. | O número de Spark tarefas paralelas que buscam registros de uma inscrição. |
|
| Se |
|
| Um limite flexível para o tamanho dos lotes a serem processados durante cada micro-lote acionado. |
|
| O número de registros a serem buscados por tarefa antes de processar os registros. |
|
| O tempo de duração para cada tarefa buscar antes de processar os registros. Aceita strings de duração, por exemplo, |
Utilize o processamento de lotes incrementais com o Pub/Sub.
Você pode usar Trigger.AvailableNow para consumir registros disponíveis das fontes Pub/Sub como lotes incrementais.
O Databricks registra o carimbo de data/hora quando você inicia uma leitura com a configuração Trigger.AvailableNow . Os registros processados pelos lotes incluem todos os dados obtidos anteriormente e quaisquer registros recém-publicados com um carimbo de data/hora anterior ao carimbo de data/hora de início da transmissão registrada. Para mais informações consulte AvailableNow: Processamento incremental de lotes.
Monitore as transmissões do Pub/Sub
As métricas de progresso da transmissão estruturada informam o número de registros obtidos e prontos para serem processados, o tamanho dos registros obtidos e prontos para serem processados e o número de duplicatas vistas desde o início da transmissão. A seguir, um exemplo dessas métricas:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Limitações
Pub/Sub não suporta execução especulativa (spark.speculation).