メインコンテンツまでスキップ

Google Pub/Sub を定期購入する

組み込みコネクタを使用して Google Pub/Sub をサブスクライブしてください。このコネクタは、サブスクライバーからの行に対して、厳密に 1 回処理セマンティクスを持っています。

注記

Pub/Sub は重複する行を発行するか、または、行がサブスクライバーに順不同で届くことがあります。重複する行と順不同の行を処理するためのコードを記述する必要があります。

Pub/Sub ストリームの設定

次のコード例では、Pub/Subから構造化ストリーミングの読み取りを構成し、秘密鍵で認証する方法を示しています。

Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}

query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)

その他の設定オプションについては、 Pub/Sub ストリーミング読み取りのオプションを設定するをご覧ください。

Pub/Sub へのアクセスを設定する

資格情報には、以下のロールが必要です。

ロール

必須またはオプション

役割の使い方

roles/pubsub.viewer または roles/viewer

必須

サブスクリプションが存在するかどうかを確認し、存在する場合はサブスクリプションを取得します。

roles/pubsub.subscriber

必須

サブスクリプションからデータを取得します。

roles/pubsub.editor または roles/editor

オプション

サブスクリプションが存在しない場合は作成を可能にし、ストリーム終了時にdeleteSubscriptionOnStreamStopを使用してサブスクリプションを削除できるようにします。

Databricksでは、キーを使用する際にシークレットを使用することを推奨しています。接続を承認するには、次のオプションが必要です。

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Pub/Subスキーマを理解する

ストリームのスキーマは、Pub/Subから取得される行と一致します。詳細は次の表で説明されています:

フィールド

タイプ

messageId

StringType

payload

ArrayType[ByteType]

attributes

StringType

publishTimestampInMillis

LongType

Pub/Sub ストリーミング読み取りのオプションを構成する

一部のPub/Sub構成オプションは、 マイクロバッチ ではなく、 フェッチ の概念を使用しています。これは内部実装の詳細であり、オプションは、行がフェッチされてから処理される点を除き、他の構造化ストリーミングコネクターと同様に機能します。

オプションの全リストについては、パブ/サブをご覧ください。

Pub/Sub で増分バッチ処理を使用する

Trigger.AvailableNow を使用して、Pub/Subソースから利用可能な行を増分バッチとして消費できます。

Databricks は、Trigger.AvailableNow の設定で読み取りを開始したときに、タイムスタンプを記録します。バッチによって処理される行には、以前にフェッチされたすべてのデータと、記録された開始タイムスタンプよりも小さいタイムスタンプを持つ新しく公開された行が含まれます。詳細については、「AvailableNow: 増分バッチ処理」を参照してください。

Pub/Sub ストリーミング メトリクスを監視する

構造化ストリーミングの進捗メトリクスは、フェッチされ処理準備が整った行数、フェッチされ処理準備が整った行のサイズ、およびストリーム開始以降に検出された重複数を報告します。

以下は Pub/Sub メトリクス:の例です。

JSON
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}

制限

パブ/サブは spark.speculation との投機的実行をサポートしていません。