メインコンテンツまでスキップ

宣言型パイプライン間でのテーブル Lakeflow 移動

この記事では、ストリーミングテーブルとマテリアライズドビューをパイプライン間で移動する方法について説明します。 移動後、元のパイプラインではなく、フローを移動したパイプラインがテーブルを更新します。これは、次のような多くのシナリオで役立ちます。

  • 大きなパイプラインを小さなパイプラインに分割します。
  • 複数のパイプラインを 1 つの大きなパイプラインに結合します。
  • パイプライン内の一部のテーブルの更新頻度を変更します。
  • 従来の公開モードを使用するパイプラインからデフォルトの公開モードにテーブルを移動します。従来の公開モードの詳細については、 「パイプラインの従来の公開モード」を参照してください。パイプライン全体の公開モードを一度に移行する方法については、 「パイプラインでデフォルトの公開モードを有効にする」を参照してください。
  • 異なるワークスペース内のパイプライン間でテーブルを移動します。

要件

パイプライン間でテーブルを移動するための要件は次のとおりです。

  • ALTER ...コマンドを実行するときは Databricks Runtime 16.3 以上を使用し、ワークスペース間のテーブル移動には Databricks Runtime 17.2 を使用する必要があります。

  • ソース パイプラインと宛先パイプラインの両方が次の条件を満たす必要があります。

    • 操作を実行するDatabricksユーザー アカウントまたはサービスプリンシパルが所有
    • メタストアを共有するワークスペース内。メタストアを確認するには、 current_metastore関数を参照してください。
  • 宛先パイプラインでは、デフォルトの公開モードを使用する必要があります。これにより、テーブルを複数のカタログとスキーマに公開できるようになります。

    または、両方のパイプラインで従来の公開モードを使用し、設定で両方のカタログとターゲット値を同じにする必要があります。従来の公開モードの詳細については、 「LIVE スキーマ (レガシー)」を参照してください。

注記

この機能では、デフォルトの公開モードを使用するパイプラインを、従来の公開モードを使用するパイプラインに移動することはサポートされていません。

パイプライン間でテーブルを移動する

次の手順では、ストリーミング テーブルまたはマテリアライズドビューをあるパイプラインから別のパイプラインに移動する方法を説明します。

  1. ソース パイプラインが実行中の場合は停止します。完全に止まるまで待ちます。

  2. ソース パイプラインのコードからテーブルの定義を削除し、将来の参照用にどこかに保存します。

    パイプラインを正しく実行するために必要なサポートクエリまたはコードを含めます。

  3. ノートブックまたは SQL エディターから次の SQL コマンドを実行して、ソース パイプラインから宛先パイプラインにテーブルを再割り当てします。

    SQL
    ALTER [MATERIALIZED VIEW | STREAMING TABLE | TABLE] <table-name>
    SET TBLPROPERTIES("pipelines.pipelineId"="<destination-pipeline-id>");

    SQL コマンドはソース パイプラインのワークスペースから実行する必要があることに注意してください。

    コマンドは、 Unity Catalogで管理されるマテリアライズドビューとストリーミング テーブルにそれぞれALTER MATERIALIZED VIEWALTER STREAMING TABLE使用します。 Hive metastoreテーブルで同じアクションを実行するには、 ALTER TABLE使用します。

    たとえば、 salesという名前のストリーミング テーブルを ID abcd1234-ef56-ab78-cd90-1234efab5678のパイプラインに移動する場合は、次のコマンドを実行します。

    SQL
    ALTER STREAMING TABLE sales
    SET TBLPROPERTIES("pipelines.pipelineId"="abcd1234-ef56-ab78-cd90-1234efab5678");
注記

pipelineId有効なパイプライン識別子である必要があります。null値は許可されていません。

  1. テーブルの定義を宛先パイプラインのコードに追加します。
注記

カタログまたはターゲット スキーマがソースと宛先で異なる場合、クエリを正確にコピーしても機能しない可能性があります。定義内の部分的に修飾されたテーブルは、異なる方法で解決される可能性があります。テーブル名を完全に修飾するには、移動中に定義を更新する必要がある場合があります。

移動が完了しました。これで、ソース パイプラインと宛先パイプラインの両方を実行できるようになりました。宛先パイプラインはテーブルを更新します。

トラブルシューティング

次の表は、パイプライン間でテーブルを移動するときに発生する可能性のあるエラーについて説明します。

エラー

説明

DESTINATION_PIPELINE_NOT_IN_DIRECT_PUBLISHING_MODE

ソース パイプラインはデフォルトの公開モードであり、宛先では LIVE スキーマ (レガシー) モードが使用されます。これはサポートされていません。ソースがデフォルトの公開モードを使用する場合は、宛先も同様に公開する必要があります。

