クエリベースのデータ取り込みパイプラインを作成する
プレビュー
この機能は パブリック プレビュー段階です。
このページではLakeFlow Connectでクエリベースの取り込みパイプラインを作成する方法を示します。
要件
クエリベースのデータ取り込みパイプラインを作成する前に、まず以下の要件を満たす必要があります。
- Unity CatalogがDatabricksワークスペースに対して有効になっています。
- サーバーレス コンピュート環境では、ソース データベースへのネットワーク接続が可能です。 レイクハウスフェデレーションの ネットワーキング とネットワーキングの推奨事項を参照してください。
- 外部接続によるデータ取り込み の場合:ソースデータベースへの既存の接続があるか、メタストアに対する権限が
CREATE CONNECTIONある必要があります。管理対象のデータ取り込みソースへの接続を参照してください。 - フォーリンカタログの取り込み :レイクハウスフェデレーションに登録されている既存のフォーリンカタログ、またはフォーリンカタログを作成する権限を持っています。
- 宛先カタログとスキーマに対して、
CREATEとUSE SCHEMA権限が付与されています。
オプション1:外部接続からの取り込み
ソースデータベースの認証情報を保存している接続がある場合は、この方法を使用してください。サポートされているデータソースには、Oracle、Teradata、SQL Server、MySQL、MariaDB、PostgreSQLが含まれます。
- Databricks UI
- Declarative Automation Bundles
DatabricksのUIは、クエリベースのパイプライン作成をサポートしています。クラシック コンピュート ( Beta ) でデプロイするには、Declarative Automation Bundle を使用します。
-
Databricksワークスペースのサイドバーで、 「データ取り込み」を クリックします。
-
「データの追加」 ページで、 「Databricksコネクタ」 の下にあるソース(たとえば、 Oracle または SQL Server )をクリックします。取り込みウィザードが開きます。
-
取り込みパイプラインの ページで、パイプラインの一意の名前を入力します。
-
「宛先カタログ」 には、取り込んだデータを保存するUnity Catalogを選択してください。
-
特定のデータベースにアクセスするために必要な認証情報が保存されているUnity Catalog接続を選択してください。
既存の接続がない場合は、 「接続を作成」 をクリックして接続の詳細を入力してください。メタストアに対して
CREATE CONNECTION権限が必要です。 -
パイプラインの作成および続行 をクリックします。
-
ソース ページで、取り込むスキーマとテーブルを選択します。
-
各テーブルについて、 カーソル列 を指定します。これは、値が単調増加する単一の列である必要があります(たとえば、
updated_atまたはrow_id)。単調増加するカーソル列を選択しない場合、コネクタは実行ごとにフルロードを実行します。 -
必要に応じて、デフォルトの履歴追跡設定を変更できます。詳細については、 「履歴追跡を有効にする( SCDタイプ2)」を参照してください。
-
次へ をクリックします。
-
宛先 ページで、書き込む Unity Catalog カタログとスキーマを選択します。
既存のスキーマを使用しない場合は、 「スキーマの作成」 をクリックしてください。親カタログに対して
USE CATALOGとCREATE SCHEMA権限が必要です。 -
保存して続行 をクリックします。
-
(オプション) 設定 ページで、 [スケジュールの作成]を クリックし、更新頻度を設定します。
-
(オプション) パイプラインの成功または失敗に関する電子メール通知を設定します。
-
パイプラインの保存と実行 をクリックします。
宣言型自動化バンドルを使用して、クエリベースのデータ取り込みパイプラインをデプロイします。バンドルにはパイプラインとジョブのYAML定義が含まれており、Databricks CLIで管理され、複数のターゲットワークスペースにデプロイできます。詳細については、 「宣言的オートメーション バンドルとは何ですか?」を参照してください。 。
-
新しいバンドルを作成します:
Bashdatabricks bundle init -
パイプライン定義ファイルをバンドルに追加します(例:
resources/query_based_pipeline.yml):YAMLvariables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
resources:
pipelines:
pipeline_query_based:
name: query-based-ingestion-pipeline
ingestion_definition:
connection_name: <your-uc-connection-name>
objects:
- table:
source_catalog: <source-catalog>
source_schema: <source-schema>
source_table: <source-table>
table_configuration:
query_based_connector_config:
cursor_columns:
- updated_at
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
target: ${var.dest_schema}
catalog: ${var.dest_catalog} -
取り込みスケジュールを制御するジョブ定義ファイルを追加します(例:
resources/query_based_job.yml)。YAMLresources:
jobs:
query_based_job:
name: query_based_job
trigger:
periodic:
interval: 1
unit: HOURS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_query_based.id} -
バンドルをデプロイします。
Bashdatabricks bundle deploy
オプション 2: フォーリンカタログの取り込み
フォーリンカタログに登録されたレイクハウスフェデレーションから取り込みたい場合は、このアプローチを使用します。 フォーリンカタログの取り込みは、すべてのレイクハウスフェデレーション データソースをサポートし、削除追跡をサポートします。
- Databricks UI
- Declarative Automation Bundles
-
Databricksワークスペースのサイドバーで、 「データ取り込み」を クリックします。
-
「データの追加」 ページで、 「Databricksコネクタ」 の下にあるソースをクリックします。取り込みウィザードが開きます。
-
取り込みパイプラインの ページで、パイプラインの一意の名前を入力します。
-
「宛先カタログ」 には、取り込んだデータを保存するUnity Catalogを選択してください。
-
接続タイプ で フォーリンカタログを 選択し、レイクハウスフェデレーションに登録されているフォーリンカタログを選択します。
-
パイプラインの作成および続行 をクリックします。
-
ソース ページで、取り込むスキーマとテーブルを選択します。
-
各テーブルについて、 カーソル列 を指定します。これは、値が単調増加する単一の列である必要があります(たとえば、
updated_atまたはrow_id)。 -
必要に応じて、デフォルトの履歴追跡設定を変更できます。詳細については、 「履歴追跡を有効にする( SCDタイプ2)」を参照してください。
-
次へ をクリックします。
-
宛先 ページで、書き込む Unity Catalog カタログとスキーマを選択します。
既存のスキーマを使用しない場合は、 「スキーマの作成」 をクリックしてください。親カタログに対して
USE CATALOGとCREATE SCHEMA権限が必要です。 -
保存して続行 をクリックします。
-
(オプション) 設定 ページで、 [スケジュールの作成]を クリックし、更新頻度を設定します。
-
(オプション) パイプラインの成功または失敗に関する電子メール通知を設定します。
-
パイプラインの保存と実行 をクリックします。
-
新しいバンドルを作成します:
Bashdatabricks bundle init -
パイプライン定義ファイルをバンドルに追加します(例:
resources/foreign_catalog_pipeline.yml):YAMLvariables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
resources:
pipelines:
pipeline_foreign_catalog:
name: foreign-catalog-ingestion-pipeline
ingestion_definition:
ingest_from_uc_foreign_catalog: true
objects:
- table:
source_catalog: <foreign-catalog-name>
source_schema: <source-schema>
source_table: <source-table>
cursor_columns:
- updated_at
primary_keys:
- id
deletion_condition: 'deleted_at IS NOT NULL'
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
target: ${var.dest_schema}
catalog: ${var.dest_catalog} -
ジョブ定義ファイルを追加します(例:
resources/foreign_catalog_job.yml):YAMLresources:
jobs:
foreign_catalog_job:
name: foreign_catalog_job
trigger:
periodic:
interval: 1
unit: HOURS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_foreign_catalog.id} -
バンドルをデプロイします。
Bashdatabricks bundle deploy
増分追跡を設定する
クエリベースのコネクタは、カーソル列を使用して、前回のパイプライン実行以降に新規または更新された行を特定します。カーソル列の選択は、効果的な増分データ取り込みにとって非常に重要です。
カーソル列を選択する際には、以下の点を考慮してください。
- 可能であれば、タイムスタンプ列を使用してください。
updated_atやlast_modifiedのような列は、行が最後に変更された日時を直接反映するため、理想的です。 - 整数IDは、追記専用のソースで使用できます。行が更新されない場合は、自動インクリメントされるID列(
idやrow_idなど)をカーソルとして使用できます。行をIDを変更せずに更新できる場合は、整数IDをカーソルとして使用しないでください。 - その列は単調増加でなければならない。価値は決して低下してはならない。列を過去の値に設定できる場合(例えば、バックフィルによって)、前回の最高値より前に書き込まれた行は再取り込みされません。
- カーソル列は1つしか指定できません。複合カーソルとして複数の列を指定することはできません。
コネクタはカーソルの最高水位マークを保存した後、次の実行時にその最高水位マークを下限フィルタ( cursor_column > last_value )として使用します。カーソル値がNULLの行は取り込まれません。
履歴追跡(SCD)を設定する
宛先テーブルの行変更履歴全体を追跡するには、SCDタイプ2を構成します。 「履歴追跡を有効にする(SCDタイプ2)」を参照してください。
一般的なパターン
高度なパイプライン構成については、 「管理された取り込みパイプラインの一般的なパターン」を参照してください。