メインコンテンツまでスキップ

Hive metastoreラインを複製してUnity Catalogパイプラインを作成する

この記事では、 Databricks REST APIのclone a pipelineリクエストと、それを使用してHive metastoreに公開する既存のパイプラインをUnity Catalogに公開する新しいパイプラインにコピーする方法について説明します。 clone a pipelineリクエストを呼び出すと、次の処理が行われます。

  • 既存のパイプラインのソース コードと構成を新しいパイプラインにコピーし、指定した構成のオーバーライドを適用します。
  • Unity Catalogで管理されるオブジェクトに必要な変更を加えて、マテリアライズドビューとストリーミングテーブルの定義と参照を更新します。
  • パイプラインの更新を開始して、パイプライン内のストリーミング テーブルの既存のデータとメタデータ (チェックポイントなど) を移行します。 これにより、ストリーミング テーブルは元のパイプラインと同じ時点で処理を再開できます。

クローン操作が完了すると、元のパイプラインと新しいパイプラインの両方を独立して実行できるようになります。

この記事には、API 要求を直接呼び出す例と、Databricks ノートブックから Python スクリプトを介して呼び出す例が含まれています。

始める前に

パイプラインを複製する前に、次のものが必要です。

  • Hive metastoreパイプラインのクローンを作成するには、パイプラインで定義されたテーブルとビューがテーブルをターゲット スキーマに公開する必要があります。 ターゲット スキーマをパイプラインに追加する方法については、 Hive metastoreに公開するようにパイプラインを構成する」を参照してください。

  • クローンを作成するパイプライン内のHive metastoreまたはビューへの参照は、カタログ ( hive_metastore )、スキーマ、およびテーブル名で完全修飾する必要があります。 たとえば、 customersデータセットを作成する次のコードでは、テーブル名引数をhive_metastore.sales.customersに更新する必要があります。

    Python
    @dp.table
    def customers():
    return spark.read.table("sales.customers").where(...)
  • クローン操作の進行中は、パイプラインの一部として構成されたノートブックや、 Gitフォルダーまたはワークスペース ファイルに保存されているモジュールを含む、ソースHive metastoreパイプラインのソース コードを編集しないでください。

  • クローン操作を開始するときに、ソースHive metastoreパイプラインが実行されていてはなりません。 更新が実行中の場合は、停止するか、完了するまで待機します。

パイプラインを複製する前に考慮すべきその他の重要な事項は次のとおりです。

  • Hive metastoreパイプライン内のテーブルで、 Pythonのpath引数またはSQLのLOCATION引数を使用してストレージの場所を指定する場合は、 "pipelines.migration.ignoreExplicitPath": "true"構成をクローン リクエストに渡します。 この構成の設定については、以下の手順に含まれています。
  • Hive metastoreパイプラインにcloudFiles.schemaLocationオプションの値を指定するAuto Loaderソースが含まれており、 Unity Catalogクローンの作成後もHive metastoreパイプラインが動作し続ける場合は、 Hive metastoreパイプラインとクローン作成されたUnity Catalogパイプラインの両方でmergeSchemaオプションをtrueに設定する必要があります。 クローンを作成する前にこのオプションをHive metastoreパイプラインに追加すると、オプションが新しいパイプラインにコピーされます。

Databricks REST API を使用してパイプラインを複製する

次の例では、 curlコマンドを使用して、Databricks REST API のclone a pipeline要求を呼び出します。

Bash
curl -X POST \
--header "Authorization: Bearer <personal-access-token>" \
<databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
--data @clone-pipeline.json

以下のように置き換えてください。

clone-pipeline.json:

JSON
{
"catalog": "<target-catalog-name>",
"target": "<target-schema-name>",
"name": "<new-pipeline-name>"
"clone_mode": "MIGRATE_TO_UC",
"configuration": {
"pipelines.migration.ignoreExplicitPath": "true"
}
}

以下のように置き換えてください。

  • <target-catalog-name> 新しいパイプラインを公開する Unity Catalog 内のカタログの名前。これは既存のカタログである必要があります。
  • <target-schema-name> 現在のスキーマ名と異なる場合は、新しいパイプラインを公開するUnity Catalog内のスキーマの名前に置き換えます。 これはオプションであり、指定しない場合は、既存のスキーマ名が使用されます。
  • <new-pipeline-name> 新しいパイプラインのオプションの名前。指定しない場合、新しいパイプラインの名前は、ソース パイプライン名に[UC]が追加された名前になります。

clone_mode クローン操作に使用するモードを指定します。サポートされているオプションはMIGRATE_TO_UCのみです。

新しいパイプラインの構成を指定するには、 configurationフィールドを使用します。ここで設定された値は、元のパイプラインの設定を上書きします。

clone REST APIリクエストからの応答は、新しいUnity Catalogパイプラインのパイプライン ID です。

