Pular para o conteúdo principal

transmissão de Apache Pulsar

info

Visualização

Esse recurso está em Public Preview.

Em Databricks Runtime 14.1 e acima, o senhor pode usar a transmissão estruturada para transmitir dados do Apache Pulsar em Databricks.

A transmissão estruturada fornece uma semântica de processamento exatamente única para dados lidos de fontes Pulsar.

Exemplo de sintaxe

O exemplo a seguir é um exemplo básico de uso da transmissão estruturada para leitura da Pulsar:

Scala
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()

Você deve sempre fornecer um service.url e uma das seguintes opções para especificar tópicos:

  • topic
  • topics
  • topicsPattern

Para obter uma lista completa de opções, consulte Configurar opções para leitura da transmissão Pulsar.

Autentique-se no Pulsar

A Databricks oferece suporte à autenticação de truststore e keystore para a Pulsar. A Databricks recomenda o uso de segredos ao armazenar detalhes de configuração.

O senhor pode definir as seguintes opções durante a configuração da transmissão:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

Se a transmissão usar um PulsarAdmin, defina também o seguinte:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

O exemplo a seguir demonstra a configuração das opções de autenticação:

Scala
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()

Esquema pulsar

O esquema dos registros lidos no Pulsar depende de como os tópicos têm seus esquemas codificados.

  • Para tópicos com esquema Avro ou JSON, os nomes e tipos de campo são preservados no Spark DataFrame resultante.
  • Para tópicos sem esquema ou com um tipo de dados simples no Pulsar, a carga útil é carregada em uma coluna value.
  • Se o leitor estiver configurado para ler vários tópicos com esquemas diferentes, defina allowDifferentTopicSchemas para carregar o conteúdo bruto em uma coluna value.

Os registros pulsar têm os seguintes campos de metadados:

Coluna

Tipo

__key

binary

__topic

string

__messageId

binary

__publishTime

timestamp

__eventTime

timestamp

__messageProperties

map<String, String>

Configurar opções de leitura da transmissão da Pulsar

Todas as opções são configuradas como parte de uma transmissão estruturada lida usando a sintaxe do site .option("<optionName>", "<optionValue>"). Você também pode configurar a autenticação usando as opções. Consulte Autenticar no Pulsar.

A tabela a seguir descreve as configurações necessárias para o Pulsar. Você deve especificar somente uma das opções topic, topics ou topicsPattern.

Opção

Valor padrão

Descrição

service.url

nenhum

A configuração da Pulsar serviceUrl para o serviço da Pulsar.

topic

nenhum

Uma cadeia de caracteres de nome de tópico para o tópico a ser consumido.

topics

nenhum

Uma lista separada por vírgulas dos tópicos a serem consumidos.

topicsPattern

nenhum

Um Java regex strings para combinar com os tópicos a serem consumidos.

A tabela a seguir descreve outras opções suportadas pelo Pulsar:

Opção

Valor padrão

Descrição

predefinedSubscription

nenhum

O nome de inscrição predefinido usado pelo conector para rastrear o progresso das aplicações Spark.

subscriptionPrefix

nenhum

Um prefixo usado pelo conector para gerar uma inscrição aleatória para rastrear o progresso das aplicações Spark.

pollTimeoutMs

120000

O tempo limite para ler mensagens do Pulsar em milissegundos.

waitingForNonExistedTopic

false

Se o conector deve esperar até que os tópicos desejados sejam criados.

failOnDataLoss

true

Controla se ocorre uma falha em uma consulta quando os dados são perdidos (por exemplo, tópicos são excluídos ou mensagens são excluídas devido à política de retenção).

allowDifferentTopicSchemas

false

Se vários tópicos com esquemas diferentes forem lidos, use esse parâmetro para desativar a desserialização automática do valor do tópico com base no esquema. Somente os valores brutos são retornados quando isso é true.

startingOffsets

latest

Se latest, o leitor lê os registros mais recentes depois de começar a funcionar. Se earliest, o leitor lê a partir do primeiro offset. O usuário também pode especificar uma cadeia de caracteres JSON que especifica um deslocamento específico.

maxBytesPerTrigger

nenhum

Um limite flexível do número máximo de bytes que queremos processar por microlote. Se isso for especificado, admin.url também precisará ser especificado.

admin.url

nenhum

A configuração Pulsar serviceHttpUrl. Só é necessário quando maxBytesPerTrigger é especificado.

Você também pode especificar qualquer configuração de cliente, administrador e leitor Pulsar usando os seguintes padrões:

Padrão

Link para as opções de configuração

pulsar.client.*

Configuração do cliente Pulsar

pulsar.admin.*

Configuração de administração do Pulsar

pulsar.reader.*

Configuração do leitor Pulsar

Construir offsets iniciais JSON

O senhor pode construir manualmente um ID de mensagem para especificar um deslocamento específico e passá-lo como um JSON para a opção startingOffsets. O exemplo de código a seguir demonstra essa sintaxe:

Scala
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()