Google Pub/Sub を定期購入する
組み込みコネクタを使用して Google Pub/Sub をサブスクライブしてください。このコネクタは、サブスクライバーからの行に対して、厳密に 1 回処理セマンティクスを持っています。
Pub/Sub は重複する行を発行するか、または、行がサブスクライバーに順不同で届くことがあります。重複する行と順不同の行を処理するためのコードを記述する必要があります。
Pub/Sub ストリームの設定
以下の例は、Pub/Sub から読み取り、サービス認証情報を使用して認証する方法を示しています。すべての認証オプションについては、Pub/Subへのアクセスを構成するを参照してください。
- Python
- Scala
- SQL
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.option("serviceCredential", "service-credential-name")
.load()
)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.option("serviceCredential", "service-credential-name")
.load()
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
serviceCredential => 'service-credential-name'
);
その他の設定オプションについては、 Pub/Sub ストリーミング読み取りのオプションを設定するをご覧ください。
Pub/Sub へのアクセスを設定する
資格情報には、以下のロールが必要です。
ロール | 必須またはオプション | 役割の使い方 |
|---|---|---|
| 必須 | サブスクリプションが存在するかどうかを確認し、存在する場合はサブスクリプションを取得します。 |
| 必須 | サブスクリプションからデータを取得します。 |
| オプション | サブスクリプションが存在しない場合は作成を可能にし、ストリーム終了時に |
Databricksは、Pub/Subの読み取り用にサービス資格情報の設定を推奨します。Pub/Sub のサービス資格情報には、Databricks Runtime 16.1 以降が必要です。サービス資格情報の作成を参照してください。ただし、Databricks サービスの認証情報が利用できない場合、Google サービス アカウント(GSA)を直接使用できます。GSA のアクセス許可は、そのクラスターで実行されているすべてのクエリで使用できます。「Google サービス アカウント」を参照してください。
標準アクセスモードで設定されたコンピュートにGSAをアタッチすることはできません。
ストリームで GSA を使用するための次のオプションを構成する
clientEmailclientIdprivateKeyprivateKeyId
Pub/Subスキーマを理解する
ストリームのスキーマは、Pub/Subから取得される行と一致します。詳細は次の表で説明されています:
フィールド | タイプ |
|---|---|
|
|
|
|
|
|
|
|
Pub/Sub ストリーミング読み取りのオプションを構成する
一部のPub/Sub構成オプションは、 マイクロバッチ ではなく、 フェッチ の概念を使用しています。これは内部実装の詳細であり、オプションは、行がフェッチされてから処理される点を除き、他の構造化ストリーミングコネクターと同様に機能します。
オプションの全リストについては、パブ/サブをご覧ください。
Pub/Sub で増分バッチ処理を使用する
Trigger.AvailableNow を使用して、Pub/Subソースから利用可能な行を増分バッチとして消費できます。
Databricks は、Trigger.AvailableNow の設定で読み取りを開始したときに、タイムスタンプを記録します。バッチによって処理される行には、以前にフェッチされたすべてのデータと、記録された開始タイムスタンプよりも小さいタイムスタンプを持つ新しく公開された行が含まれます。詳細については、「AvailableNow: 増分バッチ処理」を参照してください。
Pub/Sub ストリーミング メトリクスを監視する
構造化ストリーミングの進捗メトリクスは、フェッチされ処理準備が整った行数、フェッチされ処理準備が整った行のサイズ、およびストリーム開始以降に検出された重複数を報告します。
以下は Pub/Sub メトリクス:の例です。
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
制限
パブ/サブは spark.speculation との投機的実行をサポートしていません。