宣言型パイプライン間でのテーブル Lakeflow 移動
この記事では、ストリーミングテーブルとマテリアライズドビューをパイプライン間で移動する方法について説明します。 移動後、元のパイプラインではなく、フローを移動したパイプラインがテーブルを更新します。これは、次のような多くのシナリオで役立ちます。
- 大きなパイプラインを小さなパイプラインに分割します。
- 複数のパイプラインを 1 つの大きなパイプラインに結合します。
- パイプライン内の一部のテーブルの更新頻度を変更します。
- 従来の公開モードを使用するパイプラインからデフォルトの公開モードにテーブルを移動します。従来の公開モードの詳細については、 「パイプラインの従来の公開モード」を参照してください。パイプライン全体の公開モードを一度に移行する方法については、 「パイプラインでデフォルトの公開モードを有効にする」を参照してください。
- 異なるワークスペース内のパイプライン間でテーブルを移動します。
要件
パイプライン間でテーブルを移動するための要件は次のとおりです。
-
ALTER ...
コマンドを実行するときは Databricks Runtime 16.3 以上を使用し、ワークスペース間のテーブル移動には Databricks Runtime 17.2 を使用する必要があります。 -
ソース パイプラインと宛先パイプラインの両方が次の条件を満たす必要があります。
- 操作を実行するDatabricksユーザー アカウントまたはサービスプリンシパルが所有
- メタストアを共有するワークスペース内。メタストアを確認するには、
current_metastore
関数を参照してください。
-
宛先パイプラインでは、デフォルトの公開モードを使用する必要があります。これにより、テーブルを複数のカタログとスキーマに公開できるようになります。
または、両方のパイプラインで従来の公開モードを使用し、設定で両方のカタログとターゲット値を同じにする必要があります。従来の公開モードの詳細については、 「LIVE スキーマ (レガシー)」を参照してください。
この機能では、デフォルトの公開モードを使用するパイプラインを、従来の公開モードを使用するパイプラインに移動することはサポートされていません。
パイプライン間でテーブルを移動する
次の手順では、ストリーミング テーブルまたはマテリアライズドビューをあるパイプラインから別のパイプラインに移動する方法を説明します。
-
ソース パイプラインが実行中の場合は停止します。完全に止まるまで待ちます。
-
ソース パイプラインのコードからテーブルの定義を削除し、将来の参照用にどこかに保存します。
パイプラインを正しく実行するために必要なサポートクエリまたはコードを含めます。
-
ノートブックまたは SQL エディターから次の SQL コマンドを実行して、ソース パイプラインから宛先パイプラインにテーブルを再割り当てします。
SQLALTER [MATERIALIZED VIEW | STREAMING TABLE | TABLE] <table-name>
SET TBLPROPERTIES("pipelines.pipelineId"="<destination-pipeline-id>");SQL コマンドはソース パイプラインのワークスペースから実行する必要があることに注意してください。
コマンドは、 Unity Catalogで管理されるマテリアライズドビューとストリーミング テーブルにそれぞれ
ALTER MATERIALIZED VIEW
とALTER STREAMING TABLE
使用します。 Hive metastoreテーブルで同じアクションを実行するには、ALTER TABLE
使用します。たとえば、
sales
という名前のストリーミング テーブルを IDabcd1234-ef56-ab78-cd90-1234efab5678
のパイプラインに移動する場合は、次のコマンドを実行します。SQLALTER STREAMING TABLE sales
SET TBLPROPERTIES("pipelines.pipelineId"="abcd1234-ef56-ab78-cd90-1234efab5678");
pipelineId
有効なパイプライン識別子である必要があります。null
値は許可されていません。
- テーブルの定義を宛先パイプラインのコードに追加します。
カタログまたはターゲット スキーマがソースと宛先で異なる場合、クエリを正確にコピーしても機能しない可能性があります。定義内の部分的に修飾されたテーブルは、異なる方法で解決される可能性があります。テーブル名を完全に修飾するには、移動中に定義を更新する必要がある場合があります。
移動が完了しました。これで、ソース パイプラインと宛先パイプラインの両方を実行できるようになりました。宛先パイプラインはテーブルを更新します。
トラブルシューティング
次の表は、パイプライン間でテーブルを移動するときに発生する可能性のあるエラーについて説明します。
エラー | 説明 |
---|---|
| ソース パイプラインはデフォルトの公開モードであり、宛先では LIVE スキーマ (レガシー) モードが使用されます。これはサポートされていません。ソースがデフォルトの公開モードを使用する場合は、宛先も同様に公開する必要があります。 |
| LakeFlow宣言型パイプライン間でのテーブルの移動のみがサポートされています。 Databricks SQLで作成したストリーミングテーブルやマテリアライズドビューのパイプラインはサポートされていません。 |
|
|
移動後に宛先でテーブルの更新に失敗します。 | この場合に迅速に軽減するには、同じ手順に従ってテーブルをソース パイプラインに戻します。 |
| 移動操作を実行するユーザーは、ソース パイプラインと宛先パイプラインの両方を所有している必要があります。 |
| エラー メッセージにリストされているテーブルは既に存在します。これは、パイプラインのバックアップ テーブルがすでに存在する場合に発生する可能性があります。この場合、 |
パイプライン内の複数のテーブルの例
パイプラインには複数のテーブルを含めることができます。パイプライン間で一度に 1 つのテーブルを移動することもできます。このシナリオでは、ソース パイプラインで順番に互いを読み取る 3 つのテーブル ( table_a
、 table_b
、 table_c
) があります。1 つのテーブルtable_b
を別のパイプラインに移動します。
初期ソースパイプラインコード:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table
def table_a():
return spark.read.table("source_table")
# Table to be moved to new pipeline:
@dp.table
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)
@dp.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)
ソースからテーブル定義をコピーして削除し、 table_b
の pipelineId を更新して、 table_b
別のパイプラインに移動します。
まず、スケジュールを停止し、ソースとターゲット パイプラインの両方で更新が完了するまで待ちます。 次に、ソース パイプラインを変更して、移動されるテーブルのコードを削除します。更新されたソース パイプラインのサンプル コードは次のようになります。
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table
def table_a():
return spark.read.table("source_table")
# Removed, to be in new pipeline:
# @dp.table
# def table_b():
# return (
# spark.read.table("table_a")
# .select(col("column1"), col("column2"))
# )
@dp.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)
SQL エディターに移動してALTER pipelineId
コマンドを実行します。
ALTER MATERIALIZED VIEW table_b
SET TBLPROPERTIES("pipelines.pipelineId"="<new-pipeline-id>");
次に、宛先パイプラインに移動して、 table_b
の定義を追加します。パイプライン設定でデフォルトのカタログとスキーマが同じ場合は、コードを変更する必要はありません。
ターゲット パイプライン コード:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table(name="table_b")
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)
パイプライン設定でデフォルトのカタログとスキーマが異なる場合は、パイプラインのカタログとスキーマを使用して完全修飾名を追加する必要があります。
たとえば、ターゲット パイプライン コードは次のようになります。
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table(name="source_catalog.source_schema.table_b")
def table_b():
return (
spark.read.table("source_catalog.source_schema.table_a")
.select(col("column1"), col("column2"))
)
ソース パイプラインとターゲット パイプラインの両方を実行 (またはスケジュールを再度有効に) します。
パイプラインは分離されました。table_c
のクエリはtable_b
(現在ターゲット パイプライン内) から読み取り、 table_b
table_a
(ソース パイプライン内) から読み取ります。ソース パイプラインでトリガー実行を行うと、 table_b
ソース パイプラインによって管理されなくなるため更新されません。ソース パイプラインはtable_b
パイプライン外部のテーブルとして扱います。これは、パイプラインによって管理されていないUnity CatalogのDeltaテーブルから読み取ったマテリアライズドビューを定義することに相当します。
制限事項
パイプライン間でテーブルを移動する場合の制限は次のとおりです。
- Databricks SQLで作成されたマテリアライズドビューおよびストリーミングテーブルはサポートされていません。
- プライベート テーブルまたはビューはサポートされていません。
- ソース パイプラインと宛先パイプラインはパイプラインである必要があります。ヌルパイプラインはサポートされていません。
- ソースと宛先パイプラインは、同じワークスペース内にあるか、同じメタストアを共有する別のワークスペース内にある必要があります。
- 移動操作を実行するユーザーは、ソース パイプラインと宛先パイプラインの両方を所有している必要があります。
- ソース パイプラインがデフォルトの公開モードを使用する場合、宛先パイプラインもデフォルトの公開モードを使用する必要があります。デフォルトの公開モードを使用するパイプラインから、LIVE スキーマ (レガシ) を使用するパイプラインにテーブルを移動することはできません。LIVE スキーマ (レガシー)を参照してください。
- ソース パイプラインと宛先パイプラインの両方が LIVE スキーマ (レガシ) を使用している場合は、設定で同じ
catalog
値とtarget
値を持つ必要があります。