メインコンテンツまでスキップ

Google Pub/Sub を定期購入する

組み込みのコネクタを使用して、Google Pub/Sub を購読してください。このコネクタは、サブスクライバーからのレコードに対して、厳密に1回限りの処理セマンティクスを提供します。

注記

パブリッシュ/サブスクライブでは、重複したレコードが公開されたり、レコードが購読者に順不同で届いたりする可能性があります。重複レコードや順不同レコードを処理するコードを記述してください。

Pub/Sub ストリームの設定

以下のコード例は、Pub/Sub からの構造化ストリーミング読み取りを構成するための基本的な構文を示しています。

Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}

query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)

その他の設定オプションについては、 Pub/Sub ストリーミング読み取りのオプションを設定するをご覧ください。

Pub/Sub へのアクセスを設定する

設定する認証情報には、以下の役割が付与されている必要があります。

ロール

必須またはオプション

役割の使い方

roles/pubsub.viewer または roles/viewer

必須

サブスクリプションが存在するかどうかを確認し、存在する場合はサブスクリプションを取得します。

roles/pubsub.subscriber

必須

サブスクリプションからデータを取得します。

roles/pubsub.editor または roles/editor

オプション

サブスクリプションが存在しない場合は作成を可能にし、ストリーム終了時にdeleteSubscriptionOnStreamStopを使用してサブスクリプションを削除できるようにします。

Databricks では、承認オプションを提供するときにシークレットを使用することをお勧めします。 接続を認証するには、次のオプションが必要です。

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Pub/Subスキーマを理解する

ストリームのスキーマは、次の表に示すように、Pub/Sub から取得されるレコードと一致します。

フィールド

タイプ

messageId

StringType

payload

ArrayType[ByteType]

attributes

StringType

publishTimestampInMillis

LongType

Pub/Sub ストリーミング読み取りのオプションを構成する

次の表では、Pub/Sub でサポートされるオプションについて説明します。 すべてのオプションは、 .option("<optionName>", "<optionValue>") 構文を使用した構造化ストリーミング読み取りの一部として設定されます。

注記

一部の Pub/Sub 構成オプションでは、 マイクロバッチ ではなく フェッチ の概念が使用されます。これは内部実装の詳細を反映しており、オプションは他の構造化ストリーミング コネクタの帰結と同様に機能しますが、レコードがフェッチされてから処理される点が異なります。

オプション

デフォルト値

説明

numFetchPartitions

ストリームの初期化時に存在するエグゼキューターの数の半分に設定します。

サブスクリプションからレコードをフェッチする並列 Spark タスクの数。

deleteSubscriptionOnStreamStop

false

trueの場合、ストリームに渡されたサブスクリプションは、ストリーミング ジョブの終了時に削除されます。

maxBytesPerTrigger

none

トリガーされた各マイクロバッチ中に処理されるバッチ サイズのソフト制限。

maxRecordsPerFetch

1000

レコードを処理する前にタスクごとにフェッチするレコードの数。

maxFetchPeriod

10s

各タスクがレコードを処理する前にデータを取得するのにかかる時間。期間を表す文字列を受け付けます。例えば、 1sは1秒、 1mは1分を表します。Databricksはデフォルト値の使用を推奨しています。

Pub/Sub で増分バッチ処理を使用する

Trigger.AvailableNowを使用すると、Pub/Sub ソースから利用可能なレコードを増分バッチとして消費できます。

Databricksは、 Trigger.AvailableNow設定で読み取りを開始した時点のタイムスタンプを記録します。バッチ処理されるレコードには、以前に取得されたすべてのデータと、記録されたストリーム開始タイムスタンプよりも前のタイムスタンプを持つ新規公開レコードが含まれます。詳細については、 AvailableNow : 増分バッチ処理」を参照してください。

Pub/Sub ストリーミング メトリクスを監視する

構造化ストリーミングの進行状況メトリクスは、フェッチされて処理の準備ができたレコードの数、フェッチされて処理の準備ができたレコードのサイズ、およびストリームの開始以降に確認された重複の数を報告します。 これらのメトリクスの例を次に示します。

JSON
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}

制限

Pub/Sub は投機的実行をサポートしていません ( spark.speculation )。