Unity Catalogで特徴量テーブルを使用する
このページでは、 Unity Catalogで特徴量テーブルを作成および操作する方法について説明します。
このページは、Unity Catalog が有効になっているワークスペースにのみ適用されます。 ワークスペースで Unity Catalogが有効になっていない場合は、「 ワークスペース Feature Store (legacy) で特徴量テーブルを操作する」を参照してください。
このページの例で使用されているコマンドとパラメーターの詳細については、 Feature Engineering Python API リファレンスを参照してください。
必要条件
Unity Catalog での特徴量エンジニアリングにはDatabricks Runtime 13.2 以上が必要です。
Unity Catalogでの特徴量エンジニアリング Pythonクライアントのインストール
Unity Catalogでの特徴量エンジニアリングには、 Python クライアントFeatureEngineeringClient
があります。 このクラスは、 databricks-feature-engineering
パッケージを使用して PyPI で利用でき、Databricks Runtime 13.3 LTS ML 以降にプレインストールされています。 非 ML Databricks Runtimeを使用する場合は、クライアントを手動でインストールする必要があります。 互換性マトリックスを使用して、Databricks Runtime バージョンの正しいバージョンを見つけます。
%pip install databricks-feature-engineering
dbutils.library.restartPython()
Unity Catalogで特徴量テーブルのカタログとスキーマを作成する
新しい カタログ を作成するか、特徴量テーブルの既存のカタログを使用する必要があります。
新しいカタログを作成するには、メタストアに対するCREATE CATALOG
権限が必要です。
CREATE CATALOG IF NOT EXISTS <catalog-name>
既存のカタログを使用するには、カタログに対する USE CATALOG
権限が必要です。
USE CATALOG <catalog-name>
Unity Catalog の特徴量テーブルは、スキーマに格納する必要があります。カタログに新しいスキーマを作成するには、カタログに対する CREATE SCHEMA
権限が必要です。
CREATE SCHEMA IF NOT EXISTS <schema-name>
Unity Catalogで特徴量テーブルを作成する
Unity Catalogでは、主キー制約を含む既存のDeltaテーブルを特徴量テーブルとして使用できます。テーブルに主キーが定義されていない場合は、 ALTER TABLE
DDL ステートメントを使用してテーブルを更新し、制約を追加する必要があります。 Unity Catalogの既存のDeltaテーブルを特徴量テーブルとして使用する を参照してください。
ただし、DLT パイプラインによって Unity Catalog に発行されたストリーミングテーブルまたはマテリアライズドビューに主キーを追加するには、ストリーミングテーブルまたはマテリアライズドビューの定義のスキーマを変更して主キーを含め、ストリーミングテーブルまたはマテリアライズドビューを更新する必要があります。 「DLT パイプラインによって作成されたストリーミングテーブルまたはマテリアライズドビューを特徴量テーブルとして使用する」を参照してください。
Unity Catalogの特徴量テーブルはDeltaテーブルです。特徴量テーブルには主キーが必要です。 特徴量テーブルは、 Unity Catalogの他のデータ資産と同様に、3 つのレベルの名前空間 <catalog-name>.<schema-name>.<table-name>
を使用してアクセスされます。
Databricks SQL、Python FeatureEngineeringClient
、または DLT パイプラインを使用して、Unity Catalogで特徴量テーブルを作成できます。
- Databricks SQL
- Python
You can use any Delta table with a primary key constraint as a feature table. The following code shows how to create a table with a primary key:
CREATE TABLE ml.recommender_system.customer_features (
customer_id int NOT NULL,
feat1 long,
feat2 varchar(100),
CONSTRAINT customer_features_pk PRIMARY KEY (customer_id)
);
To create a time series feature table, add a time column as a primary key column and specify the TIMESERIES keyword. The TIMESERIES keyword requires Databricks Runtime 13.3 LTS or above.
CREATE TABLE ml.recommender_system.customer_features (
customer_id int NOT NULL,
ts timestamp NOT NULL,
feat1 long,
feat2 varchar(100),
CONSTRAINT customer_features_pk PRIMARY KEY (customer_id, ts TIMESERIES)
);
After the table is created, you can write data to it like other Delta tables, and it can be used as a feature table.
For details about the commands and parameters used in the following examples, see the Feature Engineering Python API reference.
- Write the Python functions to compute the features. The output of each function should be an Apache Spark DataFrame with a unique primary key. The primary key can consist of one or more columns.
- Create a feature table by instantiating a
FeatureEngineeringClient
and usingcreate_table
. - Populate the feature table using
write_table
.
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
# Prepare feature DataFrame
def compute_customer_features(data):
''' Feature computation code returns a DataFrame with 'customer_id' as primary key'''
pass
customer_features_df = compute_customer_features(df)
# Create feature table with `customer_id` as the primary key.
# Take schema from DataFrame output by compute_customer_features
customer_feature_table = fe.create_table(
name='ml.recommender_system.customer_features',
primary_keys='customer_id',
schema=customer_features_df.schema,
description='Customer features'
)
# An alternative is to use `create_table` and specify the `df` argument.
# This code automatically saves the features to the underlying Delta table.
# customer_feature_table = fe.create_table(
# ...
# df=customer_features_df,
# ...
# )
# To use a composite primary key, pass all primary key columns in the create_table call
# customer_feature_table = fe.create_table(
# ...
# primary_keys=['customer_id', 'date'],
# ...
# )
# To create a time series table, set the timeseries_columns argument
# customer_feature_table = fe.create_table(
# ...
# primary_keys=['customer_id', 'date'],
# timeseries_columns='date',
# ...
# )
DLT パイプラインを使用して Unity Catalog で特徴量テーブルを作成する
テーブル制約の DLT サポートは パブリック プレビュー段階です。次のコード例は、DLT preview チャンネルを使用して実行する必要があります。
DLT パイプラインからパブリッシュされたテーブルで、 主キー制約 を含むものは、特徴量テーブルとして使用できます。 主キーを使用して DLT パイプラインにテーブルを作成するには、Databricks SQL または DLT Python プログラミング インターフェイスを使用できます。
主キーを持つ DLT パイプラインにテーブルを作成するには、次の構文を使用します。
- Databricks SQL
- Python
CREATE LIVE TABLE customer_features (
customer_id int NOT NULL,
feat1 long,
feat2 varchar(100),
CONSTRAINT customer_features_pk PRIMARY KEY (customer_id)
) AS SELECT * FROM ...;
import dlt
@dlt.table(
schema="""
customer_id int NOT NULL,
feat1 long,
feat2 varchar(100),
CONSTRAINT customer_features_pk PRIMARY KEY (customer_id)
""")
def customer_features():
return ...
時系列特徴量テーブルを作成するには、時間列を主キー列として追加し、TIMESERIES キーワードを指定します。
- Databricks SQL
- Python
CREATE LIVE TABLE customer_features (
customer_id int NOT NULL,
ts timestamp NOT NULL,
feat1 long,
feat2 varchar(100),
CONSTRAINT customer_features_pk PRIMARY KEY (customer_id, ts TIMESERIES)
) AS SELECT * FROM ...;
import dlt
@dlt.table(
schema="""
customer_id int NOT NULL,
ts timestamp NOT NULL,
feat1 long,
feat2 varchar(100),
CONSTRAINT customer_features_pk PRIMARY KEY (customer_id, ts TIMESERIES)
""")
def customer_features():
return ...
テーブルが作成されたら、他のDLTデータセットと同様にデータを書き込むことができ、特徴量テーブルとして使用することができます。
Unity Catalogの既存Deltaテーブルを特徴量テーブルとして使用する
Unity Catalog内の主キーを持つDeltaテーブルは、 Unity Catalog内の特徴量テーブルにすることができ、そのテーブルで特徴量UI とAPI を使用できます。
- プライマリ・キー制約を宣言できるのは、テーブルの所有者だけです。 オーナーの名前は、カタログエクスプローラのテーブル詳細ページに表示されます。
- Delta テーブルのデータ型が Unity Catalogでの特徴量エンジニアリングでサポートされていることを確認します。サポートされているデータ型を参照してください。
- TIMESERIES キーワードには、Databricks Runtime 13.3 LTS 以降が必要です。
既存の Delta テーブルに 主キー制約がない場合は、次のように作成できます。
-
主キーカラムを
NOT NULL
に設定します。 主キーカラムごとに、次のコマンドを実行します。SQLALTER TABLE <full_table_name> ALTER COLUMN <pk_col_name> SET NOT NULL
-
テーブルを変更して、プライマリ・キー制約を追加します。
SQLALTER TABLE <full_table_name> ADD CONSTRAINT <pk_name> PRIMARY KEY(pk_col1, pk_col2, ...)
pk_name
は主キー制約の名前です。 慣例により、テーブル名(スキーマとカタログは除く)に_pk
というサフィックスを付けて使用できます。 たとえば、"ml.recommender_system.customer_features"
という名前のテーブルは、主キー制約の名前としてcustomer_features_pk
になります。テーブルを 時系列 特徴量テーブルにするには、次のように、主キー列の 1 つに TIMESERIES キーワードを指定します。
SQLALTER TABLE <full_table_name> ADD CONSTRAINT <pk_name> PRIMARY KEY(pk_col1 TIMESERIES, pk_col2, ...)
テーブルにプライマリキー制約を追加すると、テーブルがフィーチャー UI に表示され、特徴量テーブルとして使用できます。
DLT パイプラインによって作成されたストリーミングテーブルまたはマテリアライズドビューを特徴量テーブルとして使用します
プライマリキーを持つ Unity Catalog のストリーミングテーブルまたは実体化ビュー は、 Unity Catalogの特徴量テーブルにすることができ、フィーチャー UI を使用してテーブルを API できます。
- テーブル制約の DLT サポートは パブリック プレビュー段階です。次のコード例は、DLT preview チャンネルを使用して実行する必要があります。
- プライマリ・キー制約を宣言できるのは、テーブルの所有者だけです。 オーナーの名前は、カタログエクスプローラのテーブル詳細ページに表示されます。
- Unity Catalogでの特徴量エンジニアリングがDeltaテーブルのデータ型をサポートしていることを確認します。サポートされているデータ型を参照してください。
既存のストリーミング テーブルまたは具体化されたビューのプライマリ・キーを設定するには、オブジェクトを管理するノートブックでストリーミング・テーブルまたは具体化されたビューのスキーマを更新します。 次に、 テーブルを更新して Unity Catalog オブジェクトを更新します。
次に、マテリアライズドビューに主キーを追加する構文を示します。
- Databricks SQL
- Python
CREATE OR REFRESH MATERIALIZED VIEW existing_live_table(
id int NOT NULL PRIMARY KEY,
...
) AS SELECT ...
import dlt
@dlt.table(
schema="""
id int NOT NULL PRIMARY KEY,
...
"""
)
def existing_live_table():
return ...
Unity Catalogの既存のビューを特徴量テーブルとして使用する
ビューを特徴量テーブルとして使用するには、 Databricks Runtime 16.0 MLに組み込まれている 0.7.0 以降のバージョンを使用する必要がありますdatabricks-feature-engineering
。
Unity Catalog の単純な SELECT ビューは、 Unity Catalogの特徴量テーブルになることができ、テーブルで Features API を使用できます。
- Unity Catalogの単一のDeltaテーブルから作成されるビューとして、特徴量テーブルとして使用できる単純な SELECTビューを定義でき、そのテーブルの主キーはJOIN、GROUP BY、または DISTINCT 句なしで選択されます。SQL 文で使用できるキーワードは、SELECT、FROM、WHERE、ORDER BY、LIMIT、および OFFSET です。
- サポートされているデータ型については、 サポートされているデータ型 を参照してください。
- ビューから構成された特徴量テーブルは、特徴量 UI に表示されません。
- ソース Delta テーブルで列の名前を変更する場合は、ビュー定義の SELECT ステートメントの列の名前を一致させる必要があります。
以下は、特徴量テーブルとして使用できる単純な SELECT ビューの例です。
CREATE OR REPLACE VIEW ml.recommender_system.content_recommendation_subset AS
SELECT
user_id,
content_id,
user_age,
user_gender,
content_genre,
content_release_year,
user_content_watch_duration,
user_content_like_dislike_ratio
FROM
ml.recommender_system.content_recommendations_features
WHERE
user_age BETWEEN 18 AND 35
AND content_genre IN ('Drama', 'Comedy', 'Action')
AND content_release_year >= 2010
AND user_content_watch_duration > 60;
ビューに基づく特徴量テーブルは、オフラインモデルのトレーニングと評価に使用できます。 オンラインストアに公開することはできません。 これらのテーブルの特徴量と、これらの特徴量に基づくモデルをサービングすることはできません。
Unity Catalog で特徴量テーブルを更新する
Unity Catalog で特徴量テーブルを更新するには、新しい特徴量テーブルを追加するか、主キーに基づいて特定の行を変更します。
次の特徴量テーブルのメタデータは更新しないでください。
- 主キー。
- パーティション キー。
- 既存の特徴量の名前やデータ タイプ。
これらを変更すると、モデルのトレーニングとサービングに特徴量を使用するダウンストリーム パイプラインが壊れます。
Unity Catalog の既存の特徴量テーブルに新しい特徴量テーブルを追加する
既存の特徴量テーブルに新しい特徴量を追加するには、次の 2 つの方法があります。
- 既存の特徴量計算関数を更新し、返された DataFrameで
write_table
を実行します。 これにより、特徴量テーブル スキーマが更新され、主キーに基づいて新しい特徴量テーブル値がマージされます。 - 新しい特徴量を計算するための新しい特徴量計算関数を作成します。 この新しい計算関数によって返される DataFrame には、特徴量テーブルの主キーとパーティション キー (定義されている場合) が含まれている必要があります。 DataFrameで
write_table
を実行して、同じ主キーを使用して新しい特徴量テーブルに新しい特徴を書き込むことができます。
特徴量テーブルの特定の行のみを更新する
write_table
でmode = "merge"
を使用します。write_table
呼び出しで送信された DataFrame に主キーが存在しない行は変更されません。
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
fe.write_table(
name='ml.recommender_system.customer_features',
df = customer_features_df,
mode = 'merge'
)
特徴量テーブルを更新するジョブのスケジュール
特徴量テーブルの特徴量テーブルに常に最新の値が含まれるようにするために、 Databricks では、ノートブックを実行して特徴量テーブルを定期的に (毎日など) 更新するジョブを作成することをお勧めします。 スケジュールされていないジョブがすでに作成されている場合は、スケジュール済みジョブに変換して、機能値が常に最新の状態になるようにすることができます。 Databricks のオーケストレーションの概要を参照してください。
特徴量テーブルを更新するコードでは、次の例に示すように、 mode='merge'
を使用します。
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
customer_features_df = compute_customer_features(data)
fe.write_table(
df=customer_features_df,
name='ml.recommender_system.customer_features',
mode='merge'
)
毎日の特徴の過去の値を保存してください
複合主キーを持つ特徴量テーブルを定義します。 主キーに日付を含めます。 たとえば、特徴量テーブル customer_features
の場合、効率的な読み取りのために複合主キー (date
、 customer_id
) とパーティション キー date
を使用できます。
Databricks では、効率的な読み取りのために、テーブルで リキッドクラスタリング を有効にすることをお勧めします。 リキッドクラスタリングを使用しない場合は、読み取りパフォーマンスを向上させるために、日付列をパーティションキーとして設定します。
- Databricks SQL
- Python
CREATE TABLE ml.recommender_system.customer_features (
customer_id int NOT NULL,
`date` date NOT NULL,
feat1 long,
feat2 varchar(100),
CONSTRAINT customer_features_pk PRIMARY KEY (`date`, customer_id)
)
-- If you are not using liquid clustering, uncomment the following line.
-- PARTITIONED BY (`date`)
COMMENT "Customer features";
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
fe.create_table(
name='ml.recommender_system.customer_features',
primary_keys=['date', 'customer_id'],
# If you are not using liquid clustering, uncomment the following line.
# partition_columns=['date'],
schema=customer_features_df.schema,
description='Customer features'
)
その後、特徴量テーブルから date
をフィルター処理して対象期間に読み取るコードを作成できます。
また、 create_training_set
または score_batch
を使用するときにポイントインタイムルックアップを可能にする時系列特徴量テーブルを作成することもできます。「Unity Catalogで特徴量テーブルを作成する」を参照してください。
特徴量テーブルを最新の状態に保つには、特徴量テーブルに特徴量を書き込むか、新しい特徴量値をストリームするように、定期的にスケジュールされたジョブを設定します。
特徴量を更新するためのストリーミング特徴量計算パイプラインの作成
ストリーミング特徴量の計算パイプラインを作成するには、ストリーミング DataFrame
を引数として write_table
に渡します。 このメソッドは、 StreamingQuery
オブジェクトを返します。
def compute_additional_customer_features(data):
''' Returns Streaming DataFrame
'''
pass
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
customer_transactions = spark.readStream.table("prod.events.customer_transactions")
stream_df = compute_additional_customer_features(customer_transactions)
fe.write_table(
df=stream_df,
name='ml.recommender_system.customer_features',
mode='merge'
)
Unity Catalog の特徴量テーブルから読み取る
read_table
を使用して特徴値を読み取ります。
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
customer_features_df = fe.read_table(
name='ml.recommender_system.customer_features',
)
Unity Catalogで特徴量テーブルを検索して閲覧する
特徴量UIを使用して、 Unity Catalogで特徴量テーブルを検索または参照します。
-
サイドバーの
特徴量 をクリックして、特徴量UI を表示します。
-
カタログセレクターでcatalogを選択すると、そのカタログで使用可能なすべての特徴量テーブルが表示されます。 検索ボックスに、特徴量テーブル、特徴量テーブル、またはコメントの名前の全部または一部を入力します。 タグのキーまたは値の全部または一部を入力することもできます。検索テキストは大文字と小文字を区別しません。
特徴量テーブルのメタデータをUnity Catalogで取得する
get_table
を使用して、特徴量テーブルのメタデータを取得します。
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
ft = fe.get_table(name="ml.recommender_system.user_feature_table")
print(ft.features)
Unity Catalogの特徴量テーブルと特徴量にタグを使用
単純なキーと値のペアであるタグを使用して、特徴量テーブルと特徴量を分類および管理できます。
特徴量テーブルでは、Catalog Explorer、ノートブックや SQL クエリ エディタの SQL ステートメント、または 特徴量エンジニアリング の Python APIを使用して、タグを作成、編集、削除できます。
特徴量については、Catalog Explorer を使用するか、ノートブックまたは SQL クエリエディタの SQL ステートメントを使用して、タグを作成、編集、および削除できます。
Unity Catalogセキュリティ保護可能なオブジェクトへのタグの適用および特徴量 エンジニアリングとワークスペース Feature Store Python APIを参照してください。
次の例は、特徴量 エンジニアリング Python API を使用して、特徴量テーブル タグを作成、更新、および削除する方法を示しています。
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
# Create feature table with tags
customer_feature_table = fe.create_table(
# ...
tags={"tag_key_1": "tag_value_1", "tag_key_2": "tag_value_2", ...},
# ...
)
# Upsert a tag
fe.set_feature_table_tag(name="customer_feature_table", key="tag_key_1", value="new_key_value")
# Delete a tag
fe.delete_feature_table_tag(name="customer_feature_table", key="tag_key_2")
Unity Catalog で特徴量テーブルを削除する
Unity Catalogで特徴量テーブルを削除するには、カタログエクスプローラを使用するか、 特徴量エンジニアリングPython API を使用して、 Unity CatalogのDeltaテーブルを直接削除します。
- 特徴量テーブルを削除すると、上流のプロデューサーと下流のコンシューマー (モデル、エンドポイント、スケジュール済みジョブ) で予想外のエラーが発生する恐れがあります。
- Unity Catalogで特徴量テーブルを削除すると、基になる Delta テーブルも削除されます。
drop_table
は、Databricks Runtime 13.1 ML 以下ではサポートされていません。 SQL コマンドを使用して、テーブルを削除します。
Databricks SQL または FeatureEngineeringClient.drop_table
を使用して、Unity Catalogで特徴量テーブルを削除できます。
- Databricks SQL
- Python
DROP TABLE ml.recommender_system.customer_features;
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
fe.drop_table(
name='ml.recommender_system.customer_features'
)
Unity Catalog の特長量テーブルをワークスペースまたはアカウント間で共有する
Unity Catalog の特徴量テーブルは、テーブルのUnity Catalogメタストアに割り当てられたすべてのワークスペースからアクセスできます。
特徴量テーブルを、同じUnity Catalogメタストアに割り当てられていないワークスペースと共有するには、Delta Sharingを使用します。