メインコンテンツまでスキップ

Lakebase に接続

備考

プレビュー

この機能は パブリック プレビュー段階です。

組み込みのバッチ処理、自動再試行、およびワークスペースで管理される認証により、構造化ストリーミングを使用してLakebaseに書き込むことができます。

Lakebase シンクはいつ使用しますか?

低遅延ストリーミング書き込みのために、LakebaseシンクをLakebaseで使用してください。このシンクでは、バッチ処理、接続管理、エラー処理を行うためのカスタムforeachBatch関数を実装する必要はありません。


ユースケースの例:

  • 運用ダッシュボードまたは顧客向け機能のために、アプリケーションデータベースをリアルタイムで更新します。
  • 集計またはフィルター処理されたストリーミング結果など、継続的に変化するデータをトランザクションデータベースに同期します。
  • 構造化ストリーミングのクエリの出力を、1秒未満のレイテンシでリアルタイムモードを使用してLakebaseテーブルに書き込みます。

Lakebase からレイクハウス内のDelta Lakeテーブルにデータを逆方向に同期するには、「Lakebase チェンジデータフィード」を参照してください。

要件

  • Databricks Runtime 18以降
  • 専用または標準アクセスモードのクラシックコンピュート
  • Lakebase データベース

データベースに接続します

Lakebase sink は次の接続メソッドをサポートしています。

Unity Catalog に登録されている Lakebase テーブル

Unity Catalog に登録されたLakebaseテーブルの場合、コネクタは自動的に資格情報を管理し、クエリを実行しているユーザーまたはDatabricksサービスプリンシパルのIDを使用します。テーブルが存在しない場合、コネクタがテーブルを作成します。

Lakebase データベースを Unity Catalog に登録するには、Unity Catalog で Lakebase データベースを登録するを参照してください。

Lakebaseテーブルに書き込むには、完全修飾テーブル名catalog.schema.tableを指定して.toTable()メソッドを使用します。次の例は、必須オプションに加えて、オプションのupsertkeyを示しています。

Python
(df.writeStream
.format("postgresql")
.outputMode("update")
.option("upsertkey", "<primary-key-column>") # Optional. Inferred from the table's primary key if omitted.
.option("checkpointLocation", "/Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>")
.toTable("<catalog>.<schema>.<table>")
)

次のプレースホルダーを置き換えます。

  • <catalog>.<schema>.<table>:ターゲットテーブルの完全修飾名。The catalog is the Unity Catalog catalog you created when you registered the Lakebase database, see Unity CatalogにLakebaseデータベースを登録する.テーブルが存在しない場合は、コネクタが作成します。
  • <primary-key-column>:オプション。upsertキーを構成する列のカンマ区切りリスト。例えば、idまたはuser_id,event_typeupsertkeyを省略した場合、シンクはターゲットテーブルの主キーからキーを推測します。アップサート動作を参照してください。
  • /Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>: クエリがチェックポイントを格納するUnity Catalogボリュームパス。クラウドオブジェクトストレージURIを使用することもできます。ロケーションは、ローカルディスクではなく書き込み可能なストレージである必要があり、各ストリーミングクエリに対して一意である必要があります。これはターゲットテーブルとは無関係です。構造化ストリーミングのチェックポイントを参照してください。

batchsizebatchintervalなどのオプションの構成については、構成オプションを参照してください。

Unity Catalogに登録されていないLakebaseテーブル

Unity Catalog に登録されていない Lakebase テーブルの場合、コネクタは自動的に資格情報を管理し、クエリを実行しているユーザーまたは Databricks サービスプリンシパルの ID を使用します。

Lakebaseテーブルに書き込むには、endpointdbtable オプションを使用してください。次の例には、オプションのdatabaseupsertkey オプションも含まれています:

