Pular para o conteúdo principal

transmissão de Apache Pulsar

info

Visualização

Esse recurso está em Public Preview.

Em Databricks Runtime 14.1 e acima, o senhor pode usar a transmissão estruturada para transmitir dados do Apache Pulsar em Databricks.

A transmissão estruturada fornece uma semântica de processamento exatamente única para dados lidos de fontes Pulsar.

Exemplo de sintaxe

O exemplo a seguir é um exemplo básico de uso da transmissão estruturada para leitura da Pulsar:

Python
query = (spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
)

Para ler tópicos do Pulsar, você deve fornecer um(a) service.url e uma das seguintes opções:

  • topic
  • topics
  • topicsPattern

Para obter uma lista completa de opções, consulte Configurar opções para leitura da transmissão Pulsar.

Autentique-se no Pulsar

O Databricks suporta autenticação de truststore e keystore ao Pulsar. Databricks recomenda usar segredos para armazenar detalhes de configuração.

Para obter a lista completa de opções de autenticação, consulte Autenticação.

Exemplo

O exemplo a seguir demonstra a configuração das opções de autenticação:

Python
client_auth_params = dbutils.secrets.get(scope="pulsar", key="clientAuthParams")
client_pw = dbutils.secrets.get(scope="pulsar", key="clientPw")

# clientAuthParams is a comma-separated list of key-value pairs, such as:
# "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = (spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", starting_offsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", client_auth_params)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trust_store_path)
.option("pulsar.client.tlsTrustStorePassword", client_pw)
.load()
)

Esquema pulsar

Quando se lê do Pulsar, o esquema das linhas depende dos esquemas dos tópicos da origem.

  • Para tópicos com esquema Avro ou JSON, os nomes e tipos de campo são preservados no Spark DataFrame resultante.
  • Para tópicos sem esquema ou com um tipo de dados simples no Pulsar, a carga útil é carregada em uma coluna value.
  • Se você configurar a transmissão para ler vários tópicos com esquemas diferentes, defina allowDifferentTopicSchemas para carregar o conteúdo bruto em uma coluna value.

Os registros pulsar têm os seguintes campos de metadados:

Coluna

Tipo

__key

binary

__topic

string

__messageId

binary

__publishTime

timestamp

__eventTime

timestamp

__messageProperties

map<String, String>

Configurar opções de leitura da transmissão da Pulsar

Para obter a lista completa de opções, consulte Pulsar.

Construir offsets iniciais JSON

Para usar um ID de mensagem personalizado que especifica um deslocamento, como JSON, com a opção startingOffsets, veja o exemplo a seguir:

Scala
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()