Databricks ノートブックからパイプラインを複製する

次の例では、Python スクリプトからcreate a pipelineリクエストを呼び出します。このスクリプトを実行するには、Databricks ノートブックを使用できます。

  1. スクリプト用の新しいノートブックを作成します。「ノートブックを作成する」を参照してください。

  2. 次の Python スクリプトをノートブックの最初のセルにコピーします。

  3. 次のように置き換えて、スクリプト内のプレースホルダーの値を更新します。

    • <databricks-instance> Databricksワークスペースインスタンス名を使用します。例: dbc-a1b2345c-d6e7.cloud.databricks.com
    • <pipeline-id> をクローン Hive metastore パイプラインの一意の識別子に置き換えます。 パイプライン ID は、 Lakeflow 宣言型パイプライン UI で確認できます。
    • <target-catalog-name> 新しいパイプラインを公開する Unity Catalog 内のカタログの名前。これは既存のカタログである必要があります。
    • <target-schema-name> 現在のスキーマ名と異なる場合は、新しいパイプラインを公開するUnity Catalog内のスキーマの名前に置き換えます。 これはオプションであり、指定しない場合は、既存のスキーマ名が使用されます。
    • <new-pipeline-name> 新しいパイプラインのオプションの名前。指定しない場合、新しいパイプラインの名前は、ソース パイプライン名に[UC]が追加された名前になります。
  4. スクリプトを実行します。「Databricks ノートブックの実行」を参照してください。

Python
import requests

# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"

# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"

# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"

# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}

def get_token():
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
return getattr(ctx, "apiToken")().get()

def check_source_pipeline_exists():
data = requests.get(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
headers={"Authorization": f"Bearer {get_token()}"},
)

assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"

def request_pipeline_clone():
payload = {
"catalog": TARGET_CATALOG,
"clone_mode": CLONE_MODE,
}
if TARGET_SCHEMA != "":
payload["target"] = TARGET_SCHEMA
if CLONED_PIPELINE_NAME != "":
payload["name"] = CLONED_PIPELINE_NAME
if OVERRIDE_CONFIGS:
payload["configuration"] = OVERRIDE_CONFIGS

data = requests.post(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
headers={"Authorization": f"Bearer {get_token()}"},
json=payload,
)
response = data.json()
return response

check_source_pipeline_exists()
request_pipeline_clone()

制限事項

Lakeflow 宣言型パイプライン clone a pipeline API リクエストの制限事項は次のとおりです。

  • Hive metastoreを使用するように構成されたパイプラインからUnity Catalogパイプラインへのクローン作成のみがサポートされています。

  • クローンを作成できるのは、クローン元のパイプラインと同じ Databricks ワークスペース内のみです。

  • 複製するパイプラインには、次のストリーミング ソースのみを含めることができます。

  • 複製しているHive metastoreパイプラインがAuto Loaderファイル通知モードを使用している場合、 Databricks複製後にHive metastoreパイプラインを実行しないことをお勧めします。 これは、 Hive metastoreパイプラインを実行すると、 Unity Catalogクローンから一部のファイル通知イベントがドロップされるためです。 クローン操作の完了後にソースHive metastoreラインが実行される場合は、 cloudFiles.backfillIntervalオプションを指定したAuto Loaderを使用して、不足しているファイルをバックフィルできます。 Auto Loader ファイル通知モードの詳細については、 「ファイル通知モードでの Auto Loader ストリームの構成」を参照してください。Auto Loaderを使用したファイルのバックフィルの詳細については、 「cloudFiles.backfillInterval を使用して定期的なバックフィルをトリガーする」を参照してください。 および共通Auto Loaderオプション

  • クローン作成の進行中は、両方のパイプラインのパイプライン メンテナンス タスクが自動的に停止します。

  • 以下は、複製されたUnity Catalogパイプライン内のテーブルに対するタイムトラベル クエリに適用されます。

    • テーブル バージョンがもともとHive metastore管理対象オブジェクトに書き込まれていた場合、クローンされたUnity Catalogオブジェクトをクエリするときに、 timestamp_expression句を使用したタイムトラベル クエリは定義されません。
    • ただし、テーブルのバージョンがクローンされたUnity Catalogオブジェクトに書き込まれている場合、 timestamp_expression句を使用したタイムトラベル クエリは正しく機能します。
    • version句を使用した TIMETRAV クエリは、バージョンが元々 Hive metastore管理オブジェクトに書き込まれていた場合でも、複製されたUnity Catalogオブジェクトをクエリするときに正常に機能します。
  • Lakeflowで 宣言型パイプラインを使用する場合のその他の制限事項については、「Unity Catalog Unity Catalogパイプラインの制限事項 」を参照してください。

  • Unity Catalog制限については、 Unity Catalog制限」を参照してください。