transmissão do Apache Pulsar

Visualização

Esse recurso está em visualização pública.

No Databricks Runtime 14.1 e acima, você pode usar a transmissão estruturada para transmitir dados do Apache Pulsar no Databricks.

A transmissão estructurada fornece semântica de processamento exatamente uma vez para dados lidos de fontes Pulsar.

Exemplo de sintaxe

A seguir está um exemplo básico de uso de transmissão estruturada para leitura do Pulsar:

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

Você deve sempre fornecer um service.url e uma das seguintes opções para especificar tópicos:

  • topic

  • topics

  • topicsPattern

Para obter uma lista completa de opções, consulte Configurar opções para leitura de transmissão do Pulsar.

Autenticar no Pulsar

Databricks oferece suporte à autenticação de armazenamento confiável e keystore para Pulsar. A Databricks recomenda a utilização de segredos ao armazenar detalhes de configuração.

Você pode definir as seguintes opções durante a configuração da transmissão:

  • pulsar.client.authPluginClassName

  • pulsar.client.authParams

  • pulsar.client.useKeyStoreTls

  • pulsar.client.tlsTrustStoreType

  • pulsar.client.tlsTrustStorePath

  • pulsar.client.tlsTrustStorePassword

Se a transmissão usar um PulsarAdmin, defina também o seguinte:

  • pulsar.admin.authPluginClassName

  • pulsar.admin.authParams

O exemplo a seguir demonstra a configuração de opções de autenticação:

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Esquema pulsar

O esquema de registros lidos no Pulsar depende de como os tópicos têm seus esquemas codificados.

  • Para tópicos com esquema Avro ou JSON, os nomes e tipos de campo são preservados no Spark DataFrame resultante.

  • Para tópicos sem esquema ou com tipo de dados simples no Pulsar, o payload é carregado em uma coluna value .

  • Se o leitor estiver configurado para ler vários tópicos com esquemas diferentes, defina allowDifferentTopicSchemas para carregar o conteúdo bruto em uma coluna value .

Os registros Pulsar possuem os seguintes campos de metadados:

Coluna

Tipo

__key

binary

__topic

string

__messageId

binary

__publishTime

timestamp

__eventTime

timestamp

__messageProperties

map<String, String>

Configurar opções para leitura de transmissão de Pulsar

Todas as opções são configuradas como parte de uma leitura de transmissão estruturada usando a sintaxe .option("<optionName>", "<optionValue>") . Você também pode configurar a autenticação usando opções. Consulte Autenticar no Pulsar.

A tabela a seguir descreve as configurações necessárias para o Pulsar. Você deve especificar apenas uma das opções topic, topics ou topicsPattern.

Opção

Valor padrão

Descrição

service.url

nenhum

A configuração Pulsar serviceUrl para o serviço Pulsar.

topic

nenhum

Uma strings de nome de tópico para o tópico consumir.

topics

nenhum

Uma lista separada por vírgulas dos tópicos a serem consumidos.

topicsPattern

nenhum

Uma strings regex Java para corresponder aos tópicos a serem consumidos.

A tabela a seguir descreve outras opções suportadas pelo Pulsar:

Opção

Valor padrão

Descrição

predefinedSubscription

nenhum

O nome de inscrição predefinido usado pelo conector para rastrear o progresso das aplicações Spark.

subscriptionPrefix

nenhum

Um prefixo usado pelo conector para gerar uma inscrição aleatória para acompanhar o progresso das aplicações Spark.

pollTimeoutMs

120.000

O tempo limite para leitura de mensagens do Pulsar em milissegundos.

waitingForNonExistedTopic

false

Se o conector deve aguardar até que os tópicos desejados sejam criados.

failOnDataLoss

true

Controla se uma query deve falhar quando os dados são perdidos (por exemplo, tópicos são excluídos ou mensagens são excluídas devido à política de retenção).

allowDifferentTopicSchemas

false

Se vários tópicos com esquemas diferentes forem lidos, use esse parâmetro para desativar a desserialização automática do valor do tópico baseada em esquema. Somente os valores brutos são retornados quando for true.

startingOffsets

latest

Se latest, o leitor lê os registros mais recentes após começar a ser executado. Se earliest, o leitor lê desde o deslocamento mais antigo. O usuário também pode especificar strings JSON que especificam um deslocamento específico.

maxBytesPerTrigger

nenhum

Um limite flexível do número máximo de bytes que queremos processar por microlote. Se isso for especificado, admin.url também precisará ser especificado.

admin.url

nenhum

A configuração do Pulsar serviceHttpUrl . Necessário apenas quando maxBytesPerTrigger é especificado.

Você também pode especificar qualquer configuração de cliente, administrador e leitor Pulsar usando os seguintes padrões:

Padrão

Link para opções de configuração

pulsar.client.*

Configuração do cliente Pulsar

pulsar.admin.*

Configuração de administrador do Pulsar

pulsar.reader.*

Configuração do leitor Pulsar

Construir deslocamentos iniciais JSON

Você pode construir manualmente um ID de mensagem para especificar um deslocamento específico e transmiti-lo como JSON para a opção startingOffsets . O exemplo de código a seguir demonstra essa sintaxe:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()