Assine o Google Pub/Sub

Databricks fornece um conector integrado para assinar o Google Pub/Sub em Databricks Runtime 13.3 LTS e acima. Esse conector fornece uma semântica de processamento exatamente única para registros do assinante.

Observação

O Pub/Sub pode publicar registros duplicados, e os registros podem chegar ao assinante fora de ordem. Você deve escrever código Databricks para lidar com registros duplicados e fora de ordem.

Exemplo de sintaxe

O exemplo de código a seguir demonstra a sintaxe básica para configurar uma leitura de transmissão estruturada do Pub/Sub:

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

Para obter mais opções de configuração, consulte Configurar opções para leitura de transmissão do Pub/Sub.

Configurar o acesso ao Pub/Sub

A Databricks recomenda a utilização de segredos ao fornecer opções de autorização. As seguintes opções são necessárias para autorizar uma conexão:

  • clientEmail

  • clientId

  • privateKey

  • privateKeyId

A tabela a seguir descreve as funções necessárias para as credenciais configuradas:

Papéis

Obrigatório ou opcional

Como é usado

roles/pubsub.viewer ou roles/viewer

Obrigatório

Verifique se existe inscrição e obtenha inscrição

roles/pubsub.subscriber

Obrigatório

Buscar dados de uma inscrição

roles/pubsub.editor ou roles/editor

Opcional

Permite a criação de uma inscrição se ela não existir e também permite o uso de deleteSubscriptionOnStreamStop para excluir a inscrição no encerramento do stream

Esquema Pub/Sub

O esquema da transmissão corresponde aos registros buscados no Pub/Sub, conforme descrito na tabela a seguir:

campo

Tipo

messageId

StringType

payload

ArrayType[ByteType]

attributes

StringType

publishTimestampInMillis

LongType

Configurar opções de leitura de transmissão do Pub/Sub

A tabela a seguir descreve as opções compatíveis com o Pub/Sub. Todas as opções são configuradas como parte de uma leitura de transmissão estruturada usando a sintaxe .option("<optionName>", "<optionValue>") .

Observação

Algumas opções de configuração do Pub/Sub usam o conceito de buscas em vez de micro-lotes. Isto reflete detalhes de implementação interna, e as opções funcionam de forma semelhante aos corolários em outros conectores de transmissão estruturada, exceto que os registros são buscados e então processados.

Opção

Valor padrão

Descrição

numFetchPartitions

Definido para o número de executor na inicialização da transmissão

O número de tarefas paralelas do Spark que buscam registros de uma inscrição.

deleteSubscriptionOnStreamStop

false

Se true, a inscrição passada para a transmissão será excluída quando o Job de transmissão terminar.

maxBytesPerTrigger

nenhum

Um limite flexível para o tamanho dos lotes a serem processados durante cada microlote acionado.

maxRecordsPerFetch

1000

O número de registros a serem buscados por tarefa antes do processamento dos registros.

maxFetchPeriod

10 segundos

A duração de cada tarefa a ser buscada antes do processamento dos registros. Databricks recomenda usar o valor default .

Semântica de processamento de lotes incrementais para Pub/Sub

Você pode usar Trigger.AvailableNow para consumir registros disponíveis das fontes do Pub/Sub em lotes incrementais.

O Databricks registra o carimbo de data/hora quando você inicia uma leitura com a configuração Trigger.AvailableNow . Os registros processados pelos lotes incluem todos os dados obtidos anteriormente e quaisquer registros recém-publicados com um carimbo de data/hora menor que o carimbo de data/hora da transmissão começar registrada.

Consulte Configurando o processamento de lotes incrementais.

métricas de monitoramento transmissão

As métricas de progresso da transmissão estruturada relatam o número de registros buscados e prontos para processamento, o tamanho dos registros buscados e prontos para processamento e o número de duplicatas vistas desde o início da transmissão. A seguir está um exemplo dessas métricas:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Limitações

A execução especulativa (spark.speculation) não é compatível com o Pub/Sub.