メインコンテンツまでスキップ

Lakebase に接続

備考

プレビュー

この機能は パブリック プレビュー段階です。

組み込みのバッチ処理、自動再試行、およびワークスペースで管理される認証により、構造化ストリーミングを使用してLakebaseに書き込むことができます。

Lakebase シンクはいつ使用しますか?

低遅延ストリーミング書き込みのために、LakebaseシンクをLakebaseで使用してください。このシンクでは、バッチ処理、接続管理、エラー処理を行うためのカスタムforeachBatch関数を実装する必要はありません。


ユースケースの例:

  • 運用ダッシュボードまたは顧客向け機能のために、アプリケーションデータベースをリアルタイムで更新します。
  • 集計またはフィルター処理されたストリーミング結果など、継続的に変化するデータをトランザクションデータベースに同期します。
  • 構造化ストリーミングのクエリの出力を、1秒未満のレイテンシでリアルタイムモードを使用してLakebaseテーブルに書き込みます。

LakebaseからレイクハウスのDeltaテーブルへデータを同期する (逆方向) 方法については、「Lakebase チェンジデータフィード」を参照してください。

要件

  • Databricks Runtime 18.3以降
  • 専用または標準アクセスモードのクラシックコンピュート
  • Lakebase データベース

データベースに接続します

Lakebase sink は次の接続メソッドをサポートしています。

Unity Catalog に登録されている Lakebase テーブル

Unity Catalog に登録されたLakebaseテーブルの場合、コネクタは自動的に資格情報を管理し、クエリを実行しているユーザーまたはDatabricksサービスプリンシパルのIDを使用します。テーブルが存在しない場合、コネクタがテーブルを作成します。

Lakebase データベースを Unity Catalog に登録するには、Unity Catalog で Lakebase データベースを登録するを参照してください。

Lakebase テーブルに書き込むには、完全修飾テーブル名 catalog.schema.table を指定して、upsertkey オプションと .toTable() メソッドを使用します。

Python
(df.writeStream
.format("postgresql")
.outputMode("update")
.option("upsertkey", "<primary-key-column>")
.option("checkpointLocation", "/checkpoints/<query-name>")
.toTable("<catalog>.<schema>.<table>")
)

Unity Catalogに登録されていないLakebaseテーブル

Unity Catalog に登録されていない Lakebase テーブルの場合、コネクタは自動的に資格情報を管理し、クエリを実行しているユーザーまたは Databricks サービスプリンシパルの ID を使用します。

Lakebaseテーブルに書き込むには、endpointdatabasedbtable、およびupsertkeyのオプションを使用します:

Python
(df.writeStream
.format("postgresql")
.outputMode("update")
.option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
.option("database", "<database>")
.option("dbtable", "<schema>.<table>")
.option("upsertkey", "<primary-key-column>")
.option("checkpointLocation", "/checkpoints/<query-name>")
.start()
)

構成オプション

シンクは認識できないオプション、JDBC_STREAMING_SINK_INVALID_OPTIONSに対してエラーを返します。

次のオプションは、すべての接続方法に適用されます。

キー

デフォルト

説明

batchinterval

100 milliseconds

フラッシュするまでのバッファー内の行保持の最大時間です。たとえば、"50 milliseconds"

batchsize

1000

各データベーストランザクションの最大行数。

checkpointLocation

なし

必須。チェックポイントディレクトリへのパスです。

upsertkey

なし

アップサートキーを構成する列名のコンマ区切りリストたとえば、"id""user_id,event_type"などです。ターゲットテーブルは、指定された列にPRIMARY KEY制約を持つ必要があります。アップサートキーを指定しない場合、シンクはターゲットテーブルのスキーマからプライマリーキーを推論します。主キーが存在しない場合、クエリは更新せずに行を挿入します。

Lakebase の Unity Catalog に登録されていないテーブル

次のオプションは、Unity Catalog に登録されていない Lakebase テーブルに接続する場合に適用されます:

キー

デフォルト

説明

database

なし

