Google Pub/Sub を定期購入する
Databricks には、Databricks Runtime 13.3 LTS 以降で Google Pub/Sub をサブスクライブするための組み込みコネクタが用意されています。 このコネクタは、サブスクライバーからのレコードに対して exactly-once 処理セマンティクスを提供します。
Pub/Sub が重複するレコードを公開したり、レコードが順不同でサブスクライバーに届いたりする可能性があります。 Databricksコードは、重複するレコードや順不同のレコードを処理するために記述する必要があります。
構文の例
次の構文例は、サービス認証情報を使用して Pub/Sub から読み取られる構造化ストリーミングの構成を示しています。 すべての認証オプションについては、 Pub/Sub へのアクセスを設定するをご覧ください。
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.option("serviceCredential", "service-credential-name") // required
.load()
その他の設定オプションについては、 Pub/Sub ストリーミング読み取りのオプションを設定するをご覧ください。
Pub/Sub へのアクセスを設定する
次の表では、設定された資格情報に必要なロールについて説明します。
ロール | 必須またはオプション | 使用方法 |
---|---|---|
| 必須 | サブスクリプションが存在するかどうかを確認し、サブスクリプションを取得する |
| 必須 | サブスクリプションからデータをフェッチする |
| オプション | サブスクリプションが存在しない場合はサブスクリプションの作成を有効にし、ストリームの終了時にサブスクリプションを削除する |
Databricks では、Pub/Sub 読み取りのサービス資格情報を構成することをお勧めします。 Pub/Sub のサービス資格情報には、Databricks Runtime 16.2 以降が必要です。 「サービス資格情報を使用して外部クラウド サービスへのアクセスを管理する」を参照してください。
Databricks サービスの認証情報を利用できない場合は、Google サービス アカウント(GSA)を直接使用できます。
GSA を使用するようにコンピュートを構成すると、その GSA のアクセス許可は、そのクラスターで実行されているすべてのクエリで使用できます。 「Google サービス アカウント」を参照してください。
標準アクセスモードで設定されたコンピュートにGSAをアタッチすることはできません。
次のオプションを構成して、GSA を直接ストリームに渡すことができます。
clientEmail
clientId
privateKey
privateKeyId
Pub/Sub スキーマ
ストリームのスキーマは、次の表に示すように、Pub/Sub からフェッチされたレコードと一致します。
フィールド | タイプ |
---|---|
|
|
|
|
|
|
|
|
Pub/Sub ストリーミング読み取りのオプションを構成する
次の表では、Pub/Sub でサポートされるオプションについて説明します。 すべてのオプションは、 .option("<optionName>", "<optionValue>")
構文を使用した構造化ストリーミング読み取りの一部として設定されます。
一部の Pub/Sub 構成オプションでは、 マイクロバッチ ではなく フェッチ の概念が使用されます。これは内部実装の詳細を反映しており、オプションは他の構造化ストリーミング コネクタの帰結と同様に機能しますが、レコードがフェッチされてから処理される点が異なります。
オプション | デフォルト値 | 説明 |
---|---|---|
| ストリームの初期化時に存在するエグゼキューターの数の半分に設定します。 | サブスクリプションからレコードをフェッチする並列 Spark タスクの数。 |
|
|
|
| なし | トリガーされた各マイクロバッチ中に処理されるバッチ サイズのソフト制限。 |
| 1000 | レコードを処理する前にタスクごとにフェッチするレコードの数。 |
| 10秒 | レコードを処理する前に各タスクがフェッチする時間。 Databricks では、デフォルトの値を使用することをお勧めします。 |
Pub/Sub のインクリメンタル バッチ処理セマンティクス
Trigger.AvailableNow
を使用して、Pub/Sub ソースから使用可能なレコードを増分バッチで消費できます。
Databricks は、 Trigger.AvailableNow
設定で読み取りを開始したときにタイムスタンプを記録します。 バッチによって処理されるレコードには、以前にフェッチされたすべてのデータと、記録されたストリーム開始タイムスタンプよりもタイムスタンプが小さい新しく公開されたレコードが含まれます。
増分バッチ処理の構成を参照してください。
モニタリング ストリーミング メトリクス
構造化ストリーミングの進行状況メトリクスは、フェッチされて処理の準備ができたレコードの数、フェッチされて処理の準備ができたレコードのサイズ、およびストリームの開始以降に確認された重複の数を報告します。 これらのメトリクスの例を次に示します。
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
制限
投機的実行(spark.speculation
)は Pub/Sub ではサポートされていません。