メインコンテンツまでスキップ

Apache Pulsar からのストリーム

備考

プレビュー

この機能は パブリック プレビュー段階です。

Databricks Runtime14.1 以降では、構造化ストリーミングを使用して、Apache Pulsar からデータをストリームできます。Databricks

構造化ストリーミングは、Pulsar ソースから読み取られたデータに対して exactly-once 処理セマンティクスを提供します。

構文の例

以下は、構造化ストリーミングを使用して Pulsar から読み取る基本的な例です。

Python
query = (spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
)

Pulsarトピックから読み取るには、service.urlと以下のいずれかのオプションを提供する必要があります。

  • topic
  • topics
  • topicsPattern

オプションの完全なリストについては、 Pulsar ストリーミング読み取りのオプションを構成するを参照してください。

Pulsarへの認証

Databricksは、Pulsarへのトラストストアおよびキーストア認証をサポートしています。Databricksでは、構成情報を保存するためにシークレットを使用することを推奨しています。

認証オプションの全リストについては、認証を参照してください。

次の例は、認証オプションの設定を示しています。

Python
client_auth_params = dbutils.secrets.get(scope="pulsar", key="clientAuthParams")
client_pw = dbutils.secrets.get(scope="pulsar", key="clientPw")

# clientAuthParams is a comma-separated list of key-value pairs, such as:
# "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = (spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", starting_offsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", client_auth_params)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trust_store_path)
.option("pulsar.client.tlsTrustStorePassword", client_pw)
.load()
)

パルサーのスキーマ

Pulsarから読み込む場合、行のスキーマはソースのトピックのスキーマに依存します。

  • Avro または JSON スキーマを持つトピックの場合、フィールド名とフィールド タイプは結果の Spark データフレーム に保持されます。
  • スキーマのないトピックや Pulsar の単純なデータ型のトピックの場合、ペイロードは value 列に読み込まれます。
  • ストリームを異なるスキーマの複数のトピックを読み取るように構成する場合、allowDifferentTopicSchemas を設定して生コンテンツを value 列に読み込みます。

Pulsarレコードには、次のメタデータフィールドがあります。

タイプ

__key

binary

__topic

string

__messageId

binary

__publishTime

timestamp

__eventTime

timestamp

__messageProperties

map<String, String>

Pulsar ストリーミング読み取りのオプションを構成する

オプションの完全なリストについては、「Pulsar」を参照してください。

開始オフセット JSON の構築

オフセットを指定するカスタムメッセージ ID を JSON として、startingOffsets オプションと共に使用するには、次の例をご覧ください。

Scala
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()