対象のPostgreSQLデータベース名

dbtable

なし

ターゲットテーブル名はschema.table形式です。スキーマを指定しない場合、デフォルトのスキーマ値はpublicです。

endpoint

なし

Lakebase エンドポイントを project_id.branch_id.endpoint_id 形式で指定します。

アップサートの挙動

Upsert キーが、upsertkey で指定されるか、シンクによってテーブルの主キーから推測される場合に、シンクは PostgreSQL の INSERT INTO ... ON CONFLICT (<upsert_key>) DO UPDATE SET ... 構文を使用してテーブルに upsert します。

アップサートキーが存在しない場合、シンクは挿入を行います。クエリーの出力モードは、アップサートまたは挿入の動作に影響を与えません。

upsertkey 列は次の必要があります:

  • DataFrame の列の空でないサブセットである必要があります。
  • ターゲットテーブルの列をPRIMARY KEY制約を使用して参照します。
  • 数値型や文字列型などの比較可能な型である必要があります。並列書き込み中にデータベースのデッドロックを防ぐために、シンクは各バッチ内でアップサートキーで行をソートします。Upsertキーは複合型または構造体型をサポートしていません。

列名は、PostgreSQL のデフォルトである二重引用符「"」で自動的に囲まれ、予約済みキーワード、大文字と小文字が混在した名前、特殊文字が処理されます。

シンクはテーブル名をクォートせず、そのままデータベースに渡します。特殊文字("my-schema"."my-table"など)を含むテーブル名は、引用符で囲む必要があります。

パフォーマンスチューニング

バッチ処理とバックプレッシャー

フラッシュはいずれかの条件が満たされた場合にトリガーされます。

  • バッファーが batchsize 行に達すると、デフォルトで 1000 に設定されます。
  • バッファー期間がbatchintervalを超えています。デフォルトは100 millisecondsです。

データベースが流入するデータ速度に追いつけなくなると、シンクはアップストリームでバックプレッシャーをソースに伝播します。

レイテンシーとスループットのガイダンス:

  • リアルタイムモードの低レイテンシのワークロードでは、フラッシュ前の最大時間を短くするために batchinterval を減らしてください。構造化ストリーミングのリアルタイムモードを参照してください。
  • ハイスループットのワークロードでは、batchsizeを増加させて、各トランザクションのオーバーヘッドを削減します。

接続動作

シンクはエグゼキューターで接続プールを使用します。デフォルトでは、各タスクは1つのデータベース接続を使用します。

Databricks は、接続ごとに 1 タスクのデフォルト値を使用することをお勧めします。各接続のタスク数を増やすと、接続の競合を発生させ、高スループット接続の遅延が増加する可能性があります。

タスクと接続の比率を構成するには、spark.databricks.sql.streaming.jdbc.tasksPerConnection Spark 構成を設定します。ターゲットデータベースの接続制限が低い場合、シャッフルパーティションの数を減らすか、またはspark.databricks.sql.streaming.jdbc.tasksPerConnectionを増やす必要があります。

シンクは、接続障害、デッドロック、レート制限など一時的なJDBCエラーを自動的に再試行します。シンクがすべての再試行を使い果たした場合、クエリは失敗します。

サポートされるトリガーと出力モード

トリガー

この表に、構造化ストリーミングのトリガーの種類のサポートを示します。

トリガー

サポートされています

realTime

はい

ProcessingTime

はい

AvailableNow

はい

Once

はい

出力モード

この表は、構造化ストリーミング出力モードのサポートを示しています:

出力モード

サポートされています

update

はい

append

はい。updateと同一です。ターゲットテーブルにプライマリキーがある場合、クエリはアップサートを実行します。そうでない場合、クエリは挿入を実行します。アップサートの挙動を参照してください。

complete

No

制限事項

  • サーバレス コンピュート と LakeFlow Spark宣言型パイプライン はサポートされていません。
  • Lakebase のみが書き込みターゲットとしてサポートされています。外部のPostgreSQL互換データベースはサポートされていません。