メインコンテンツまでスキップ

宣言型パイプライン間でのテーブル Lakeflow 移動

この記事では、ストリーミングテーブルとマテリアライズドビューをパイプライン間で移動する方法について説明します。 移動後、フローの移動先のパイプラインは、元のパイプラインではなくテーブルを更新します。これは、次のような多くのシナリオで役立ちます。

  • 大きなパイプラインを小さなパイプラインに分割します。
  • 複数のパイプラインを 1 つの大きなパイプラインにマージします。
  • パイプライン内の一部のテーブルの更新頻度を変更します。
  • 従来の公開モードを使用するパイプラインからデフォルトの公開モードにテーブルを移動します。 従来の発行モードの詳細については、「 パイプラインの従来の発行モード」を参照してください。パイプライン全体の公開モードを一度に移行する方法については、「 パイプラインでデフォルト公開モードを有効にする」を参照してください。

必要条件

パイプライン間でテーブルを移動するための要件を次に示します。

  • ALTER ... コマンドを実行するときは、Databricks Runtime 16.3 以降を使用する必要があります。

  • ソース パイプラインと宛先パイプラインの両方が次の条件を満たしている必要があります。

    • 同じワークスペース内
    • 操作を実行している Databricks ユーザー アカウントまたはサービスプリンシパルが所有しています
  • 宛先パイプラインでは、デフォルト パブリッシング モードを使用する必要があります。 これにより、テーブルを複数のカタログとスキーマに公開できます。

    または、両方のパイプラインで従来の発行モードを使用する必要があり、両方の設定でカタログとターゲットの値が同じである必要があります。従来の公開モードに関する情報については、「 LIVE スキーマ (従来)」を参照してください。

注記

この機能は、デフォルトの公開モードを使用するパイプラインを従来の公開モードを使用するパイプラインに移動することをサポートしていません。

パイプライン間でテーブルを移動する

次の手順では、ストリーミングテーブルまたはマテリアライズドビューを 1 つのパイプラインから別のパイプラインに移動する方法について説明します。

  1. ソース パイプラインが実行されている場合は停止します。完全に停止するまで待ちます。

  2. ソース パイプラインのノートブックまたはファイルからテーブルの定義を削除し、後で参照できるようにどこかに保存します。

    パイプラインを正しく実行するために必要なサポート クエリまたはコードを含めます。

  3. ノートブックまたは SQL エディタから、次の SQL コマンドを実行して、ソース パイプラインからレプリケート先パイプラインにテーブルを再割り当てします。

    SQL
    ALTER [MATERIALIZED VIEW | STREAMING TABLE | TABLE] <table-name>
    SET TBLPROPERTIES("pipelines.pipelineId"="<destination-pipeline-id>");

    このコマンドでは、Unity Catalogマネージ マテリアライズドビューとストリーミングテーブルにそれぞれ ALTER MATERIALIZED VIEWALTER STREAMING TABLE を使用します。Hive metastoreテーブルで同じアクションを実行するには、ALTER TABLEを使用します。

    たとえば、 sales という名前のストリーミングテーブルを ID が abcd1234-ef56-ab78-cd90-1234efab5678のパイプラインに移動する場合は、次のコマンドを実行します。

    SQL
    ALTER STREAMING TABLE sales
    SET TBLPROPERTIES("pipelines.pipelineId"="abcd1234-ef56-ab78-cd90-1234efab5678");
注記

pipelineIdは有効なパイプライン識別子である必要があります。null値は許可されていません。

  1. テーブルの定義を宛先パイプラインのノートブック/ファイルに追加します。
注記

カタログまたはターゲットスキーマがソースとコピー先で異なる場合、クエリを正確にコピーできない可能性があります。定義内の部分的に修飾されたテーブルは、異なる方法で解決される場合があります。移動中に定義を更新する必要がある場合があります。

移動が完了しました。これで、ソース パイプラインと宛先パイプラインの両方を実行できます。宛先パイプラインによってテーブルが更新されます。

トラブルシューティング

次の表では、パイプライン間でテーブルを移動するときに発生する可能性のあるエラーについて説明します。

エラー

説明

DESTINATION_PIPELINE_NOT_IN_DIRECT_PUBLISHING_MODE

ソース パイプラインはデフォルトのパブリッシュ モードであり、宛先は LIVE スキーマ (レガシ) モードを使用します。これはサポートされていません。ソースがデフォルトの公開モードを使用している場合は、送信先もそうする必要があります。

PIPELINE_TYPE_NOT_WORKSPACE_PIPELINE_TYPE

宣言型パイプライン間でのテーブルの移動のみがサポートされています LakeFlow 。 Databricks SQLで作成されたストリーミングテーブルのパイプラインとマテリアライズドビューはサポートされていません。

DESTINATION_PIPELINE_NOT_FOUND

