Pular para o conteúdo principal

Inscreva-se no Google Pub/Sub

Use o conector integrado para se inscrever no Google Pub/Sub. Este conector fornece semântica de processamento "exatamente uma vez" para registros do assinante.

nota

O modelo Pub/Sub pode publicar registros duplicados ou os registros podem chegar ao assinante fora de ordem. Escreva um código para lidar com registros duplicados e fora de ordem.

Configurar uma transmissão do Pub/Sub

Os exemplos a seguir demonstram a configuração de uma leitura de transmissão estruturada do Pub/Sub usando uma credencial de serviço. Para todas as opções de autenticação, consulte Configurar o acesso ao Pub/Sub.

Python
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.option("serviceCredential", "service-credential-name")
.load()
)

Para obter mais opções de configuração, consulte Configurar opções para leitura de transmissão Pub/Sub.

Configurar o acesso ao Pub/Sub

As credenciais que você configurar devem ter as seguintes funções.

Papéis

Obrigatório ou opcional

Como o papel é utilizado

roles/pubsub.viewer ou roles/viewer

Obrigatório

Verifica se existe inscrição e obtém inscrição.

roles/pubsub.subscriber

Obrigatório

Busca dados de uma inscrição.

roles/pubsub.editor ou roles/editor

Opcional

Permite a criação de uma inscrição caso não exista e permite o uso de deleteSubscriptionOnStreamStop para excluir a inscrição no término da transmissão.

A Databricks recomenda configurar uma credencial de serviço para leituras do Pub/Sub. credenciais de serviço para Pub/Sub exigem Databricks Runtime 16.1 ou acima. Consulte Criar credenciais de serviço. No entanto, se as credenciais de serviço Databricks não estiverem disponíveis, você pode usar diretamente uma conta de serviço do Google (GSA). Se você configurar compute para usar uma GSA (Acesso Global de Serviços), as permissões para a GSA estarão disponíveis para todas as consultas em execução nesse cluster. Consulte accountdo serviço Google.

nota

O senhor não pode anexar um GSA ao site compute configurado com o modo de acesso padrão.

Configure as seguintes opções para passar o GSA diretamente para a transmissão:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Entenda o esquema Pub/Sub

O esquema da transmissão corresponde aos registros obtidos do Pub/Sub, conforme descrito na tabela a seguir.

campo

Tipo

messageId

StringType

payload

ArrayType[ByteType]

attributes

StringType

publishTimestampInMillis

LongType

Configurar opções de leitura de transmissão Pub/Sub

A tabela a seguir descreve as opções compatíveis com o Pub/Sub. Todas as opções são configuradas como parte de uma transmissão estruturada lida usando a sintaxe do site .option("<optionName>", "<optionValue>").

nota

Algumas opções de configuração do Pub/Sub usam o conceito de fetches em vez de micro-lotes . Isso reflete detalhes internos de implementação e as opções funcionam de forma semelhante aos corolários em outros conectores de transmissão estruturada, exceto que os registros são obtidos e depois processados.

Opção

Valor padrão

Descrição

numFetchPartitions

Definido como a metade do número de executores presentes na inicialização da transmissão.

O número de Spark tarefas paralelas que buscam registros de uma inscrição.

deleteSubscriptionOnStreamStop

false

Se true, a inscrição passada para a transmissão será excluída quando o Job de transmissão terminar.

maxBytesPerTrigger

none

Um limite flexível para o tamanho dos lotes a serem processados durante cada micro-lote acionado.

maxRecordsPerFetch

1000

O número de registros a serem buscados por tarefa antes de processar os registros.

maxFetchPeriod

10s

O tempo de duração para cada tarefa buscar antes de processar os registros. Aceita strings de duração, por exemplo, 1s para 1 segundo ou 1m para 1 minuto. Databricks recomenda o uso do valor default .

Utilize o processamento de lotes incrementais com o Pub/Sub.

Você pode usar Trigger.AvailableNow para consumir registros disponíveis das fontes Pub/Sub como lotes incrementais.

O Databricks registra o carimbo de data/hora quando você inicia uma leitura com a configuração Trigger.AvailableNow . Os registros processados pelos lotes incluem todos os dados obtidos anteriormente e quaisquer registros recém-publicados com um carimbo de data/hora anterior ao carimbo de data/hora de início da transmissão registrada. Para mais informações consulte AvailableNow: Processamento incremental de lotes.

Monitore as transmissões do Pub/Sub

As métricas de progresso da transmissão estruturada informam o número de registros obtidos e prontos para serem processados, o tamanho dos registros obtidos e prontos para serem processados e o número de duplicatas vistas desde o início da transmissão. A seguir, um exemplo dessas métricas:

JSON
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}

Limitações

Pub/Sub não suporta execução especulativa (spark.speculation).