Python
(df.writeStream
.format("postgresql")
.outputMode("update")
.option("endpoint", "<project-id>.<branch-id>.<endpoint-id>")
.option("database", "<database>") # Optional. Defaults to databricks_postgres.
.option("dbtable", "<schema>.<table>")
.option("upsertkey", "<primary-key-column>") # Optional. Inferred from the table's primary key if omitted.
.option("checkpointLocation", "/Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>")
.start()
)

次のプレースホルダーを置き換えます。

  • <project-id>.<branch-id>.<endpoint-id>Lakebaseエンドポイントです。 「コンピュート」タブの「IDの取得」メニューにある「リソース名」で、 の形式の3つの値をすべて見つけてください。projects/<project-id>/branches/<branch-id>/endpoints/<endpoint-id>コンピュート識別子を参照してください。
  • <database>:オプション。対象のPostgresデータベースの名前です。デフォルトはdatabricks_postgresになります。データベースを管理するを参照してください。
  • <schema>.<table>``schema.table 形式のターゲットテーブルです。スキーマを省略すると、シンクは public スキーマを使用します。文字またはアンダースコアで始まり、文字、数字、およびアンダースコアのみを含むシンプルな識別子を使用してください。引用符付き識別子やハイフンなどの特殊文字はサポートされていません。
  • <primary-key-column>:オプション。upsertキーを構成する列のカンマ区切りリスト。例えば、idまたはuser_id,event_typeupsertkeyを省略した場合、シンクはターゲットテーブルの主キーからキーを推測します。アップサート動作を参照してください。
  • /Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>: クエリがチェックポイントを格納するUnity Catalogボリュームパス。クラウドオブジェクトストレージURIを使用することもできます。ロケーションは、ローカルディスクではなく書き込み可能なストレージである必要があり、各ストリーミングクエリに対して一意である必要があります。これはターゲットテーブルとは無関係です。構造化ストリーミングのチェックポイントを参照してください。

batchsizebatchintervalなどのオプションの構成については、構成オプションを参照してください。

構成オプション

シンクは認識できないオプション、JDBC_STREAMING_SINK_INVALID_OPTIONSに対してエラーを返します。

次のオプションは、すべての接続方法に適用されます。

キー

デフォルト

説明

batchinterval

100 milliseconds

オプション。フラッシュする前にバッファ内で行を保持する最大時間。たとえば、"50 milliseconds"などです。

batchsize

1000

オプション。各データベーストランザクションの最大行数。

checkpointLocation

なし

必須。Unity Catalog ボリューム(/Volumes/<catalog>/<schema>/<volume>/<checkpoint-name>)などのチェックポイントディレクトリへのパスです。各クエリで一意である必要があります。構造化ストリーミングチェックポイントを参照してください。

upsertkey

なし

オプション。Upsert キーを形成する列名のコンマ区切りリストたとえば、"id""user_id,event_type" などです。upsertkey を指定した場合、列はテーブルの主キーと一致する必要があります。そうしないと、クエリは失敗します。省略すると、シンクはプライマリキーを自動的に使用します。詳細情報については、「Upsert の動作」を参照してください。

Lakebase の Unity Catalog に登録されていないテーブル

次のオプションは、Unity Catalog に登録されていない Lakebase テーブルに接続する場合に適用されます:

キー

デフォルト

説明

database

databricks_postgres

オプション。ターゲットの PostgreSQL データベース名。

dbtable

なし

必須。schema.table形式のターゲットテーブル名です。スキーマを指定しない場合、デフォルトのスキーマ値は public です。文字またはアンダースコアで始まるシンプルな識別子を使用し、文字、数字、およびアンダースコアのみを含めます。テーブル名やスキーマ名を引用符で囲まないでください。引用符で囲まれた識別子や、ハイフンなどの特殊文字を含む名前はサポートされていません。

endpoint

なし

必須。Lakebaseエンドポイントは、project_id.branch_idまたはproject_id.branch_id.endpoint_id形式です。endpoint_idはオプションです。省略し、ブランチに単一の読み書きエンドポイントがある場合、シンクはデフォルトでそのエンドポイントを選択します。

