Apache Pulsar からのストリーム
プレビュー
この機能は パブリック プレビュー段階です。
Databricks Runtime14.1 以降では、構造化ストリーミングを使用して、Apache Pulsar からデータをストリームできます。Databricks
構造化ストリーミングは、Pulsar ソースから読み取られたデータに対して exactly-once 処理セマンティクスを提供します。
構文の例
以下は、構造化ストリーミングを使用して Pulsar から読み取る基本的な例です。
- Python
- Scala
query = (spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
)
val query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
Pulsarトピックから読み取るには、service.urlと以下のいずれかのオプションを提供する必要があります。
topictopicstopicsPattern
オプションの完全なリストについては、 Pulsar ストリーミング読み取りのオプションを構成するを参照してください。
Pulsarへの認証
Databricksは、Pulsarへのトラストストアおよびキーストア認証をサポートしています。Databricksでは、構成情報を保存するためにシークレットを使用することを推奨しています。
認証オプションの全リストについては、認証を参照してください。
例
次の例は、認証オプションの設定を示しています。
- Python
- Scala
client_auth_params = dbutils.secrets.get(scope="pulsar", key="clientAuthParams")
client_pw = dbutils.secrets.get(scope="pulsar", key="clientPw")
# clientAuthParams is a comma-separated list of key-value pairs, such as:
# "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = (spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", starting_offsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", client_auth_params)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trust_store_path)
.option("pulsar.client.tlsTrustStorePassword", client_pw)
.load()
)
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
// "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
val query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
パルサーのスキーマ
Pulsarから読み込む場合、行のスキーマはソースのトピックのスキーマに依存します。
- Avro または JSON スキーマを持つトピックの場合、フィールド名とフィールド タイプは結果の Spark データフレーム に保持されます。
- スキーマのないトピックや Pulsar の単純なデータ型のトピックの場合、ペイロードは
value列に読み込まれます。 - ストリームを異なるスキーマの複数のトピックを読み取るように構成する場合、
allowDifferentTopicSchemasを設定して生コンテンツをvalue列に読み込みます。
Pulsarレコードには、次のメタデータフィールドがあります。
列 | タイプ |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
Pulsar ストリーミング読み取りのオプションを構成する
オプションの完全なリストについては、「Pulsar」を参照してください。
開始オフセット JSON の構築
オフセットを指定するカスタムメッセージ ID を JSON として、startingOffsets オプションと共に使用するには、次の例をご覧ください。
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()