Unity Catalog を Lakeflow 宣言型パイプラインと共に使用する
Databricks 、 Unity Catalogを使用してLakeFlow宣言型パイプラインを構成することをお勧めします。 新しく作成されたパイプラインでは、Unity Catalog の使用がデフォルトになります。
Unity Catalog で構成されたパイプラインは、定義されたすべてのマテリアライズドビューとストリーミング テーブルを、指定したカタログとスキーマに発行します。 Unity Catalog パイプラインは、他の Unity Catalog テーブルとボリュームから読み取ることができます。
Unity Catalog パイプラインによって作成されたテーブルの権限を管理するには、 GRANT と REVOKE を使用します。
この記事では、パイプラインの現在のデフォルトの公開モードの機能について説明します。2025 年 2 月 5 日より前に作成されたパイプラインでは、従来の公開モードとLIVE
仮想スキーマが使用される可能性があります。LIVE スキーマ (レガシー)を参照してください。
要件
Unity Catalogのターゲット スキーマにストリーミング テーブルとマテリアライズドビューを作成するには、スキーマと親カタログに対して次の権限が必要です。
USE CATALOG
ターゲット カタログに対する権限。CREATE MATERIALIZED VIEW
パイプラインがマテリアライズドビューを作成する場合は、ターゲット スキーマに対するUSE SCHEMA
権限。CREATE TABLE
パイプラインがストリーミング テーブルを作成する場合は、ターゲット スキーマに対するUSE SCHEMA
権限。- パイプラインで新しいスキーマを作成する場合は、ターゲット カタログに対する
USE CATALOG
権限とCREATE SCHEMA
権限が必要です。
Unity カタログ対応パイプラインを実行するためのコンピュート要件:
- コンピュート リソースは標準アクセス モードで構成する必要があります。 専用コンピュートには対応しておりません。 「アクセス モード」を参照してください。
を使用して 宣言型パイプラインによって作成されたテーブル (ストリーミングテーブルやマテリアライズドビューなど) クエリを実行するために必要なコンピュートには、次のいずれかが含まれます。LakeflowUnity Catalog
- SQLウェアハウス
- Databricks Runtime 13.3 LTS以降の標準アクセス モード コンピュート。
- 専用アクセス モード コンピュート (専用コンピュートで詳細なアクセス制御が有効になっている場合) (つまり、専用コンピュートがDatabricks Runtime 15.4 以降で実行されており、ワークスペースに対してサーバーレス コンピュートが有効になっている場合)。 詳細については、専用コンピュートのきめ細かいアクセス制御を参照してください。
- 13.3 LTSから 15.3 までの専用アクセス モード コンピュートは、テーブル所有者がクエリを実行する場合のみです。
追加のコンピュート制限が適用されます。 次のセクションを参照してください。
制限事項
Unity Catalog を 宣言型パイプラインと共に使用する場合Lakeflow制限事項は次のとおりです。
-
安心により、パイプラインの所有者とワークスペース管理者だけが、Unity カタログ対応のパイプラインを実行するコンピュートからドライバーのログを表示できます。 他のユーザーがドライバー ログにアクセスできるようにするには、 「管理者以外のユーザーが Unity Catalog 対応パイプラインからドライバー ログを表示できるようにする」を参照してください。
-
Hive metastoreを使用する既存のパイプラインは、 Unity Catalogを使用するようにアップグレードできません。 Hive metastoreに書き込む既存のパイプラインを移行するには、新しいパイプラインを作成し、データ ソースからデータを再取り込む必要があります。 Hive metastoreラインを複製してUnity Catalogパイプラインを作成する」を参照してください。
-
Unity Catalogパブリック プレビュー中に作成されたメタストアに接続されたワークスペースに、 Unity Catalog対応のパイプラインを作成することはできません。 「権限継承へのアップグレード」を参照してください。
-
JAR はサポートされていません。サードパーティの Python ライブラリのみがサポートされています。「宣言型パイプラインのPython依存関係の管理」Lakeflowを参照してください。
-
ストリーミング テーブルのスキーマを変更するデータ操作言語 (DML) クエリはサポートされていません。
-
パイプライン内で作成されたマテリアライズドビューは、そのパイプラインの外(別のパイプラインや下流のノートブックなど)でストリーミング ソースとして使用することはできません。
-
マテリアライズドビューとストリーミングテーブルのデータは、それを含むスキーマの保存場所に保存されます。 スキーマの保存場所が指定されていない場合、テーブルはカタログの保存場所に保存されます。スキーマとカタログの保存場所が指定されていない場合、テーブルはメタストアのルート保存場所に保存されます。
-
カタログ エクスプローラーの 履歴 タブには、マテリアライズドビューの履歴は表示されません。
-
テーブルを定義するときに、
LOCATION
プロパティはサポートされません。 -
Unity Catalog対応パイプラインは Hive metastoreに発行できません。
-
Python UDF サポートはパブリック プレビュー段階です。
マテリアライズドビューをサポートする基になるファイルには、マテリアライズドビューの定義に表示されないアップストリームテーブルのデータ (個人を特定できる可能性のある情報を含む) が含まれる場合があります。 このデータは、マテリアライズドビューの増分更新をサポートするために、基になるストレージに自動的に追加されます。
マテリアライズドビューの基になるファイルは、マテリアライズドビュー スキーマの一部ではないアップストリーム テーブルからのデータを公開するリスクがあるため、Databricks では、基になるストレージを信頼されていないダウンストリーム コンシューマーと共有しないことをお勧めします。
たとえば、マテリアライズドビュー定義にCOUNT(DISTINCT field_a)
句が含まれているとします。 マテリアライズドビュー定義には集計COUNT DISTINCT
句のみが含まれていますが、基礎となるファイルにはfield_a
の実際の値のリストが含まれます。
Hive metastoreとUnity Catalogパイプラインを併用できますか?
ワークスペースにはUnity Catalogと従来のHive metastoreを使用するパイプラインを含めることができます。 ただし、単一のパイプラインはHive metastoreとUnity Catalogに書き込むことはできません。 Hive metastoreに書き込む既存のパイプラインは、 Unity Catalogを使用するようにアップグレードできません。 Hive metastoreに書き込む既存のパイプラインを移行するには、新しいパイプラインを作成し、データ ソースからデータを再取り込む必要があります。 Hive metastoreラインを複製してUnity Catalogパイプラインを作成する」を参照してください。
Unity Catalog を使用していない既存のパイプラインは、Unity Catalog で構成された新しいパイプラインを作成しても影響を受けません。これらのパイプラインは、構成されたストレージの場所を使用してHive metastoreにデータを保持し続けます。
このドキュメントで特に指定がない限り、既存のすべてのデータソースと Lakeflow 宣言型パイプライン機能は、 Unity Catalog. Python インターフェイスと SQL インターフェイスはどちらも、Unity Catalog を使用するパイプラインでサポートされています。
非アクティブなテーブル
LakeFlow宣言型パイプラインがデータをUnity Catalogに永続化するように構成されている場合、パイプラインはテーブルのライフサイクルと権限を管理します。 テーブルの定義がパイプラインから削除されると、テーブルは非アクティブになる可能性があり、パイプラインが削除されるとテーブルも削除されます。
パイプライン ソースからテーブル定義を削除すると、次回のパイプライン更新では、対応するマテリアライズドビューまたはストリーミング テーブル エントリが非アクティブとしてマークされます。 非アクティブなテーブルに対してクエリを実行することは引き続き可能ですが、パイプラインはそれらのテーブルを更新しなくなります。マテリアライズドビューまたはストリーミング テーブルをクリーンアップするには、テーブルを明示的にDROP
ます。
UNDROP
コマンドを使用すると、削除されたテーブルを 7 日以内に回復できます。- 次回のパイプライン更新時にマテリアライズドビューまたはストリーミング テーブル エントリがUnity Catalogから削除される従来の動作を維持するには、パイプライン構成
"pipelines.dropInactiveTables": "true"
を設定します。 実際のデータは、誤って削除した場合でも復元できるように一定期間保持されます。マテリアライズドビューまたはストリーミングテーブルをパイプライン定義に追加し直すことで、7 日以内にデータを復元できます。
パイプラインを完全に削除すると (パイプライン ソースからテーブル定義を削除するのではなく)、そのパイプラインで定義されているすべてのテーブルも削除されます。LakeFlow宣言型パイプライン UI では、パイプラインの削除を確認するように求められます。
宣言型パイプラインから Unity Catalog するテーブル Lakeflow 書き込みます
テーブルを Unity Catalog に書き込むには、ワークスペースを介してパイプラインを操作するように構成する必要があります。パイプラインを作成するときは、[ ストレージ オプション] の Unity Catalog を選択し、 [カタログ] ドロップダウン メニューでカタログを選択して、 [ターゲット スキーマ] ドロップ ダウン メニューで既存のスキーマを選択するか、新しいスキーマの名前を入力します。 Unity Catalog カタログの詳細については、 「Databricks のカタログとは」を参照してください。Unity Catalog のスキーマの詳細については、 「Databricks のスキーマとは」を参照してください。
Unity Catalog パイプラインにデータを取り込む
Unity Catalog を使用するように構成されたパイプラインは、以下からデータを読み取ることができます。
- Unity Catalog マネージドテーブルと外部テーブル、ビュー、マテリアライズドビュー、ストリーミングテーブルがあります。
- Hive metastoreテーブルとビュー。
read_files()
関数を使用してUnity Catalog外部位置から読み取るAuto Loader 。- Apache Kafka と Amazon Kinesis。
以下はUnity CatalogおよびHive metastoreテーブルからの読み取りの例です。
Unity Catalogテーブルからのバッチ取り込み
- SQL
- Python
CREATE OR REFRESH MATERIALIZED VIEW
table_name
AS SELECT
*
FROM
my_catalog.my_schema.table1;
@dp.materialized_view
def table_name():
return spark.read.table("my_catalog.my_schema.table")
Unity Catalogテーブルから変更をストリームする
- SQL
- Python
CREATE OR REFRESH STREAMING TABLE
table_name
AS SELECT
*
FROM
STREAM(my_catalog.my_schema.table1);
@dp.table
def table_name():
return spark.readStream.table("my_catalog.my_schema.table")
Hive metastoreからデータを取り込む
Unity Catalogを使用するパイプラインは、 hive_metastore
カタログを使用してHive metastoreテーブルからデータを読み取ることができます。
- SQL
- Python
CREATE OR REFRESH MATERIALIZED VIEW
table_name
AS SELECT
*
FROM
hive_metastore.some_schema.table;
@dp.materialized_view
def table3():
return spark.read.table("hive_metastore.some_schema.table")
Auto Loaderからデータを取り込む
- SQL
- Python
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"/path/to/uc/external/location",
format => "json"
)
@dp.table(table_properties={"quality": "bronze"})
def table_name():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(f"{path_to_uc_external_location}")
)
マテリアライズドビューを共有する
デフォルトでは、パイプラインによって作成されたデータセットをクエリする権限を持つのはパイプラインの所有者のみです。GRANTステートメントを使用して他のユーザーにテーブルをクエリする権限を与え、 REVOKEステートメントを使用してクエリ アクセスを取り消すことができます。Unity Catalogの権限の詳細については、 Unity Catalogでの権限の管理」を参照してください。
テーブルに対する選択権限の付与
GRANT SELECT ON TABLE
my_catalog.my_schema.table_name
TO
`user@databricks.com`
テーブルの選択を取り消す
REVOKE SELECT ON TABLE
my_catalog.my_schema.table_name
FROM
`user@databricks.com`
テーブル作成権限またはマテリアライズドビュー作成権限を付与する
GRANT CREATE { MATERIALIZED VIEW | TABLE } ON SCHEMA
my_catalog.my_schema
TO
{ principal | user }
パイプラインのリネージを見る
Lakeflow 宣言型パイプラインのテーブルのリネージは、Catalog Explorer に表示されます。Catalog Explorer リネージ UI には、Unity Catalog 対応パイプラインのマテリアライズドビュー またはストリーミングテーブルのアップストリーム テーブルとダウンストリーム テーブルが表示されます。 Unity Catalogリネージの詳細については、「Unity Catalogを使用してデータリネージを表示する」を参照してください。
Unity カタログ対応パイプライン内のマテリアライズドビューまたはストリーミングテーブルの場合、現在のワークスペースからパイプラインにアクセスできる場合、カタログエクスプローラーのリネージ UI はマテリアライズドビューまたはストリーミングテーブルを生成したパイプラインにもリンクします。
ストリーミング テーブルのデータを追加、変更、または削除する
挿入、更新、削除、マージ ステートメントなどのデータ操作言語(DML) ステートメントを使用して、 Unity Catalogに公開されたストリーミング テーブルを変更できます。 ストリーミング テーブルに対する DML クエリのサポートにより、EU 一般データ保護規則 ( GDPR ) に基づくコンプライアンスのテーブル更新などのユースケースが可能になります。
- ストリーミングテーブルのテーブルスキーマを変更する DML ステートメントはサポートされていません。DML ステートメントによってテーブルスキーマの進化が発生しないようにしてください。
- ストリーミングテーブルを更新する DML ステートメントは、Databricks Runtime 13.3 LTS 以上を使用する共有Unity Catalog クラスターまたはSQL ウェアハウスでのみ実行できます。
- ストリーミングには追加専用のデータ ソースが必要なため、処理で (DML ステートメントなどによる) 変更を伴うソース ストリーミング テーブルからのストリーミングが必要な場合は、ソース ストリーミング テーブルの読み取り時にSkipChangeCommits フラグを設定します。
skipChangeCommits
が設定されている場合、ソース テーブルのレコードを削除または変更するトランザクションは無視されます。処理にストリーミング テーブルが必要ない場合は、ターゲット テーブルとしてマテリアライズドビュー (追加のみの制限がない) を使用できます。
以下は、ストリーミング テーブル内のレコードを変更する DML ステートメントの例です。
特定の ID を持つレコードを削除します。
DELETE FROM my_streaming_table WHERE id = 123;
特定の ID を持つレコードを更新します。
UPDATE my_streaming_table SET name = 'Jane Doe' WHERE id = 123;
行フィルターと列マスクを使用してテーブルを公開する
プレビュー
この機能は パブリック プレビュー段階です。
行フィルターを使用すると、テーブルスキャンで行がフェッチされるたびにフィルターとして適用される関数を指定できます。これらのフィルターにより、後続のクエリでは、フィルター述語が true と評価される行のみが返されるようになります。
列マスクを使用すると、テーブルスキャンで行がフェッチされるたびに列の値をマスクできます。その列に対する今後のクエリでは、列の元の値ではなく、評価された関数の結果が返されます。行フィルターと列マスクの使用の詳細については、 「行フィルターと列マスク」を参照してください。
行フィルターと列マスクの管理
マテリアライズドビューとストリーミング テーブルの行フィルターと列マスクは、 CREATE OR REFRESH
ステートメントを通じて追加、更新、または削除する必要があります。
行フィルタと列マスクを使用してテーブルを定義する詳細な構文については、「 Lakeflow 宣言型パイプライン SQL 言語リファレンス 」および「 Lakeflow 宣言型パイプライン Python 言語リファレンス」を参照してください。
行動
以下は、宣言型パイプラインで行フィルターまたは列マスクを使用する場合の重要な詳細 Lakeflow 示します。
- 所有者として更新 : パイプラインがマテリアライズドビューまたはストリーミングテーブルを更新すると、行フィルターおよび列マスク機能がパイプライン所有者の権限で実行されます。 つまり、テーブルの更新では、パイプラインを作成したユーザーのセキュリティ コンテキストが使用されることになります。ユーザー コンテキストをチェックする関数 (
CURRENT_USER
やIS_MEMBER
など) は、パイプライン所有者のユーザー コンテキストを使用して評価されます。 - Query : マテリアライズドビューまたはストリーミング テーブルをクエリする場合、ユーザー コンテキスト (
CURRENT_USER
やIS_MEMBER
など) をチェックする関数は、呼び出し元のユーザー コンテキストを使用して評価されます。 このアプローチでは、現在のユーザーのコンテキストに基づいて、ユーザー固有のデータ セキュリティとアクセス制御が適用されます。 - 行フィルターと列マスクを含むソース テーブルに対してマテリアライズドビューを作成する場合、マテリアライズドビューの更新は常に完全な更新になります。 完全更新では、ソースで使用可能なすべてのデータが最新の定義で再処理されます。このプロセスでは、ソース テーブルのセキュリティ ポリシーが評価され、最新のデータと定義を使用して適用されているかどうかを確認します。
可観測性
DESCRIBE EXTENDED
、 INFORMATION_SCHEMA
、またはカタログ エクスプローラーを使用して、特定のマテリアライズドビューまたはストリーミング テーブルに適用される既存の行フィルターと列マスクを調べます。 この機能により、ユーザーはマテリアライズドビューとストリーミング テーブルでのデータ アクセスと保護対策を監査およびレビューできます。