Apache Pulsar からのストリーム

プレビュー

この機能は パブリックプレビュー版です。

Databricks Runtime 14.1 以降では、構造化ストリーミングを使用して、Databricks 上の Apache Pulsar からデータをストリーミングできます。

構造化ストリーミングは、Pulsar ソースから読み取られたデータに対して、正確に 1 回限りの処理セマンティクスを提供します。

構文の例

以下は、構造化ストリーミングを使用して Pulsar から読み取る基本的な例です。

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

トピックを指定するには、常に service.url と次のいずれかのオプションを指定する必要があります。

  • topic

  • topics

  • topicsPattern

オプションの完全なリストについては、「 Pulsar ストリーミング読み取りのオプションを構成する」を参照してください。

Pulsarへの認証

Databricks は、Pulsar に対するトラストストアとキーストアの認証をサポートしています。 Databricks では、構成の詳細を格納するときにシークレットを使用することをお勧めします。

ストリーム構成時に次のオプションを設定できます。

  • pulsar.client.authPluginClassName

  • pulsar.client.authParams

  • pulsar.client.useKeyStoreTls

  • pulsar.client.tlsTrustStoreType

  • pulsar.client.tlsTrustStorePath

  • pulsar.client.tlsTrustStorePassword

ストリームで PulsarAdminを使用する場合は、次の設定も行います。

  • pulsar.admin.authPluginClassName

  • pulsar.admin.authParams

次に、認証オプションの設定例を示します。

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Pulsarのスキーマ

Pulsarから読み取られるレコードのスキーマは、トピックのスキーマがどのようにエンコードされているかによって異なります。

  • Avro または JSON スキーマを持つトピックの場合、フィールド名とフィールド型は結果の Spark DataFrame に保持されます。

  • Pulsarのスキーマのないトピックや単純なデータ型のトピックの場合、ペイロードは value 列にロードされます。

  • リーダーがスキーマの異なる複数のトピックを読み取るように構成されている場合は、生のコンテンツをvalue列に読み込むように allowDifferentTopicSchemas を設定します。

Pulsarレコードには、以下のメタデータフィールドがあります。

タイプ

__key

binary

__topic

string

__messageId

binary

__publishTime

timestamp

__eventTime

timestamp

__messageProperties

map<String, String>

Pulsar ストリーミング読み取りのオプションを構成する

すべてのオプションは、 .option("<optionName>", "<optionValue>") 構文を使用した構造化ストリーミング読み取りの一部として構成されます。 オプションを使用して認証を設定することもできます。 Pulsarへの認証を参照してください。

次の表に、Pulsar に必要な構成を示します。 オプション topictopics 、または topicsPatternのいずれか 1 つだけを指定する必要があります。

オプション

デフォルト値

説明

service.url

なし

Pulsar サービスの Pulsar serviceUrl 構成。

topic

なし

使用するトピックのトピック名文字列。

topics

なし

使用するトピックのコンマ区切りリスト。

topicsPattern

なし

使用するトピックで照合する Java 正規表現文字列。

次の表に、Pulsar でサポートされるその他のオプションを示します。

オプション

デフォルト値

説明

predefinedSubscription

なし

Sparkアプリケーションの進行状況を追跡するためにコネクタによって使用される定義済みのサブスクリプション名。

subscriptionPrefix

なし

Sparkアプリケーションの進行状況を追跡するためのランダムなサブスクリプションを生成するためにコネクタによって使用されるプレフィックス。

pollTimeoutMs

120000

Pulsar からメッセージを読み取るためのタイムアウト (ミリ秒単位)。

waitingForNonExistedTopic

false

目的のトピックが作成されるまでコネクタが待機するかどうか。

failOnDataLoss

true

データが失われたとき (たとえば、トピックが削除された場合や、保持ポリシーのためにメッセージが削除された場合など) にクエリーを失敗させるかどうかを制御します。

allowDifferentTopicSchemas

false

スキーマが異なる複数のトピックを読み取る場合は、このパラメーターを使用して、スキーマベースのトピック値の自動逆シリアル化をオフにします。 これが trueの場合、生の値のみが返されます。

startingOffsets

latest

latestの場合、リーダーは実行開始後に最新のレコードを読み取ります。earliestの場合、リーダーは最も古いオフセットから読み取ります。ユーザーは、特定のオフセットを指定する JSON 文字列を指定することもできます。

maxBytesPerTrigger

なし

マイクロバッチごとに処理する最大バイト数のソフト制限。 これを指定する場合は、 admin.url も指定する必要があります。

admin.url

なし

Pulsar serviceHttpUrl 構成。 maxBytesPerTrigger が指定されている場合にのみ必要です。

また、次のパターンを使用して、Pulsar クライアント、管理者、およびリーダーの構成を指定することもできます。

パターン

コニフィギュレーション オプションへのリンク

pulsar.client.*

Pulsarクライアントの構成

pulsar.admin.*

Pulsar 管理者設定

pulsar.reader.*

Pulsarリーダーの構成

開始オフセット JSON の構築

メッセージ ID を手動で作成して特定のオフセットを指定し、これを JSON として startingOffsets オプションに渡すことができます。 次のコード例は、この構文を示しています。

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()