Apache Pulsar からのストリーム
プレビュー
この機能は パブリック プレビュー段階です。
Databricks Runtime14.1 以降では、構造化ストリーミングを使用して、Apache Pulsar からデータをストリームできます。Databricks
構造化ストリーミングは、Pulsar ソースから読み取られたデータに対して exactly-once 処理セマンティクスを提供します。
構文の例
以下は、構造化ストリーミングを使用して Pulsar から読み取る基本的な例です。
query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()
トピックを指定するには、常に service.url と次のいずれかのオプションを指定する必要があります。
topictopicstopicsPattern
オプションの完全なリストについては、 Pulsar ストリーミング読み取りのオプションを構成するを参照してください。
Pulsarへの認証
Databricks は、Pulsar に対するトラストストアとキーストアの認証をサポートしています。 Databricks では、構成の詳細を格納するときにシークレットを使用することをお勧めします。
ストリーム設定時に次のオプションを設定できます。
pulsar.client.authPluginClassNamepulsar.client.authParamspulsar.client.useKeyStoreTlspulsar.client.tlsTrustStoreTypepulsar.client.tlsTrustStorePathpulsar.client.tlsTrustStorePassword
ストリームで PulsarAdminを使用する場合は、次のようにも設定します。
pulsar.admin.authPluginClassNamepulsar.admin.authParams
次の例は、認証オプションの設定を示しています。
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()
パルサーのスキーマ
Pulsarから読み取られるレコードのスキーマは、トピックのスキーマがどのようにエンコードされているかによって異なります。
- Avro または JSON スキーマを持つトピックの場合、フィールド名とフィールド タイプは結果の Spark データフレーム に保持されます。
 - スキーマのないトピックや Pulsar の単純なデータ型のトピックの場合、ペイロードは 
value列に読み込まれます。 - リーダーが異なるスキーマを持つ複数のトピックを読み取るように構成されている場合は、未加工のコンテンツを
value列に読み込むようにallowDifferentTopicSchemasを設定します。 
Pulsarレコードには、次のメタデータフィールドがあります。
列  | タイプ  | 
|---|---|
  | 
  | 
  | 
  | 
  | 
  | 
  | 
  | 
  | 
  | 
  | 
  | 
Pulsar ストリーミング読み取りのオプションを構成する
すべてのオプションは、 .option("<optionName>", "<optionValue>") 構文を使用した構造化ストリーミング読み取りの一部として設定されます。 オプションを使用して認証を構成することもできます。 「Pulsar への認証」を参照してください。
次の表では、Pulsar に必要な構成について説明します。 指定できるオプションは、 、 topics 、 topicsPatternのいずれかtopicつだけです。
オプション  | デフォルト値  | 説明  | 
|---|---|---|
  | なし  | PulsarサービスのPulsar   | 
  | なし  | 使用するトピックのトピック名文字列。  | 
  | なし  | 使用するトピックのコンマ区切りリスト。  | 
  | なし  | 消費するトピックで照合する Java 正規表現文字列。  | 
次の表では、Pulsar でサポートされているその他のオプションについて説明します。
オプション  | デフォルト値  | 説明  | 
|---|---|---|
  | なし  | コネクタが Sparkアプリケーションの進行状況を追跡するために使用する定義済みのサブスクリプション名。  | 
  | なし  | コネクタが Sparkアプリケーションの進行状況を追跡するためのランダムなサブスクリプションを生成するために使用するプレフィックス。  | 
  | 120000  | Pulsar からのメッセージの読み取りのタイムアウト (ミリ秒単位)。  | 
  | 
  | コネクタが目的のトピックが作成されるまで待機するかどうか。  | 
  | 
  | データが失われた場合 (たとえば、トピックが削除された場合や、アイテム保持ポリシーによってメッセージが削除された場合) にクエリを失敗させるかどうかを制御します。  | 
  | 
  | スキーマの異なる複数のトピックが読み取られる場合は、このパラメーターを使用して、スキーマベースのトピック値の自動逆シリアル化をオフにします。 これが   | 
  | 
  | 
  | 
  | なし  | マイクロバッチごとに処理する最大バイト数のソフト制限。 これを指定する場合は、   | 
  | なし  | Pulsar   | 
また、次のパターンを使用して、Pulsarクライアント、管理者、およびリーダーの構成を指定することもできます。
パターン  | conifigurationのオプションへのリンク  | 
|---|---|
  | |
  | |
  | 
開始オフセット JSON の構築
メッセージ ID を手動で作成して特定のオフセットを指定し、これを JSON として startingOffsets オプションに渡すことができます。 次のコード例は、この構文を示しています。
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()