Lakebase に接続
プレビュー
この機能は パブリック プレビュー段階です。
組み込みのバッチ処理、自動再試行、およびワークスペースで管理される認証により、構造化ストリーミングを使用してLakebaseに書き込むことができます。
Lakebase シンクはいつ使用しますか?
低遅延ストリーミング書き込みのために、LakebaseシンクをLakebaseで使用してください。このシンクでは、バッチ処理、接続管理、エラー処理を行うためのカスタムforeachBatch関数を実装する必要はありません。
ユースケースの例:
- 運用ダッシュボードまたは顧客向け機能のために、アプリケーションデータベースをリアルタイムで更新します。
- 集計またはフィルター処理されたストリーミング結果など、継続的に変化するデータをトランザクションデータベースに同期します。
- 構造化ストリーミングのクエリの出力を、1秒未満のレイテンシでリアルタイムモードを使用してLakebaseテーブルに書き込みます。
要件
- Databricks Runtime 18以降
- 専用または標準アクセスモードのクラシックコンピュート
- Lakebase データベース
データベースに接続します
Lakebase sink は次の接続メソッドをサポートしています。
Unity Catalogに登録されていないLakebaseテーブル
Unity Catalog に登録されていない Lakebase テーブルの場合、コネクタは自動的に資格情報を管理し、クエリを実行しているユーザーまたは Databricks サービスプリンシパルの ID を使用します。
Lakebaseテーブルに書き込むには、endpoint と dbtable オプションを使用してください。次の例には、オプションのdatabase と upsertkey オプションも含まれています:
- Python
- Scala
(df.writeStream
.format("postgresql")
.outputMode("update")
.option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
.option("database", "<database>") # Optional. Defaults to databricks_postgres.
.option("dbtable", "<schema>.<table>")
.option("upsertkey", "<primary-key-column>") # Optional. Inferred from the table's primary key if omitted.
.option("checkpointLocation", "/Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>")
.start()
)
df.writeStream
.format("postgresql")
.outputMode("update")
.option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
.option("database", "<database>") // Optional. Defaults to databricks_postgres.
.option("dbtable", "<schema>.<table>")
.option("upsertkey", "<primary-key-column>") // Optional. Inferred from the table's primary key if omitted.
.option("checkpointLocation", "/Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>")
.start()
次のプレースホルダーを置き換えます。
<project-id>.<branch-id>.<endpoint-id>Lakebaseエンドポイントです。 「コンピュート」タブの「IDの取得」メニューにある「リソース名」で、 の形式の3つの値をすべて見つけてください。projects/<project-id>/branches/<branch-id>/endpoints/<endpoint-id>コンピュート識別子を参照してください。<database>:オプション。対象のPostgresデータベースの名前です。デフォルトはdatabricks_postgresになります。データベースを管理するを参照してください。<schema>.<table>``schema.table形式のターゲットテーブルです。スキーマを省略すると、シンクはpublicスキーマを使用します。文字またはアンダースコアで始まり、文字、数字、およびアンダースコアのみを含むシンプルな識別子を使用してください。引用符付き識別子やハイフンなどの特殊文字はサポートされていません。<primary-key-column>:オプション。upsertキーを構成する列のカンマ区切りリスト。例えば、idまたはuser_id,event_type。upsertkeyを省略した場合、シンクはターゲットテーブルの主キーからキーを推測します。アップサート動作を参照してください。/Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>: クエリがチェックポイントを格納するUnity Catalogボリュームパス。クラウドオブジェクトストレージURIを使用することもできます。ロケーションは、ローカルディスクではなく書き込み可能なストレージである必要があり、各ストリーミングクエリに対して一意である必要があります。これはターゲットテーブルとは無関係です。構造化ストリーミングのチェックポイントを参照してください。
batchsizeやbatchintervalなどのオプションの構成については、構成オプションを参照してください。
構成オプション
シンクは認識できないオプション、JDBC_STREAMING_SINK_INVALID_OPTIONSに対してエラーを返します。
次のオプションは、すべての接続方法に適用されます。
キー | デフォルト | 説明 |
|---|---|---|
|
| オプション。フラッシュする前にバッファ内で行を保持する最大時間。たとえば、 |
|
| オプション。各データベーストランザクションの最大行数。 |
| なし | 必須。Unity Catalog ボリューム( |
| なし | オプション。Upsert キーを形成する列名のコンマ区切りリストたとえば、 |
Lakebase の Unity Catalog に登録されていないテーブル
次のオプションは、Unity Catalog に登録されていない Lakebase テーブルに接続する場合に適用されます:
キー | デフォルト | 説明 |
|---|---|---|
|
| オプション。ターゲットの PostgreSQL データベース名。 |
| なし | 必須。 |
| なし | 必須。Lakebaseエンドポイントは、 |
アップサートの挙動
Upsert キーが、upsertkey で指定されるか、シンクによってテーブルの主キーから推測される場合に、シンクは PostgreSQL の INSERT INTO ... ON CONFLICT (<upsert_key>) DO UPDATE SET ... 構文を使用してテーブルに upsert します。
アップサートキーが存在しない場合、シンクは挿入を行います。クエリーの出力モードは、アップサートまたは挿入の動作に影響を与えません。
upsertkey 列は次の必要があります:
- DataFrame の列の空でないサブセットである必要があります。
- ターゲットテーブルの
PRIMARY KEYと正確に一致させます。指定した列が主キーと一致しない場合、クエリは失敗します。 - 数値型や文字列型などの比較可能な型である必要があります。並列書き込み中にデータベースのデッドロックを防ぐために、シンクは各バッチ内でアップサートキーで行をソートします。Upsertキーは複合型または構造体型をサポートしていません。
列名は、PostgreSQLのデフォルトである二重引用符"で自動的に引用符で囲まれ、予約済みキーワードと大文字/小文字が混在する名前が処理されます。
テーブル名とスキーマ名は、英文字またはアンダースコアで始まり、英数字またはアンダースコアのみを含むシンプルな識別子を使用する必要があります。シンクは、テーブル名またはスキーマ名に、引用符で囲まれた識別子やハイフンなどの特殊文字をサポートしていません。
パフォーマンスチューニング
バッチ処理とバックプレッシャー
フラッシュはいずれかの条件が満たされた場合にトリガーされます。
- バッファーが
batchsize行に達すると、デフォルトで1000に設定されます。 - バッファー期間が
batchintervalを超えています。デフォルトは100 millisecondsです。
データベースが流入するデータ速度に追いつけなくなると、シンクはアップストリームでバックプレッシャーをソースに伝播します。
レイテンシーとスループットのガイダンス:
- リアルタイムモードの低レイテンシのワークロードでは、フラッシュ前の最大時間を短くするために
batchintervalを減らしてください。構造化ストリーミングのリアルタイムモードを参照してください。 - ハイスループットのワークロードでは、
batchsizeを増加させて、各トランザクションのオーバーヘッドを削減します。
接続動作
シンクはエグゼキューターで接続プールを使用します。デフォルトでは、各タスクは1つのデータベース接続を使用します。
Databricks は、接続ごとに 1 タスクのデフォルト値を使用することをお勧めします。各接続のタスク数を増やすと、接続の競合を発生させ、高スループット接続の遅延が増加する可能性があります。
タスクと接続の比率を構成するには、spark.databricks.sql.streaming.jdbc.tasksPerConnection Spark 構成を設定します。ターゲットデータベースの接続制限が低い場合、シャッフルパーティションの数を減らすか、またはspark.databricks.sql.streaming.jdbc.tasksPerConnectionを増やす必要があります。
シンクは、接続障害、デッドロック、レート制限など一時的なJDBCエラーを自動的に再試行します。シンクがすべての再試行を使い果たした場合、クエリは失敗します。
サポートされるトリガーと出力モード
トリガー
この表に、構造化ストリーミングのトリガーの種類のサポートを示します。
トリガー | サポートされています |
|---|---|
| はい |
| はい |
| はい |
| はい |
出力モード
この表は、構造化ストリーミング出力モードのサポートを示しています:
出力モード | サポートされています |
|---|---|
| はい |
| はい。 |
| No |
制限事項
- サーバレス コンピュート と LakeFlow Spark宣言型パイプライン はサポートされていません。
- Lakebase のみが書き込みターゲットとしてサポートされています。外部のPostgreSQL互換データベースはサポートされていません。