Google Pub/Sub をサブスクライブする

Databricks は、Databricks Runtime 13.3 LTS 以降で Google Pub/Sub にサブスクライブするための組み込みコネクタを提供します。 このコネクタは、サブスクライバーからのレコードに対して exactly-once 処理セマンティクスを提供します。

注:

Pub/Sub が重複レコードをパブリッシュし、レコードが順不同でサブスクライバーに到着することがあります。 重複レコードや順序が正しくないレコードを処理する Databricks コードを記述する必要があります。

構文の例

次のコード例は、Pub/Sub から読み取られた構造化ストリーミングを構成するための基本的な構文を示しています。

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

その他の構成オプションについては、 Pub/Sub ストリーミング読み取りのオプションを構成するをご覧ください。

Pub/Sub へのアクセスを構成する

Databricks では、承認オプションを指定するときにシークレットを使用することをお勧めします。 接続を承認するには、次のオプションが必要です。

  • clientEmail

  • clientId

  • privateKey

  • privateKeyId

次の表に、設定された資格情報に必要なロールを示します。

ロール

必須またはオプション

使用方法

roles/pubsub.viewer または roles/viewer

*必須

サブスクリプションが存在するかどうかを確認し、サブスクリプションを取得する

roles/pubsub.subscriber

*必須

サブスクリプションからデータを取得する

roles/pubsub.editor または roles/editor

オプション

サブスクリプションが存在しない場合は作成可能にし、ストリーム終了時にサブスクリプションを削除する deleteSubscriptionOnStreamStop も使用可能にします

Pub/Sub スキーマ

ストリームのスキーマは、次の表に示すように、Pub/Sub からフェッチされたレコードと一致します。

フィールド

タイプ

messageId

StringType

payload

ArrayType[ByteType]

attributes

StringType

publishTimestampInMillis

LongType

Pub/Sub ストリーミング読み取りのオプションを構成する

次の表では、Pub/Sub でサポートされるオプションについて説明します。 すべてのオプションは、 .option("<optionName>", "<optionValue>") 構文を使用した構造化ストリーミング読み取りの一部として構成されます。

注:

一部の Pub/Sub 構成オプションでは、マイクロバッチの代わりにフェッチの概念が使用されます。これは内部実装の詳細を反映しており、オプションは他の構造化ストリーミングコネクタの必然的な結果と同様に機能しますが、レコードがフェッチされてから処理される点が異なります。

オプション

デフォルト値

説明

numFetchPartitions

ストリーム初期化時のエグゼキューター数に設定

サブスクリプションからレコードをフェッチする並列 Spark タスクの数。

deleteSubscriptionOnStreamStop

false

trueの場合、ストリームに渡されたサブスクリプションは、ストリーミングジョブの終了時に削除されます。

maxBytesPerTrigger

なし

トリガーされた各マイクロバッチ中に処理されるバッチサイズのソフト制限。

maxRecordsPerFetch

1000

レコードを処理する前にタスクごとにフェッチするレコードの数。

maxFetchPeriod

10秒

レコードを処理する前に各タスクがフェッチする期間。 Databricks では、既定値の使用が推奨されています。

Pub/Sub の増分バッチ処理セマンティクス

Trigger.AvailableNow を使用して、Pub/Sub ソースから使用可能なレコードを増分バッチで消費できます。

Databricks では、 Trigger.AvailableNow 設定で読み取りを開始すると、タイムスタンプが記録されます。 バッチによって処理されるレコードには、以前にフェッチされたすべてのデータと、記録されたストリーム開始タイムスタンプよりもタイムスタンプが小さい新しく公開されたレコードが含まれます。

増分バッチ処理の構成を参照してください。

モニタリング streaming メトリクス

構造化ストリーミングの進行状況メトリクスは、フェッチされて処理の準備ができているレコードの数、フェッチされて処理の準備ができているレコードのサイズ、およびストリームの開始以降に確認された重複の数を報告します。 以下は、これらのメトリクスの例です。

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

制限事項

投機的実行(spark.speculation)は Pub/Sub ではサポートされていません。