pipelines.pipelineIdは有効なパイプラインである必要があります。pipelineIdを null にすることはできません。

移動後、移動先でテーブルの更新に失敗します。

この場合をすばやく軽減するには、同じ手順に従ってテーブルをソース パイプラインに戻します。

PIPELINE_PERMISSION_DENIED_NOT_OWNER

ソース パイプラインと宛先パイプラインの両方が、移動操作を実行するユーザーが所有している必要があります。

TABLE_ALREADY_EXISTS

エラーメッセージにリストされているテーブルはすでに存在します。これは、パイプラインのバッキング テーブルが既に存在する場合に発生する可能性があります。この場合は、エラーにリストされているテーブル DROP

パイプラインに複数のテーブルがある例

パイプラインには、複数のテーブルを含めることができます。パイプライン間で一度に 1 つのテーブルを移動することはできます。このシナリオでは、ソース パイプラインで相互に順番に読み取る 3 つのテーブル (table_atable_btable_c) があります。1 つのテーブル table_bを別のパイプラインに移動します。

初期ソースパイプラインコード:

Python
import dlt
from pyspark.sql.functions import col

@dlt.table
def table_a():
return spark.read.table("source_table")

# Table to be moved to new pipeline:
@dlt.table
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)

@dlt.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)

ソースからテーブル定義をコピーして削除し、table_bの pipelineId を更新することで、table_bを別のパイプラインに移動します。

まず、すべてのスケジュールを停止し、ソース パイプラインとターゲット パイプラインの両方で更新が完了するのを待ちます。 次に、ソース パイプラインを変更して、移動するテーブルのコードを削除します。更新されたソース パイプラインのサンプル コードは次のようになります。

Python
import dlt
from pyspark.sql.functions import col

@dlt.table
def table_a():
return spark.read.table("source_table")

# Removed, to be in new pipeline:
# @dlt.table
# def table_b():
# return (
# spark.read.table("table_a")
# .select(col("column1"), col("column2"))
# )

@dlt.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)

SQL エディターに移動して、 ALTER pipelineId コマンドを実行します。

SQL
ALTER MATERIALIZED VIEW table_b
SET TBLPROPERTIES("pipelines.pipelineId"="<new-pipeline-id>");

次に、宛先パイプラインに移動し、 table_bの定義を追加します。パイプライン設定でデフォルトのカタログとスキーマが同じ場合、コードを変更する必要はありません。

ターゲット パイプライン コードは次のとおりです。

Python
import dlt
from pyspark.sql.functions import col

@dlt.table(name="table_b")
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)

デフォルトのカタログとスキーマがパイプライン設定で異なる場合は、パイプラインのカタログとスキーマを使用して完全修飾名を追加する必要があります。

たとえば、ターゲット パイプライン コードは次のようになります。

Python
import dlt
from pyspark.sql.functions import col

@dlt.table(name="source_catalog.source_schema.table_b")
def table_b():
return (
spark.read.table("source_catalog.source_schema.table_a")
.select(col("column1"), col("column2"))
)

ソース パイプラインとターゲット パイプラインの両方に対して実行 (またはスケジュールを再度有効に) します。

パイプラインはこれでばらばらになっています。table_c のクエリは、table_b (現在はターゲット パイプライン内) から読み取り、table_btable_a (ソース パイプライン内) から読み取ります。ソースパイプラインでトリガー実行を実行すると、ソースパイプラインによって管理されなくなったため、 table_b は更新されません。ソースパイプラインは、 table_b パイプラインの外部のテーブルとして扱います。これは、パイプラインによって管理されていないUnity CatalogのDeltaテーブルから読み取ったマテリアライズドビューを定義するのと似ています。

制限

パイプライン間でテーブルを移動する場合の制限事項を次に示します。

  • Databricks SQLで作成されたマテリアライズドビューとストリーミングテーブルはサポートされていません。
  • プライベート・テーブルまたはビューはサポートされていません。
  • ソース パイプラインと宛先パイプラインはパイプラインである必要があります。null パイプラインはサポートされていません。
  • ソース パイプラインと宛先パイプラインの両方が同じワークスペースに存在する必要があります。
  • ソース パイプラインと destination パイプラインはどちらも、移動操作を実行しているユーザーが所有している必要があります。
  • ソース パイプラインがデフォルト パブリッシング モードを使用する場合、デスティネーション パイプラインもデフォルト パブリッシング モードを使用している必要があります。 デフォルトの発行モードを使用するパイプラインから、LIVE スキーマ (レガシ) を使用するパイプラインにテーブルを移動することはできません。LIVE スキーマ (レガシー)を参照してください。
  • ソース パイプラインと宛先パイプラインの両方が LIVE スキーマ (レガシ) を使用している場合は、設定で同じ catalog 値と target 値を持つ必要があります。