Pular para o conteúdo principal

Inscreva-se no Google Pub/Sub

Databricks fornece um conector integrado para assinar o Google Pub/Sub em Databricks Runtime 13.3 LTS e acima. Esse conector fornece uma semântica de processamento de exatamente uma vez para registros do assinante.

nota

O Pub/Sub pode publicar registros duplicados, e os registros podem chegar ao assinante fora de ordem. O senhor deve escrever o código do Databricks para lidar com registros duplicados e fora de ordem.

Exemplo de sintaxe

O exemplo de sintaxe a seguir demonstra a configuração de uma transmissão estruturada lida do Pub/Sub usando uma credencial de serviço. Para todas as opções de autenticação, consulte Configurar o acesso ao Pub/Sub.

Scala
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.option("serviceCredential", "service-credential-name") // required
.load()

Para obter mais opções de configuração, consulte Configurar opções para leitura de transmissão Pub/Sub.

Configurar o acesso ao Pub/Sub

A tabela a seguir descreve as funções necessárias para as credenciais configuradas:

Papéis

Obrigatório ou opcional

Como é usado

roles/pubsub.viewer ou roles/viewer

Obrigatório

Verificar se a inscrição existe e obter a inscrição

roles/pubsub.subscriber

Obrigatório

Obter dados de uma inscrição

roles/pubsub.editor ou roles/editor

Opcional

Permite a criação de uma inscrição caso não exista e também permite o uso do site deleteSubscriptionOnStreamStop para excluir a inscrição no término da transmissão

A Databricks recomenda a configuração de uma credencial de serviço para leituras Pub/Sub. As credenciais de serviço para Pub/Sub requerem Databricks Runtime 16.2 e acima. Consulte gerenciar o acesso ao serviço de nuvem externo usando credenciais de serviço.

Se as credenciais do Databricks serviço não estiverem disponíveis, o senhor poderá usar uma conta do Google serviço (GSA) diretamente.

Se o senhor configurar o site compute para usar um GSA, as permissões para o GSA estarão disponíveis para todas as consultas em execução nesse clustering. Consulte o serviço do Google account.

nota

O senhor não pode anexar um GSA ao site compute configurado com o modo de acesso padrão.

O senhor pode configurar as seguintes opções para passar o GSA diretamente para a transmissão:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Esquema Pub/Sub

O esquema da transmissão corresponde aos registros que são obtidos do Pub/Sub, conforme descrito na tabela a seguir:

campo

Tipo

messageId

StringType

payload

ArrayType[ByteType]

attributes

StringType

publishTimestampInMillis

LongType

Configurar opções de leitura de transmissão Pub/Sub

A tabela a seguir descreve as opções compatíveis com o Pub/Sub. Todas as opções são configuradas como parte de uma transmissão estruturada lida usando a sintaxe do site .option("<optionName>", "<optionValue>").

nota

Algumas opções de configuração do Pub/Sub usam o conceito de fetches em vez de micro-lotes . Isso reflete detalhes internos de implementação e as opções funcionam de forma semelhante aos corolários em outros conectores de transmissão estruturada, exceto que os registros são obtidos e depois processados.

Opção

Valor padrão

Descrição

numFetchPartitions

Definido como a metade do número de executores presentes na inicialização da transmissão.

O número de Spark tarefas paralelas que buscam registros de uma inscrição.

deleteSubscriptionOnStreamStop

false

Se true, a inscrição passada para a transmissão será excluída quando o Job de transmissão terminar.

maxBytesPerTrigger

nenhum

Um limite flexível para o tamanho dos lotes a serem processados durante cada micro-lote acionado.

maxRecordsPerFetch

1000

O número de registros a serem buscados por tarefa antes de processar os registros.

maxFetchPeriod

10 segundos

O tempo de duração de cada tarefa para buscar antes de processar os registros. Databricks recomenda usar o valor default.

Semântica de processamento de lotes incrementais para Pub/Sub

O senhor pode usar o site Trigger.AvailableNow para consumir os registros disponíveis das fontes Pub/Sub em lotes incrementais.

O Databricks registra o registro de data e hora quando o usuário inicia uma leitura com a configuração Trigger.AvailableNow. Os registros processados pelos lotes incluem todos os dados obtidos anteriormente e todos os registros recém-publicados com um carimbo de data/hora menor do que o carimbo de data/hora da transmissão registrada.

Consulte Configuração do processamento de lotes incrementais.

monitoramento de transmissão métricas

As métricas de progresso da transmissão estruturada informam o número de registros obtidos e prontos para serem processados, o tamanho dos registros obtidos e prontos para serem processados e o número de duplicatas vistas desde o início da transmissão. A seguir, um exemplo dessas métricas:

JSON
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}

Limitações

A execução especulativa (spark.speculation) não é compatível com o Pub/Sub.