オンライン テーブルをリアルタイム特徴量サービングに使用する
プレビュー
オンライン テーブルは、 us-east-1
、 us-west-2
、 eu-west-1
、 ap-southeast-1
、 ap-southeast-2
の各地域でパブリック プレビュー段階です。 価格 情報については、 オンラインテーブルの価格を参照してください。
オンライン テーブルは、オンライン アクセス用に最適化された行指向形式で格納される Delta テーブルの読み取り専用コピーです。 オンラインテーブルは、リクエストの負荷に応じてスループット容量を自動スケーリングし、あらゆる規模のデータへの低レイテンシと高スループットのアクセスを提供する、完全なサーバレス・テーブルです。 オンラインテーブルは、 Mosaic AI Model Serving、 Feature Serving、およびオンラインテーブルが高速データルックアップに使用される RAG(Retrieval-Augmented Generation) アプリケーションで動作するように設計されています。
また、レイクハウスフェデレーションを使用したクエリでオンラインテーブルを使用することもできます。レイクハウスフェデレーションを使用する場合、オンラインテーブルにアクセスするには、サーバレス SQLウェアハウスを使用する必要があります。 読み取り操作 (SELECT
) のみがサポートされています。 この機能は、対話型またはデバッグのみを目的としており、本番運用やミッション クリティカルなワークロードには使用しないでください。
Databricks UI を使用したオンライン テーブルの作成は、1 ステップのプロセスです。 カタログ エクスプローラーで Delta テーブルを選択し、 オンライン テーブルの作成 を選択するだけです。 REST API または Databricks SDK を使用して、オンライン テーブルを作成および管理することもできます。 「APIsを使用したオンライン テーブルの操作」を参照してください。
必要条件
- ワークスペースは Unity Catalog に対して有効になっている必要があります。 ドキュメントに従って、Unity Catalogメタストアを作成し、ワークスペースで有効にして、カタログを作成します。
- オンライン テーブルにアクセスするには、モデルを Unity Catalog に登録する必要があります。
- Databricks管理者は、アカウントコンソールでサーバレス の利用条項 に同意する必要があります。
UI を使用したオンライン テーブルの操作
このセクションでは、オンライン テーブルを作成および削除する方法、およびオンライン テーブルのステータスを確認して更新をトリガーする方法について説明します。
UI を使用したオンライン テーブルの作成
オンラインテーブルは、Catalog Explorer を使用して作成します。 必要な権限に関する情報については、「 ユーザー権限」を参照してください。
-
オンライン テーブルを作成するには、ソースとなるDelta テーブルに主キーが必要です。 使用するDeltaテーブルに主キーがない場合は、 Unity Catalogの既存のDelta テーブルを特徴量テーブルとして使用する の手順に従って主キーを作成します。
-
カタログ エクスプローラで、オンライン テーブルに同期するソース テーブルに移動します。 作成 メニューから、 オンラインテーブル を選択します。
-
ダイアログのセレクタを使用して、オンラインテーブルを設定します。
名前 : Unity Catalog のオンライン テーブルに使用する名前。
主キー : オンライン テーブルの主キーとして使用するソース テーブルの列。
時系列キー : (オプション)。 時系列キーとして使用するソーステーブルの列。 指定すると、オンライン・テーブルには、各プライマリ・キーの最新の時系列キー値を持つローのみが含まれます。
同期モード : 同期パイプラインがオンライン テーブルを更新する方法を指定します。 スナップショット 、 トリガー 、または 連続 のいずれかを選択します。
ポリシー
説明
スナップショット
パイプラインは 1 回実行され、ソース テーブルのスナップショットを取得し、それをオンライン テーブルにコピーします。 ソース テーブルに対する後続の変更は、ソースの新しいスナップショットを作成し、新しいコピーを作成することで、オンライン テーブルに自動的に反映されます。 オンライン テーブルの内容はアトミックに更新されます。
トリガー
パイプラインは 1 回実行され、オンライン テーブルにソース テーブルの初期スナップショット コピーが作成されます。 スナップショット同期モードとは異なり、オンラインテーブルが更新されると、最後のパイプライン実行以降の変更のみが取得され、オンラインテーブルに適用されます。 増分更新は、手動でトリガーすることも、スケジュールに従って自動的にトリガーすることもできます。
連続
パイプラインは継続的に実行されます。 ソース テーブルに対する後続の変更は、リアルタイム ストリーミング モードでオンライン テーブルに段階的に適用されます。 手動で更新する必要はありません。
トリガー 同期モードまたは 連続 同期モードをサポートするには、ソース テーブルでチェンジデータフィードが有効になっている必要があります。
- 完了したら、 確認 をクリックします。 オンラインテーブルページが表示されます。
- 新しいオンライン・テーブルは、作成ダイアログで指定したカタログ、スキーマ、および名前の下に作成されます。 カタログエクスプローラでは、オンラインテーブルは
で示されます。
UI を使用してステータスを取得し、更新をトリガーする
オンライン テーブルのステータスを確認するには、カタログ内のテーブルの名前をクリックして開きます。オンライン テーブル ページが表示され、 概要 タブが開いています。 データ取り込み セクションには、最新の更新のステータスが表示されます。更新をトリガーするには、 今すぐ同期 をクリックします。 データ取り込み セクションには、テーブルを更新する DLT パイプラインへのリンクも含まれています。
定期的な更新のスケジュール
スナップショット または トリガー同期 モードのオンラインテーブルの場合、定期的な自動更新をスケジュールできます。更新スケジュールは、テーブルを更新する DLT パイプラインによって管理されます。
- カタログエクスプローラで、オンラインテーブルに移動します。
- データ取り込み セクションで、パイプラインへのリンクをクリックします。
- 右上隅の スケジュール をクリックし、新しいスケジュールを追加するか、既存のスケジュールを更新します。
UI を使用したオンライン テーブルの削除
オンラインテーブルページから、 ケバブメニューから 削除 を選択します。
API を使用したオンライン テーブルでの作業
Databricks SDK または REST API を使用して、オンライン テーブルを作成および管理することもできます。
リファレンス情報については、 Databricks SDK for Python または REST API のリファレンス ドキュメントを参照してください。
必要条件
Databricks SDK バージョン 0.20 以降。
APIs を使用してオンライン テーブルを作成する
- Databricks SDK - Python
- REST API
from pprint import pprint
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import *
w = WorkspaceClient(host='https://xxx.databricks.com', token='xxx')
# Create an online table
spec = OnlineTableSpec(
primary_key_columns=["pk_col"],
source_table_full_name="main.default.source_table",
run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({'triggered': 'true'})
)
online_table = OnlineTable(
name="main.default.my_online_table", # Fully qualified table name
spec=spec # Online table specification
)
w.online_tables.create_and_wait(table=online_table)
curl --request POST "https://xxx.databricks.com/api/2.0/online-tables" \
--header "Authorization: Bearer xxx" \
--data '{
"name": "main.default.my_online_table",
"spec": {
"run_triggered": {},
"source_table_full_name": "main.default.source_table",
"primary_key_columns": ["a"]
}
}'
オンライン テーブルは、作成後、自動的に同期を開始します。
API を使用してステータスを取得し、更新をトリガーします
次の例に従って、オンライン テーブルのステータスと仕様を表示できます。 オンラインテーブルが連続していない場合 また、そのデータの手動更新をトリガーする場合は、パイプライン API を使用して更新できます。
オンライン テーブル仕様でオンライン テーブルに関連付けられたパイプライン ID を使用し、パイプラインで新しい更新を開始して更新をトリガーします。 これは、カタログエクスプローラーのオンラインテーブル UI で 今すぐ同期 をクリックするのと同じです。
- Databricks SDK - Python
- REST API
pprint(w.online_tables.get('main.default.my_online_table'))
# Sample response
OnlineTable(name='main.default.my_online_table',
spec=OnlineTableSpec(perform_full_copy=None,
pipeline_id='some-pipeline-id',
primary_key_columns=['pk_col'],
run_continuously=None,
run_triggered={},
source_table_full_name='main.default.source_table',
timeseries_key=None),
status=OnlineTableStatus(continuous_update_status=None,
detailed_state=OnlineTableState.PROVISIONING,
failed_status=None,
message='Online Table creation is '
'pending. Check latest status in '
'DLT: '
'https://xxx.databricks.com/pipelines/some-pipeline-id',
provisioning_status=None,
triggered_update_status=None))
# Trigger an online table refresh by calling the pipeline API. To discard all existing data
# in the online table before refreshing, set "full_refresh" to "True". This is useful if your
# online table sync is stuck due to, for example, the source table being deleted and recreated
# with the same name while the sync was running.
w.pipelines.start_update(pipeline_id='some-pipeline-id', full_refresh=True)
curl --request GET \
"https://xxx.databricks.com/api/2.0/online-tables/main.default.my_online_table" \
--header "Authorization: Bearer xxx"
# Sample response
{
"name": "main.default.my_online_table",
"spec": {
"run_triggered": {},
"source_table_full_name": "main.default.source_table",
"primary_key_columns": ["pk_col"],
"pipeline_id": "some-pipeline-id"
},
"status": {
"detailed_state": "PROVISIONING",
"message": "Online Table creation is pending. Check latest status in DLT: https://xxx.databricks.com#joblist/pipelines/some-pipeline-id"
}
}
# Trigger an online table refresh by calling the pipeline API. To discard all existing data
# in the online table before refreshing, set "full_refresh" to "True". This is useful if your
# online table sync is stuck due to, for example, the source table being deleted and recreated
# with the same name while the sync was running.
curl --request POST "https://xxx.databricks.com/api/2.0/pipelines/some-pipeline-id/updates" \
--header "Authorization: Bearer xxx" \
--data '{
"full_refresh": true
}'
API を使用してオンライン テーブルを削除する
- Databricks SDK - Python
- REST API
w.online_tables.delete('main.default.my_online_table')
curl --request DELETE \
"https://xxx.databricks.com/api/2.0/online-tables/main.default.my_online_table" \
--header "Authorization: Bearer xxx"
オンライン・テーブルを削除すると、進行中のデータ同期が停止し、そのすべてのリソースが解放されます。
Feature Servingエンドポイントを使用したオンラインテーブルデータの提供
Databricks の外部でホストされているモデルとアプリケーションの場合は、オンライン テーブルから特徴量を提供する特徴量サービングエンドポイントを作成できます。 エンドポイントは、REST API を使用して低遅延で機能を利用できるようにします。
-
フィーチャスペックを作成します。
フィーチャスペックを作成するときは、ソースの Delta テーブルを指定します。 これにより、機能仕様をオフラインとオンラインの両方のシナリオで使用できます。 オンラインルックアップの場合、サービスエンドポイントは自動的にオンラインテーブルを使用して、低レイテンシーの機能ルックアップを実行します。
ソース Delta テーブルとオンライン テーブルでは、同じ主キーを使用する必要があります。
特徴量の仕様は、カタログエクスプローラーの 関数 タブで表示できます。
Pythonfrom databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
fe = FeatureEngineeringClient()
fe.create_feature_spec(
name="catalog.default.user_preferences_spec",
features=[
FeatureLookup(
table_name="user_preferences",
lookup_key="user_id"
)
]
) -
特徴量サービングエンドポイントを作成します。
このステップでは、Delta テーブル
user_preferences
のデータを同期するuser_preferences_online_table
という名前のオンライン テーブルが作成されていることを前提としています。特徴量仕様を使用して、特徴量サービングエンドポイントを作成します。 エンドポイントは、関連付けられたオンライン テーブルを使用して、REST API を介してデータを利用できるようにします。
この操作を実行するユーザーは、オフライン テーブルとオンライン テーブルの両方の所有者である必要があります。
- Databricks SDK - Python
- Python API
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput
workspace = WorkspaceClient()
# Create endpoint
endpoint_name = "fse-location"
workspace.serving_endpoints.create_and_wait(
name=endpoint_name,
config=EndpointCoreConfigInput(
served_entities=[
ServedEntityInput(
entity_name=feature_spec_name,
scale_to_zero_enabled=True,
workload_size="Small"
)
]
)
)
from databricks.feature_engineering.entities.feature_serving_endpoint import (
ServedEntity,
EndpointCoreConfig,
)
fe.create_feature_serving_endpoint(
name="user-preferences",
config=EndpointCoreConfig(
served_entities=ServedEntity(
feature_spec_name="catalog.default.user_preferences_spec",
workload_size="Small",
scale_to_zero_enabled=True
)
)
)
-
特徴量サービングエンドポイントからデータを取得します。
API エンドポイントにアクセスするには、エンドポイント URL に HTTP GET 要求を送信します。 この例では、 Python APIsを使用してこれを行う方法を示します。 その他の言語とツールについては、 Feature Servingを参照してください。
Python# Set up credentials
export DATABRICKS_TOKEN=...Pythonurl = "https://{workspace_url}/serving-endpoints/user-preferences/invocations"
headers = {'Authorization': f'Bearer {DATABRICKS_TOKEN}', 'Content-Type': 'application/json'}
data = {
"dataframe_records": [{"user_id": user_id}]
}
data_json = json.dumps(data, allow_nan=True)
response = requests.request(method='POST', headers=headers, url=url, data=data_json)
if response.status_code != 200:
raise Exception(f'Request failed with status {response.status_code}, {response.text}')
print(response.json()['outputs'][0]['hotel_preference'])
RAGアプリケーションでオンラインテーブルを使用
RAGアプリケーションは、オンラインテーブルの一般的なユースケースです。 RAG アプリケーションに必要な構造化データのオンライン テーブルを作成し、 Feature Servingエンドポイントでホストします。 RAG アプリケーションは、 Feature Servingエンドポイントを使用して、オンライン テーブルから関連データを検索します。
一般的な手順は次のとおりです。
- 特徴量サービングエンドポイントを作成します。
- LangChainまたはエンドポイントを使用して関連データを検索する同様のパッケージを使用してツールを作成します。
- LangChainエージェントまたは同様のエージェントでツールを使用して、関連データを取得します。
- アプリケーションをホストするモデルサービング エンドポイントを作成します。
詳細な手順とノートブックの例については、「 特徴エンジニアリングの例: 構造化 RAG アプリケーション」を参照してください。
ノートブックの例
次のノートブックは、リアルタイム サービスおよび自動機能検索のために、機能をオンライン テーブルに公開する方法を示しています。
オンライン テーブルのデモ ノートブック
Mosaic AI Model Serving でオンライン テーブルを使用する
オンライン テーブルを使用して、 Mosaic AI Model Servingの特徴を検索できます。 特徴量テーブルをオンラインテーブルに同期すると、その特徴量テーブルの特徴を使用して学習させたモデルは、推論中にオンラインテーブルから特徴値を自動的に検索します。 追加の設定は必要ありません。
-
FeatureLookup
を使用してモデルをトレーニングします。model トレーニングでは、次の例に示すように、model トレーニング セット内のオフライン特徴量テーブルの特徴を使用します。
Pythontraining_set = fe.create_training_set(
df=id_rt_feature_labels,
label='quality',
feature_lookups=[
FeatureLookup(
table_name="user_preferences",
lookup_key="user_id"
)
],
exclude_columns=['user_id'],
) -
Mosaic AI Model Serving でモデルを提供します。 モデルは、オンライン テーブルからフィーチャを自動的に検索します。 詳細についてはDatabricksモデルサービングを使用した自動フィーチャ検索を参照してください。
ユーザー権限
オンライン テーブルを作成するには、次のアクセス許可が必要です。
SELECT
ソース・テーブルに対する権限。USE_CATALOG
宛先カタログに対する特権。USE_SCHEMA
宛先スキーマでのCREATE_TABLE
権限。
オンライン テーブルのデータ同期パイプラインを管理するには、オンライン テーブルの所有者であるか、オンライン テーブルに対する REFRESH 特権が付与されている必要があります。 カタログに対するUSE_CATALOGおよびUSE_SCHEMA権限を持たないユーザーには、カタログエクスプローラーにオンラインテーブルが表示されません。
Unity Catalog メタストアには、 特権モデル バージョン 1.0 が必要です。
エンドポイント権限モデル
一意のサービスプリンシパルは、オンラインテーブルからのデータのクエリに必要なアクセス許可が制限された Feature Serving またはモデルサービングエンドポイントに対して自動的に作成されます。 このサービスプリンシパルは、エンドポイントがリソースを作成したユーザーとは無関係にデータにアクセスできるようにし、作成者がワークスペースを離れた場合でもエンドポイントが引き続き機能できるようにします。
このサービスプリンシパルの有効期間は、エンドポイントの有効期間です。 監査ログは、 Unity Catalog カタログの所有者に対してシステム生成レコードが示され、このサービスプリンシパルに必要な権限を付与していることを示している場合があります。
制限
- ソース・テーブルごとにサポートされるオンライン・テーブルは 1 つだけです。
- オンライン テーブルとそのソース テーブルには、最大 1000 列を含めることができます。
- データ型 ARRAY、MAP、または STRUCT の列は、オンライン テーブルの主キーとして使用できません。
- カラムがオンライン・テーブルのプライマリ・キーとして使用されている場合、ソース・テーブルでカラムに NULL 値が含まれているすべてのローは無視されます。
- 外部テーブル、システムテーブル、および内部テーブルは、ソーステーブルとしてサポートされていません。
- Delta チェンジデータフィードが有効になっていないソース テーブルは、 スナップショット 同期モードのみをサポートします。
- Delta Sharing テーブルは、 スナップショット 同期モードでのみサポートされます。
- オンライン・テーブルのカタログ名、スキーマ名、およびテーブル名には、英数字とアンダースコアのみを含めることができ、数字で始めることはできません。 ダッシュ (
-
) は使用できません。 - 文字列型の列の長さは 64KB に制限されています。
- 列名の長さは 64 文字に制限されています。
- 行の最大サイズは 2MB です。
- パブリック プレビュー中の Unity Catalog メタストア内のすべてのオンライン テーブルの合計サイズは、非圧縮のユーザー データで 2 TB です。
- 最大クエリ/秒 (QPS) は 12,000 です。 Databricksアカウントチームに連絡して、制限を増やしてください。
トラブルシューティング
[オンライン テーブルの作成 ] オプションが表示されません
通常、原因は、同期しようとしているテーブル (ソース テーブル) がサポートされている型ではないことです。 ソース テーブルの [セキュリティ保護可能な種類] ([カタログ エクスプローラ の詳細 ] タブに表示) が、次のサポートされているオプションのいずれかであることを確認します。
TABLE_EXTERNAL
TABLE_DELTA
TABLE_DELTA_EXTERNAL
TABLE_DELTASHARING
TABLE_DELTASHARING_MUTABLE
TABLE_STREAMING_LIVE_TABLE
TABLE_STANDARD
TABLE_FEATURE_STORE
TABLE_FEATURE_STORE_EXTERNAL
TABLE_VIEW
TABLE_VIEW_DELTASHARING
TABLE_MATERIALIZED_VIEW
オンラインテーブルを作成するときに、 トリガー同期モードと連続同期モードを選択できません
これは、ソース テーブルで Delta チェンジデータフィードが有効になっていない場合、または View または materialized ビューである場合に発生します。 インクリメンタル 同期モードを使用するには、ソース テーブルでチェンジデータフィードを有効にするか、非ビュー テーブルを使用します。
オンライン テーブルの更新が失敗するか、ステータスがオフラインと表示される
このエラーのトラブルシューティングを開始するには、Catalog Explorer のオンライン テーブルの [概要 ] タブに表示されるパイプライン ID をクリックします。
表示されるパイプライン UI ページで、「フロー '__online_table の解決に失敗しました」というエントリをクリックします。
ポップアップが表示され、 エラーの詳細 セクションに詳細が表示されます。
エラーの一般的な原因には、次のものがあります。
-
ソース テーブルが削除されたか、オンライン テーブルの同期中に削除されて同じ名前で再作成されました。 これは、連続したオンライン テーブルで特に一般的です。これは、常に同期しているためです。
-
ソース テーブルは、ファイアウォールの設定により、サーバレス コンピュート経由ではアクセスできません。 この状況では、[ エラーの詳細 ] セクションに "クラスター xxx で DLT サービスを開始できませんでした..." というエラー メッセージが表示される場合があります。
-
オンライン テーブルの合計サイズが、メタストア全体の 2 TB (非圧縮サイズ) の制限を超えています。 2 TB の制限は、Delta テーブルを行指向形式で拡張した後の非圧縮サイズを指します。 行形式のテーブルのサイズは、Catalog Explorer に表示される Delta テーブルのサイズ (列指向形式のテーブルの圧縮サイズを参照) よりも大幅に大きくなる可能性があります。 その差は、テーブルの内容によって 100 倍にもなります。
Delta テーブルの非圧縮で展開されたロー・サイズを見積もるには、サーバレス SQLウェアハウスから次のクエリを使用します。このクエリは、拡張されたテーブルの推定サイズをバイト単位で返します。 このクエリを正常に実行すると、サーバレス コンピュートがソース テーブルにアクセスできることも確認されます。
SQLSELECT sum(length(to_csv(struct(*)))) FROM `source_table`;