transmissão de Apache Pulsar
Visualização
Esse recurso está em Public Preview.
Em Databricks Runtime 14.1 e acima, o senhor pode usar a transmissão estruturada para transmitir dados do Apache Pulsar em Databricks.
A transmissão estruturada fornece uma semântica de processamento exatamente única para dados lidos de fontes Pulsar.
Exemplo de sintaxe
O exemplo a seguir é um exemplo básico de uso da transmissão estruturada para leitura da Pulsar:
- Python
- Scala
query = (spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
)
val query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
Para ler tópicos do Pulsar, você deve fornecer um(a) service.url e uma das seguintes opções:
topictopicstopicsPattern
Para obter uma lista completa de opções, consulte Configurar opções para leitura da transmissão Pulsar.
Autentique-se no Pulsar
O Databricks suporta autenticação de truststore e keystore ao Pulsar. Databricks recomenda usar segredos para armazenar detalhes de configuração.
Para obter a lista completa de opções de autenticação, consulte Autenticação.
Exemplo
O exemplo a seguir demonstra a configuração das opções de autenticação:
- Python
- Scala
client_auth_params = dbutils.secrets.get(scope="pulsar", key="clientAuthParams")
client_pw = dbutils.secrets.get(scope="pulsar", key="clientPw")
# clientAuthParams is a comma-separated list of key-value pairs, such as:
# "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = (spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", starting_offsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", client_auth_params)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trust_store_path)
.option("pulsar.client.tlsTrustStorePassword", client_pw)
.load()
)
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
// "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
val query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Esquema pulsar
Quando se lê do Pulsar, o esquema das linhas depende dos esquemas dos tópicos da origem.
- Para tópicos com esquema Avro ou JSON, os nomes e tipos de campo são preservados no Spark DataFrame resultante.
- Para tópicos sem esquema ou com um tipo de dados simples no Pulsar, a carga útil é carregada em uma coluna
value. - Se você configurar a transmissão para ler vários tópicos com esquemas diferentes, defina
allowDifferentTopicSchemaspara carregar o conteúdo bruto em uma colunavalue.
Os registros pulsar têm os seguintes campos de metadados:
Coluna | Tipo |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
Configurar opções de leitura da transmissão da Pulsar
Para obter a lista completa de opções, consulte Pulsar.
Construir offsets iniciais JSON
Para usar um ID de mensagem personalizado que especifica um deslocamento, como JSON, com a opção startingOffsets, veja o exemplo a seguir:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()