Lakebase に接続
プレビュー
この機能は パブリック プレビュー段階です。
組み込みのバッチ処理、自動再試行、およびワークスペースで管理される認証により、構造化ストリーミングを使用してLakebaseに書き込むことができます。
Lakebase シンクはいつ使用しますか?
低遅延ストリーミング書き込みのために、LakebaseシンクをLakebaseで使用してください。このシンクでは、バッチ処理、接続管理、エラー処理を行うためのカスタムforeachBatch関数を実装する必要はありません。
ユースケースの例:
- 運用ダッシュボードまたは顧客向け機能のために、アプリケーションデータベースをリアルタイムで更新します。
- 集計またはフィルター処理されたストリーミング結果など、継続的に変化するデータをトランザクションデータベースに同期します。
- 構造化ストリーミングのクエリの出力を、1秒未満のレイテンシでリアルタイムモードを使用してLakebaseテーブルに書き込みます。
要件
- Databricks Runtime 18.3以降
- 専用または標準アクセスモードのクラシックコンピュート
- Lakebase データベース
データベースに接続します
Lakebase sink は次の接続メソッドをサポートしています。
Unity Catalogに登録されていないLakebaseテーブル
Unity Catalog に登録されていない Lakebase テーブルの場合、コネクタは自動的に資格情報を管理し、クエリを実行しているユーザーまたは Databricks サービスプリンシパルの ID を使用します。
Lakebaseテーブルに書き込むには、endpoint、database、dbtable、およびupsertkeyのオプションを使用します:
- Python
- Scala
(df.writeStream
.format("postgresql")
.outputMode("update")
.option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
.option("database", "<database>")
.option("dbtable", "<schema>.<table>")
.option("upsertkey", "<primary-key-column>")
.option("checkpointLocation", "/checkpoints/<query-name>")
.start()
)
df.writeStream
.format("postgresql")
.outputMode("update")
.option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
.option("database", "<database>")
.option("dbtable", "<schema>.<table>")
.option("upsertkey", "<primary-key-column>")
.option("checkpointLocation", "/checkpoints/<query-name>")
.start()
構成オプション
シンクは認識できないオプション、JDBC_STREAMING_SINK_INVALID_OPTIONSに対してエラーを返します。
次のオプションは、すべての接続方法に適用されます。
キー | デフォルト | 説明 |
|---|---|---|
|
| フラッシュするまでのバッファー内の行保持の最大時間です。たとえば、 |
|
| 各データベーストランザクションの最大行数。 |
| なし | 必須。チェックポイントディレクトリへのパスです。 |
| なし | アップサートキーを構成する列名のコンマ区切りリストたとえば、 |
Lakebase の Unity Catalog に登録されていないテーブル
次のオプションは、Unity Catalog に登録されていない Lakebase テーブルに接続する場合に適用されます:
キー | デフォルト | 説明 |
|---|---|---|
| なし | 対象のPostgreSQLデータベース名 |
| なし | ターゲットテーブル名は |
| なし | Lakebase エンドポイントを |
アップサートの挙動
Upsert キーが、upsertkey で指定されるか、シンクによってテーブルの主キーから推測される場合に、シンクは PostgreSQL の INSERT INTO ... ON CONFLICT (<upsert_key>) DO UPDATE SET ... 構文を使用してテーブルに upsert します。
アップサートキーが存在しない場合、シンクは挿入を行います。クエリーの出力モードは、アップサートまたは挿入の動作に影響を与えません。
upsertkey 列は次の必要があります:
- DataFrame の列の空でないサブセットである必要があります。
- ターゲットテーブルの列を
PRIMARY KEY制約を使用して参照します。 - 数値型や文字列型などの比較可能な型である必要があります。並列書き込み中にデータベースのデッドロックを防ぐために、シンクは各バッチ内でアップサートキーで行をソートします。Upsertキーは複合型または構造体型をサポートしていません。
列名は、PostgreSQL のデフォルトである二重引用符「"」で自動的に囲まれ、予約済みキーワード、大文字と小文字が混在した名前、特殊文字が処理されます。
シンクはテーブル名をクォートせず、そのままデータベースに渡します。特殊文字("my-schema"."my-table"など)を含むテーブル名は、引用符で囲む必要があります。
パフォーマンスチューニング
バッチ処理とバックプレッシャー
フラッシュはいずれかの条件が満たされた場合にトリガーされます。
- バッファーが
batchsize行に達すると、デフォルトで1000に設定されます。 - バッファー期間が
batchintervalを超えています。デフォルトは100 millisecondsです。
データベースが流入するデータ速度に追いつけなくなると、シンクはアップストリームでバックプレッシャーをソースに伝播します。
レイテンシーとスループットのガイダンス:
- リアルタイムモードの低レイテンシのワークロードでは、フラッシュ前の最大時間を短くするために
batchintervalを減らしてください。構造化ストリーミングのリアルタイムモードを参照してください。 - ハイスループットのワークロードでは、
batchsizeを増加させて、各トランザクションのオーバーヘッドを削減します。
接続動作
シンクはエグゼキューターで接続プールを使用します。デフォルトでは、各タスクは1つのデータベース接続を使用します。
Databricks は、接続ごとに 1 タスクのデフォルト値を使用することをお勧めします。各接続のタスク数を増やすと、接続の競合を発生させ、高スループット接続の遅延が増加する可能性があります。
タスクと接続の比率を構成するには、spark.databricks.sql.streaming.jdbc.tasksPerConnection Spark 構成を設定します。ターゲットデータベースの接続制限が低い場合、シャッフルパーティションの数を減らすか、またはspark.databricks.sql.streaming.jdbc.tasksPerConnectionを増やす必要があります。
シンクは、接続障害、デッドロック、レート制限など一時的なJDBCエラーを自動的に再試行します。シンクがすべての再試行を使い果たした場合、クエリは失敗します。
サポートされるトリガーと出力モード
トリガー
この表に、構造化ストリーミングのトリガーの種類のサポートを示します。
トリガー | サポートされています |
|---|---|
| はい |
| はい |
| はい |
| はい |
出力モード
この表は、構造化ストリーミング出力モードのサポートを示しています:
出力モード | サポートされています |
|---|---|
| はい |
| はい。 |
| No |
制限事項
- サーバレス コンピュート と LakeFlow Spark宣言型パイプライン はサポートされていません。
- Lakebase のみが書き込みターゲットとしてサポートされています。外部のPostgreSQL互換データベースはサポートされていません。