メインコンテンツまでスキップ

Google Pub/Sub を定期購入する

Databricks には、Databricks Runtime 13.3 LTS 以降で Google Pub/Sub をサブスクライブするための組み込みコネクタが用意されています。 このコネクタは、サブスクライバーからのレコードに対して exactly-once 処理セマンティクスを提供します。

注記

Pub/Sub が重複するレコードを公開したり、レコードが順不同でサブスクライバーに届いたりする可能性があります。 Databricksコードは、重複するレコードや順不同のレコードを処理するために記述する必要があります。

構文の例

次の構文例は、サービス認証情報を使用して Pub/Sub から読み取られる構造化ストリーミングの構成を示しています。 すべての認証オプションについては、 Pub/Sub へのアクセスを設定するをご覧ください。

Scala
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.option("serviceCredential", "service-credential-name") // required
.load()

その他の設定オプションについては、 Pub/Sub ストリーミング読み取りのオプションを設定するをご覧ください。

Pub/Sub へのアクセスを設定する

次の表では、設定された資格情報に必要なロールについて説明します。

ロール

必須またはオプション

使用方法

roles/pubsub.viewer または roles/viewer

必須

サブスクリプションが存在するかどうかを確認し、サブスクリプションを取得する

roles/pubsub.subscriber

必須

サブスクリプションからデータをフェッチする

roles/pubsub.editor または roles/editor

オプション

サブスクリプションが存在しない場合はサブスクリプションの作成を有効にし、ストリームの終了時にサブスクリプションを削除する deleteSubscriptionOnStreamStop も有効にします

Databricks では、Pub/Sub 読み取りのサービス資格情報を構成することをお勧めします。 Pub/Sub のサービス資格情報には、Databricks Runtime 16.2 以降が必要です。 「サービス資格情報を使用して外部クラウド サービスへのアクセスを管理する」を参照してください。

Databricks サービスの認証情報を利用できない場合は、Google サービス アカウント(GSA)を直接使用できます。

GSA を使用するようにコンピュートを構成すると、その GSA のアクセス許可は、そのクラスターで実行されているすべてのクエリで使用できます。 「Google サービス アカウント」を参照してください。

注記

標準アクセスモードで設定されたコンピュートにGSAをアタッチすることはできません。

次のオプションを構成して、GSA を直接ストリームに渡すことができます。

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Pub/Sub スキーマ

ストリームのスキーマは、次の表に示すように、Pub/Sub からフェッチされたレコードと一致します。

フィールド

タイプ

messageId

StringType

payload

ArrayType[ByteType]

attributes

StringType

publishTimestampInMillis

LongType

Pub/Sub ストリーミング読み取りのオプションを構成する

次の表では、Pub/Sub でサポートされるオプションについて説明します。 すべてのオプションは、 .option("<optionName>", "<optionValue>") 構文を使用した構造化ストリーミング読み取りの一部として設定されます。

注記

一部の Pub/Sub 構成オプションでは、 マイクロバッチ ではなく フェッチ の概念が使用されます。これは内部実装の詳細を反映しており、オプションは他の構造化ストリーミング コネクタの帰結と同様に機能しますが、レコードがフェッチされてから処理される点が異なります。

オプション

デフォルト値

説明

numFetchPartitions

ストリームの初期化時に存在するエグゼキューターの数の半分に設定します。

サブスクリプションからレコードをフェッチする並列 Spark タスクの数。

deleteSubscriptionOnStreamStop

false

trueの場合、ストリームに渡されたサブスクリプションは、ストリーミング ジョブの終了時に削除されます。

maxBytesPerTrigger

なし

トリガーされた各マイクロバッチ中に処理されるバッチ サイズのソフト制限。

maxRecordsPerFetch

1000

レコードを処理する前にタスクごとにフェッチするレコードの数。

maxFetchPeriod

10秒

レコードを処理する前に各タスクがフェッチする時間。 Databricks では、デフォルトの値を使用することをお勧めします。

Pub/Sub のインクリメンタル バッチ処理セマンティクス

Trigger.AvailableNow を使用して、Pub/Sub ソースから使用可能なレコードを増分バッチで消費できます。

Databricks は、 Trigger.AvailableNow 設定で読み取りを開始したときにタイムスタンプを記録します。 バッチによって処理されるレコードには、以前にフェッチされたすべてのデータと、記録されたストリーム開始タイムスタンプよりもタイムスタンプが小さい新しく公開されたレコードが含まれます。

増分バッチ処理の構成を参照してください。

モニタリング ストリーミング メトリクス

構造化ストリーミングの進行状況メトリクスは、フェッチされて処理の準備ができたレコードの数、フェッチされて処理の準備ができたレコードのサイズ、およびストリームの開始以降に確認された重複の数を報告します。 これらのメトリクスの例を次に示します。

JSON
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}

制限

投機的実行(spark.speculation)は Pub/Sub ではサポートされていません。