transmissão do Apache Pulsar
Visualização
Esse recurso está em visualização pública.
No Databricks Runtime 14.1 e acima, você pode usar a transmissão estruturada para transmitir dados do Apache Pulsar no Databricks.
A transmissão estructurada fornece semântica de processamento exatamente uma vez para dados lidos de fontes Pulsar.
Exemplo de sintaxe
A seguir está um exemplo básico de uso de transmissão estruturada para leitura do Pulsar:
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
Você deve sempre fornecer um service.url
e uma das seguintes opções para especificar tópicos:
topic
topics
topicsPattern
Para obter uma lista completa de opções, consulte Configurar opções para leitura de transmissão do Pulsar.
Autenticar no Pulsar
Databricks oferece suporte à autenticação de armazenamento confiável e keystore para Pulsar. A Databricks recomenda a utilização de segredos ao armazenar detalhes de configuração.
Você pode definir as seguintes opções durante a configuração da transmissão:
pulsar.client.authPluginClassName
pulsar.client.authParams
pulsar.client.useKeyStoreTls
pulsar.client.tlsTrustStoreType
pulsar.client.tlsTrustStorePath
pulsar.client.tlsTrustStorePassword
Se a transmissão usar um PulsarAdmin
, defina também o seguinte:
pulsar.admin.authPluginClassName
pulsar.admin.authParams
O exemplo a seguir demonstra a configuração de opções de autenticação:
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Esquema pulsar
O esquema de registros lidos no Pulsar depende de como os tópicos têm seus esquemas codificados.
Para tópicos com esquema Avro ou JSON, os nomes e tipos de campo são preservados no Spark DataFrame resultante.
Para tópicos sem esquema ou com tipo de dados simples no Pulsar, o payload é carregado em uma coluna
value
.Se o leitor estiver configurado para ler vários tópicos com esquemas diferentes, defina
allowDifferentTopicSchemas
para carregar o conteúdo bruto em uma colunavalue
.
Os registros Pulsar possuem os seguintes campos de metadados:
Coluna |
Tipo |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
Configurar opções para leitura de transmissão de Pulsar
Todas as opções são configuradas como parte de uma leitura de transmissão estruturada usando a sintaxe .option("<optionName>", "<optionValue>")
. Você também pode configurar a autenticação usando opções. Consulte Autenticar no Pulsar.
A tabela a seguir descreve as configurações necessárias para o Pulsar. Você deve especificar apenas uma das opções topic
, topics
ou topicsPattern
.
Opção |
Valor padrão |
Descrição |
---|---|---|
|
nenhum |
A configuração Pulsar |
|
nenhum |
Uma strings de nome de tópico para o tópico consumir. |
|
nenhum |
Uma lista separada por vírgulas dos tópicos a serem consumidos. |
|
nenhum |
Uma strings regex Java para corresponder aos tópicos a serem consumidos. |
A tabela a seguir descreve outras opções suportadas pelo Pulsar:
Opção |
Valor padrão |
Descrição |
---|---|---|
|
nenhum |
O nome de inscrição predefinido usado pelo conector para rastrear o progresso das aplicações Spark. |
|
nenhum |
Um prefixo usado pelo conector para gerar uma inscrição aleatória para acompanhar o progresso das aplicações Spark. |
|
120.000 |
O tempo limite para leitura de mensagens do Pulsar em milissegundos. |
|
|
Se o conector deve aguardar até que os tópicos desejados sejam criados. |
|
|
Controla se uma query deve falhar quando os dados são perdidos (por exemplo, tópicos são excluídos ou mensagens são excluídas devido à política de retenção). |
|
|
Se vários tópicos com esquemas diferentes forem lidos, use esse parâmetro para desativar a desserialização automática do valor do tópico baseada em esquema. Somente os valores brutos são retornados quando for |
|
|
Se |
|
nenhum |
Um limite flexível do número máximo de bytes que queremos processar por microlote. Se isso for especificado, |
|
nenhum |
A configuração do Pulsar |
Você também pode especificar qualquer configuração de cliente, administrador e leitor Pulsar usando os seguintes padrões:
Padrão |
Link para opções de configuração |
---|---|
|
|
|
|
|
Construir deslocamentos iniciais JSON
Você pode construir manualmente um ID de mensagem para especificar um deslocamento específico e transmiti-lo como JSON para a opção startingOffsets
. O exemplo de código a seguir demonstra essa sintaxe:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()