Pular para o conteúdo principal

Inscreva-se no Google Pub/Sub

Use o conector integrado para se inscrever no Google Pub/Sub. Este conector fornece semântica de processamento "exatamente uma vez" para registros do assinante.

nota

O modelo Pub/Sub pode publicar registros duplicados ou os registros podem chegar ao assinante fora de ordem. Escreva um código para lidar com registros duplicados e fora de ordem.

Configurar uma transmissão do Pub/Sub

O exemplo de código a seguir demonstra a sintaxe básica para configurar uma transmissão estruturada lida do Pub/Sub.

Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}

query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)

Para obter mais opções de configuração, consulte Configurar opções para leitura de transmissão Pub/Sub.

Configurar o acesso ao Pub/Sub

As credenciais que você configurar devem ter as seguintes funções.

Papéis

Obrigatório ou opcional

Como o papel é utilizado

roles/pubsub.viewer ou roles/viewer

Obrigatório

Verifica se existe inscrição e obtém inscrição.

roles/pubsub.subscriber

Obrigatório

Busca dados de uma inscrição.

roles/pubsub.editor ou roles/editor

Opcional

Permite a criação de uma inscrição caso não exista e permite o uso de deleteSubscriptionOnStreamStop para excluir a inscrição no término da transmissão.

A Databricks recomenda o uso de segredos ao fornecer opções de autorização. As seguintes opções são necessárias para autorizar uma conexão:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Entenda o esquema Pub/Sub

O esquema da transmissão corresponde aos registros obtidos do Pub/Sub, conforme descrito na tabela a seguir.

campo

Tipo

messageId

StringType

payload

ArrayType[ByteType]

attributes

StringType

publishTimestampInMillis

LongType

Configurar opções de leitura de transmissão Pub/Sub

A tabela a seguir descreve as opções compatíveis com o Pub/Sub. Todas as opções são configuradas como parte de uma transmissão estruturada lida usando a sintaxe do site .option("<optionName>", "<optionValue>").

nota

Algumas opções de configuração do Pub/Sub usam o conceito de fetches em vez de micro-lotes . Isso reflete detalhes internos de implementação e as opções funcionam de forma semelhante aos corolários em outros conectores de transmissão estruturada, exceto que os registros são obtidos e depois processados.

Opção

Valor padrão

Descrição

numFetchPartitions

Definido como a metade do número de executores presentes na inicialização da transmissão.

O número de Spark tarefas paralelas que buscam registros de uma inscrição.

deleteSubscriptionOnStreamStop

false

Se true, a inscrição passada para a transmissão será excluída quando o Job de transmissão terminar.

maxBytesPerTrigger

none

Um limite flexível para o tamanho dos lotes a serem processados durante cada micro-lote acionado.

maxRecordsPerFetch

1000

O número de registros a serem buscados por tarefa antes de processar os registros.

maxFetchPeriod

10s

O tempo de duração para cada tarefa buscar antes de processar os registros. Aceita strings de duração, por exemplo, 1s para 1 segundo ou 1m para 1 minuto. Databricks recomenda o uso do valor default .

Utilize o processamento de lotes incrementais com o Pub/Sub.

Você pode usar Trigger.AvailableNow para consumir registros disponíveis das fontes Pub/Sub como lotes incrementais.

O Databricks registra o carimbo de data/hora quando você inicia uma leitura com a configuração Trigger.AvailableNow . Os registros processados pelos lotes incluem todos os dados obtidos anteriormente e quaisquer registros recém-publicados com um carimbo de data/hora anterior ao carimbo de data/hora de início da transmissão registrada. Para mais informações consulte AvailableNow: Processamento incremental de lotes.

Monitore as transmissões do Pub/Sub

As métricas de progresso da transmissão estruturada informam o número de registros obtidos e prontos para serem processados, o tamanho dos registros obtidos e prontos para serem processados e o número de duplicatas vistas desde o início da transmissão. A seguir, um exemplo dessas métricas:

JSON
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}

Limitações

Pub/Sub não suporta execução especulativa (spark.speculation).