PIPELINE_TYPE_NOT_WORKSPACE_PIPELINE_TYPE

LakeFlow宣言型パイプライン間でのテーブルの移動のみがサポートされています。 Databricks SQLで作成したストリーミングテーブルやマテリアライズドビューのパイプラインはサポートされていません。

DESTINATION_PIPELINE_NOT_FOUND

pipelines.pipelineIdは有効なパイプラインである必要があります。pipelineIdは null にできません。

移動後に宛先でテーブルの更新に失敗します。

この場合に迅速に軽減するには、同じ手順に従ってテーブルをソース パイプラインに戻します。

PIPELINE_PERMISSION_DENIED_NOT_OWNER

移動操作を実行するユーザーは、ソース パイプラインと宛先パイプラインの両方を所有している必要があります。

TABLE_ALREADY_EXISTS

エラー メッセージにリストされているテーブルは既に存在します。これは、パイプラインのバックアップ テーブルがすでに存在する場合に発生する可能性があります。この場合、 DROPエラーにリストされたテーブルです。

パイプライン内の複数のテーブルの例

パイプラインには複数のテーブルを含めることができます。パイプライン間で一度に 1 つのテーブルを移動することもできます。このシナリオでは、ソース パイプラインで順番に互いを読み取る 3 つのテーブル ( table_atable_btable_c ) があります。1 つのテーブルtable_bを別のパイプラインに移動します。

初期ソースパイプラインコード:

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def table_a():
return spark.read.table("source_table")

# Table to be moved to new pipeline:
@dp.table
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)

@dp.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)

ソースからテーブル定義をコピーして削除し、 table_bの pipelineId を更新して、 table_b別のパイプラインに移動します。

まず、スケジュールを停止し、ソースとターゲット パイプラインの両方で更新が完了するまで待ちます。 次に、ソース パイプラインを変更して、移動されるテーブルのコードを削除します。更新されたソース パイプラインのサンプル コードは次のようになります。

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def table_a():
return spark.read.table("source_table")

# Removed, to be in new pipeline:
# @dp.table
# def table_b():
# return (
# spark.read.table("table_a")
# .select(col("column1"), col("column2"))
# )

@dp.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)

SQL エディターに移動してALTER pipelineIdコマンドを実行します。

SQL
ALTER MATERIALIZED VIEW table_b
SET TBLPROPERTIES("pipelines.pipelineId"="<new-pipeline-id>");

次に、宛先パイプラインに移動して、 table_bの定義を追加します。パイプライン設定でデフォルトのカタログとスキーマが同じ場合は、コードを変更する必要はありません。

ターゲット パイプライン コード:

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table(name="table_b")
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)

パイプライン設定でデフォルトのカタログとスキーマが異なる場合は、パイプラインのカタログとスキーマを使用して完全修飾名を追加する必要があります。

たとえば、ターゲット パイプライン コードは次のようになります。

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table(name="source_catalog.source_schema.table_b")
def table_b():
return (
spark.read.table("source_catalog.source_schema.table_a")
.select(col("column1"), col("column2"))
)

ソース パイプラインとターゲット パイプラインの両方を実行 (またはスケジュールを再度有効に) します。

パイプラインは分離されました。table_cのクエリはtable_b (現在ターゲット パイプライン内) から読み取り、 table_b table_a (ソース パイプライン内) から読み取ります。ソース パイプラインでトリガー実行を行うと、 table_bソース パイプラインによって管理されなくなるため更新されません。ソース パイプラインはtable_bパイプライン外部のテーブルとして扱います。これは、パイプラインによって管理されていないUnity CatalogのDeltaテーブルから読み取ったマテリアライズドビューを定義することに相当します。

制限事項

パイプライン間でテーブルを移動する場合の制限は次のとおりです。

  • Databricks SQLで作成されたマテリアライズドビューおよびストリーミングテーブルはサポートされていません。
  • プライベート テーブルまたはビューはサポートされていません。
  • ソース パイプラインと宛先パイプラインはパイプラインである必要があります。ヌルパイプラインはサポートされていません。
  • ソースと宛先パイプラインは、同じワークスペース内にあるか、同じメタストアを共有する別のワークスペース内にある必要があります。
  • 移動操作を実行するユーザーは、ソース パイプラインと宛先パイプラインの両方を所有している必要があります。
  • ソース パイプラインがデフォルトの公開モードを使用する場合、宛先パイプラインもデフォルトの公開モードを使用する必要があります。デフォルトの公開モードを使用するパイプラインから、LIVE スキーマ (レガシ) を使用するパイプラインにテーブルを移動することはできません。LIVE スキーマ (レガシー)を参照してください。
  • ソース パイプラインと宛先パイプラインの両方が LIVE スキーマ (レガシ) を使用している場合は、設定で同じcatalog値とtarget値を持つ必要があります。