Google Pub/Subをサブスクライブする
Databricks は、Databricks Runtime 13.3 LTS 以降で Google Pub/Sub にサブスクライブするための組み込みコネクタを提供します。 このコネクタは、サブスクライバーからのレコードに対して exactly-once 処理セマンティクスを提供します。
注:
Pub/Sub が重複レコードをパブリッシュし、レコードが順不同でサブスクライバーに到着することがあります。 重複レコードや順序が正しくないレコードを処理する Databricks コードを記述する必要があります。
構文の例
次のコード例は、Pub/Sub から読み取られた構造化ストリーミングを構成するための基本的な構文を示しています。
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.options(authOptions)
.load()
その他の構成オプションについては、 Pub/Sub ストリーミング読み取りのオプションを構成するをご覧ください。
Pub/Subへのアクセスを構成する
Databricks では、承認オプションを指定するときにシークレットを使用することをお勧めします。 接続を承認するには、次のオプションが必要です。
clientEmail
clientId
privateKey
privateKeyId
次の表に、設定された資格情報に必要なロールを示します。
ロール |
必須またはオプション |
使用方法 |
---|---|---|
|
*必須 |
サブスクリプションが存在するかどうかを確認し、サブスクリプションを取得する |
|
*必須 |
サブスクリプションからデータを取得する |
|
オプション |
サブスクリプションが存在しない場合は作成可能にし、ストリーム終了時にサブスクリプションを削除する |
Pub/Sub スキーマ
ストリームのスキーマは、次の表に示すように、Pub/Sub からフェッチされたレコードと一致します。
フィールド |
タイプ |
---|---|
|
|
|
|
|
|
|
|
Pub/Sub ストリーミング読み取りのオプションを構成する
次の表では、Pub/Sub でサポートされるオプションについて説明します。 すべてのオプションは、 .option("<optionName>", "<optionValue>")
構文を使用した構造化ストリーミング読み取りの一部として構成されます。
注:
一部の Pub/Sub 構成オプションでは、マイクロバッチの代わりにフェッチの概念が使用されます。これは内部実装の詳細を反映しており、オプションは他の構造化ストリーミングコネクタの必然的な結果と同様に機能しますが、レコードがフェッチされてから処理される点が異なります。
オプション |
デフォルト値 |
説明 |
---|---|---|
|
ストリーム初期化時に存在するエグゼキューターの数の半分に設定します。 |
サブスクリプションからレコードをフェッチする並列 Spark タスクの数。 |
|
|
|
|
なし |
トリガーされた各マイクロバッチ中に処理されるバッチサイズのソフト制限。 |
|
1000 |
レコードを処理する前にタスクごとにフェッチするレコードの数。 |
|
10秒 |
レコードを処理する前に各タスクがフェッチする期間。 Databricks では、既定値の使用が推奨されています。 |
Pub/Subの増分バッチ処理セマンティクス
Trigger.AvailableNow
を使用して、Pub/Sub ソースから使用可能なレコードを増分バッチで消費できます。
Databricks では、 Trigger.AvailableNow
設定で読み取りを開始すると、タイムスタンプが記録されます。 バッチによって処理されるレコードには、以前にフェッチされたすべてのデータと、記録されたストリーム開始タイムスタンプよりもタイムスタンプが小さい新しく公開されたレコードが含まれます。
増分バッチ処理の構成を参照してください。