メインコンテンツまでスキップ

Apache Pulsar からのストリーム

備考

プレビュー

この機能は パブリック プレビュー段階です。

Databricks Runtime14.1 以降では、構造化ストリーミングを使用して、Apache Pulsar からデータをストリームできます。Databricks

構造化ストリーミングは、Pulsar ソースから読み取られたデータに対して exactly-once 処理セマンティクスを提供します。

構文の例

以下は、構造化ストリーミングを使用して Pulsar から読み取る基本的な例です。

Scala
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()

トピックを指定するには、常に service.url と次のいずれかのオプションを指定する必要があります。

  • topic
  • topics
  • topicsPattern

オプションの完全なリストについては、 Pulsar ストリーミング読み取りのオプションを構成するを参照してください。

Pulsarへの認証

Databricks は、Pulsar に対するトラストストアとキーストアの認証をサポートしています。 Databricks では、構成の詳細を格納するときにシークレットを使用することをお勧めします。

ストリーム設定時に次のオプションを設定できます。

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

ストリームで PulsarAdminを使用する場合は、次のようにも設定します。

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

次の例は、認証オプションの設定を示しています。

Scala
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()

パルサーのスキーマ

Pulsarから読み取られるレコードのスキーマは、トピックのスキーマがどのようにエンコードされているかによって異なります。

  • Avro または JSON スキーマを持つトピックの場合、フィールド名とフィールド タイプは結果の Spark データフレーム に保持されます。
  • スキーマのないトピックや Pulsar の単純なデータ型のトピックの場合、ペイロードは value 列に読み込まれます。
  • リーダーが異なるスキーマを持つ複数のトピックを読み取るように構成されている場合は、未加工のコンテンツをvalue列に読み込むように allowDifferentTopicSchemas を設定します。

Pulsarレコードには、次のメタデータフィールドがあります。

タイプ

__key

binary

__topic

string

__messageId

binary

__publishTime

timestamp

__eventTime

timestamp

__messageProperties

map<String, String>

Pulsar ストリーミング読み取りのオプションを構成する

すべてのオプションは、 .option("<optionName>", "<optionValue>") 構文を使用した構造化ストリーミング読み取りの一部として設定されます。 オプションを使用して認証を構成することもできます。 「Pulsar への認証」を参照してください。

次の表では、Pulsar に必要な構成について説明します。 指定できるオプションは、 、 topicstopicsPatternのいずれかtopicつだけです。

オプション

デフォルト値

説明

service.url

なし

PulsarサービスのPulsar serviceUrl 構成。

topic

なし

使用するトピックのトピック名文字列。

topics

なし

使用するトピックのコンマ区切りリスト。

topicsPattern

なし

消費するトピックで照合する Java 正規表現文字列。

次の表では、Pulsar でサポートされているその他のオプションについて説明します。

オプション

デフォルト値

説明

predefinedSubscription

なし

コネクタが Sparkアプリケーションの進行状況を追跡するために使用する定義済みのサブスクリプション名。

subscriptionPrefix

なし

コネクタが Sparkアプリケーションの進行状況を追跡するためのランダムなサブスクリプションを生成するために使用するプレフィックス。

pollTimeoutMs

120000

Pulsar からのメッセージの読み取りのタイムアウト (ミリ秒単位)。

waitingForNonExistedTopic

false

コネクタが目的のトピックが作成されるまで待機するかどうか。

failOnDataLoss

true

データが失われた場合 (たとえば、トピックが削除された場合や、アイテム保持ポリシーによってメッセージが削除された場合) にクエリを失敗させるかどうかを制御します。

allowDifferentTopicSchemas

false

スキーマの異なる複数のトピックが読み取られる場合は、このパラメーターを使用して、スキーマベースのトピック値の自動逆シリアル化をオフにします。 これが trueの場合、生の値のみが返されます。

startingOffsets

latest

latestの場合、リーダーは実行開始後に最新のレコードを読み取ります。earliestの場合、リーダーは最も古いオフセットから読み取ります。ユーザーは、特定のオフセットを指定する JSON 文字列を指定することもできます。

maxBytesPerTrigger

なし

マイクロバッチごとに処理する最大バイト数のソフト制限。 これを指定する場合は、 admin.url も指定する必要があります。

admin.url

なし

Pulsar serviceHttpUrl 構成。 maxBytesPerTrigger が指定されている場合にのみ必要です。

また、次のパターンを使用して、Pulsarクライアント、管理者、およびリーダーの構成を指定することもできます。

パターン

conifigurationのオプションへのリンク

pulsar.client.*

Pulsarクライアント構成

pulsar.admin.*

Pulsar管理構成

pulsar.reader.*

Pulsarリーダーの構成

開始オフセット JSON の構築

メッセージ ID を手動で作成して特定のオフセットを指定し、これを JSON として startingOffsets オプションに渡すことができます。 次のコード例は、この構文を示しています。

Scala
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()