Google Pub/Sub を定期購入する
組み込みのコネクタを使用して、Google Pub/Sub を購読してください。このコネクタは、サブスクライバーからのレコードに対して、厳密に1回限りの処理セマンティクスを提供します。
パブリッシュ/サブスクライブでは、重複したレコードが公開されたり、レコードが購読者に順不同で届いたりする可能性があります。重複レコードや順不同レコードを処理するコードを記述してください。
Pub/Sub ストリームの設定
以下の例は、サービス認証情報を使用して Pub/Sub から構造化ストリーミング読み取りを構成する方法を示しています。すべての認証オプションについては、 「Pub/Subへのアクセスを構成する」を参照してください。
- Python
- SQL
- Scala
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.option("serviceCredential", "service-credential-name")
.load()
)
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
serviceCredential => 'service-credential-name'
);
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.option("serviceCredential", "service-credential-name")
.load()
その他の設定オプションについては、 Pub/Sub ストリーミング読み取りのオプションを設定するをご覧ください。
Pub/Sub へのアクセスを設定する
設定する認証情報には、以下の役割が付与されている必要があります。
ロール | 必須またはオプション | 役割の使い方 |
|---|---|---|
| 必須 | サブスクリプションが存在するかどうかを確認し、存在する場合はサブスクリプションを取得します。 |
| 必須 | サブスクリプションからデータを取得します。 |
| オプション | サブスクリプションが存在しない場合は作成を可能にし、ストリーム終了時に |
Databricksは、Pub/Sub読み取り用にサービス認証情報を設定することを推奨しています。Pub/Sub のサービス認証情報には、Databricks Runtime 16.1 以降が必要です。サービス資格情報の作成を参照してください。ただし、 Databricksサービスの資格情報が利用できない場合は、Google サービス アカウント (GSA) を直接使用できます。 GSA を使用するようにコンピュートを構成すると、そのクラスター上で実行されているすべてのクエリに対して GSA のアクセス許可が使用可能になります。 「Google サービス アカウント」を参照してください。
標準アクセスモードで設定されたコンピュートにGSAをアタッチすることはできません。
GSAをストリームに直接渡すには、以下のオプションを設定してください。
clientEmailclientIdprivateKeyprivateKeyId
Pub/Subスキーマを理解する
ストリームのスキーマは、次の表に示すように、Pub/Sub から取得されるレコードと一致します。
フィールド | タイプ |
|---|---|
|
|
|
|
|
|
|
|
Pub/Sub ストリーミング読み取りのオプションを構成する
次の表では、Pub/Sub でサポートされるオプションについて説明します。 すべてのオプションは、 .option("<optionName>", "<optionValue>") 構文を使用した構造化ストリーミング読み取りの一部として設定されます。
一部の Pub/Sub 構成オプションでは、 マイクロバッチ ではなく フェッチ の概念が使用されます。これは内部実装の詳細を反映しており、オプションは他の構造化ストリーミング コネクタの帰結と同様に機能しますが、レコードがフェッチされてから処理される点が異なります。
オプション | デフォルト値 | 説明 |
|---|---|---|
| ストリームの初期化時に存在するエグゼキューターの数の半分に設定します。 | サブスクリプションからレコードをフェッチする並列 Spark タスクの数。 |
|
|
|
|
| トリガーされた各マイクロバッチ中に処理されるバッチ サイズのソフト制限。 |
|
| レコードを処理する前にタスクごとにフェッチするレコードの数。 |
|
| 各タスクがレコードを処理する前にデータを取得するのにかかる時間。期間を表す文字列を受け付けます。例えば、 |
Pub/Sub で増分バッチ処理を使用する
Trigger.AvailableNowを使用すると、Pub/Sub ソースから利用可能なレコードを増分バッチとして消費できます。
Databricksは、 Trigger.AvailableNow設定で読み取りを開始した時点のタイムスタンプを記録します。バッチ処理されるレコードには、以前に取得されたすべてのデータと、記録されたストリーム開始タイムスタンプよりも前のタイムスタンプを持つ新規公開レコードが含まれます。詳細については、 AvailableNow : 増分バッチ処理」を参照してください。
Pub/Sub ストリーミング メトリクスを監視する
構造化ストリーミングの進行状況メトリクスは、フェッチされて処理の準備ができたレコードの数、フェッチされて処理の準備ができたレコードのサイズ、およびストリームの開始以降に確認された重複の数を報告します。 これらのメトリクスの例を次に示します。
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
制限
Pub/Sub は投機的実行をサポートしていません ( spark.speculation )。