アップサートの挙動

Upsert キーが、upsertkey で指定されるか、シンクによってテーブルの主キーから推測される場合に、シンクは PostgreSQL の INSERT INTO ... ON CONFLICT (<upsert_key>) DO UPDATE SET ... 構文を使用してテーブルに upsert します。

アップサートキーが存在しない場合、シンクは挿入を行います。クエリーの出力モードは、アップサートまたは挿入の動作に影響を与えません。

upsertkey 列は次の必要があります:

  • DataFrame の列の空でないサブセットである必要があります。
  • ターゲットテーブルのPRIMARY KEYと正確に一致させます。指定した列が主キーと一致しない場合、クエリは失敗します。
  • 数値型や文字列型などの比較可能な型である必要があります。並列書き込み中にデータベースのデッドロックを防ぐために、シンクは各バッチ内でアップサートキーで行をソートします。Upsertキーは複合型または構造体型をサポートしていません。

列名は、PostgreSQLのデフォルトである二重引用符"で自動的に引用符で囲まれ、予約済みキーワードと大文字/小文字が混在する名前が処理されます。

テーブル名とスキーマ名は、英文字またはアンダースコアで始まり、英数字またはアンダースコアのみを含むシンプルな識別子を使用する必要があります。シンクは、テーブル名またはスキーマ名に、引用符で囲まれた識別子やハイフンなどの特殊文字をサポートしていません。

パフォーマンスチューニング

バッチ処理とバックプレッシャー

フラッシュはいずれかの条件が満たされた場合にトリガーされます。

  • バッファーが batchsize 行に達すると、デフォルトで 1000 に設定されます。
  • バッファー期間がbatchintervalを超えています。デフォルトは100 millisecondsです。

データベースが流入するデータ速度に追いつけなくなると、シンクはアップストリームでバックプレッシャーをソースに伝播します。

レイテンシーとスループットのガイダンス:

  • リアルタイムモードの低レイテンシのワークロードでは、フラッシュ前の最大時間を短くするために batchinterval を減らしてください。構造化ストリーミングのリアルタイムモードを参照してください。
  • ハイスループットのワークロードでは、batchsizeを増加させて、各トランザクションのオーバーヘッドを削減します。

接続動作

シンクはエグゼキューターで接続プールを使用します。デフォルトでは、各タスクは1つのデータベース接続を使用します。

Databricks は、接続ごとに 1 タスクのデフォルト値を使用することをお勧めします。各接続のタスク数を増やすと、接続の競合を発生させ、高スループット接続の遅延が増加する可能性があります。

タスクと接続の比率を構成するには、spark.databricks.sql.streaming.jdbc.tasksPerConnection Spark 構成を設定します。ターゲットデータベースの接続制限が低い場合、シャッフルパーティションの数を減らすか、またはspark.databricks.sql.streaming.jdbc.tasksPerConnectionを増やす必要があります。

シンクは、接続障害、デッドロック、レート制限など一時的なJDBCエラーを自動的に再試行します。シンクがすべての再試行を使い果たした場合、クエリは失敗します。

サポートされるトリガーと出力モード

トリガー

この表に、構造化ストリーミングのトリガーの種類のサポートを示します。

トリガー

サポートされています

realTime

はい

ProcessingTime

はい

AvailableNow

はい

Once

はい

出力モード

この表は、構造化ストリーミング出力モードのサポートを示しています:

出力モード

サポートされています

update

はい

append

はい。updateと同一です。ターゲットテーブルにプライマリキーがある場合、クエリはアップサートを実行します。そうでない場合、クエリは挿入を実行します。アップサートの挙動を参照してください。

complete

No

制限事項

  • サーバレス コンピュート と LakeFlow Spark宣言型パイプライン はサポートされていません。
  • Lakebase のみが書き込みターゲットとしてサポートされています。外部のPostgreSQL互換データベースはサポートされていません。