リアルタイムの特徴量サービングでのオンラインテーブルの利用
プレビュー
オンライン テーブルは、 us-east-1
、 us-west-2
、 eu-west-1
、 ap-southeast-1
、 ap-southeast-2
の各地域でパブリック プレビュー段階です。 価格 情報については、 Online Tables 価格を参照してください。
オンライン テーブルは、オンライン アクセス用に最適化された行指向形式で保存されるDeltaテーブルの読み取り専用コピーです。 オンライン テーブルは完全にサーバーレスのテーブルであり、要求の負荷に応じて容量を自動的にスケーリングし、あらゆる規模のデータに対して低レイテンシで高スループットのアクセスを提供します。 オンライン テーブルはMosaic AI Model Serving 、 Feature Serving 、および検索拡張生成 (RAG) アプリケーションと連携して動作するように設計されており、高速なデータ検索に使用されます。
レイクハウスフェデレーションを使用してクエリでオンライン テーブルを使用することもできます。 レイクハウスフェデレーションを使用する場合、オンラインテーブルにアクセスするには、サーバレス SQL ウェアハウスを使用する必要があります。 読み取り操作(SELECT
)のみがサポートされています。 この機能は対話型またはデバッグの目的のみを目的としており、本番運用やミッション クリティカルなワークロードには使用しないでください。
Databricks UI を使用したオンライン テーブルの作成は、1 ステップのプロセスです。 カタログ エクスプローラーでDeltaテーブルを選択し、 [オンライン テーブルの作成]を選択するだけです。 REST API または Databricks SDK を使用してオンライン テーブルを作成および管理することもできます。 APIを使用したオンライン テーブルの操作 を参照してください。
要件
ワークスペースは、Unity Catalog に対して有効になっている必要があります。 ドキュメントに従って、Unity Catalog メタストアを作成し、ワークスペースで有効にして、カタログを作成します。
オンライン テーブルにアクセスするには、モデルをUnity Catalogに登録する必要があります。
Databricks 管理者は、アカウント コンソールでサーバレスの利用規約に同意する必要があります。
UIを利用したオンライン テーブルの操作
ここでは、オンラインテーブルを作成および削除する方法と、オンラインテーブルのステータスを確認して更新をトリガーする方法について説明します。
UIを利用したオンラインテーブルの作成
オンラインテーブルは、カタログエクスプローラーを使用して作成します。 必要な権限については、 「ユーザー権限」を参照してください。
オンライン テーブルを作成するには、ソース Delta テーブルに主キーが必要です。 使用する Delta テーブルに主キーがない場合は、「 Unity Catalog の既存の Delta テーブルを特徴量テーブルとして使用する」の手順に従って作成します。
カタログ エクスプローラーで、オンライン テーブルに同期するソース テーブルに移動します。 [作成] メニューから [オンライン テーブル] を選択します。
ダイアログのセレクターを使用して、オンライン テーブルを構成します。
名前: Unity Catalog のオンライン テーブルに使用する名前。
主キー: オンライン表の主キーとして使用するソース表の列。
時系列キー: (オプション)。 時系列キーとして使用するソーステーブルの列。 指定した場合、オンラインテーブルには、各主キーの最新の時系列キー値を持つ行のみが含まれます。
同期モード: 同期パイプラインがオンライン テーブルを更新する方法を指定します。 「スナップショット」、「トリガー」、または「連続」のいずれかを選択します。
ポリシー
説明
スナップショット
パイプラインは 1 回実行され、ソース テーブルのスナップショットを取得し、それをオンライン テーブルにコピーします。 ソース テーブルに対する後続の変更は、ソースの新しいスナップショットを作成し、新しいコピーを作成することによって、オンライン テーブルに自動的に反映されます。 オンライン テーブルの内容はアトミックに更新されます。
トリガー
パイプラインは 1 回実行され、オンライン テーブルにソース テーブルの初期スナップショット コピーが作成されます。 スナップショット同期モードとは異なり、オンライン テーブルが更新されると、最後のパイプライン実行以降の変更のみが取得され、オンライン テーブルに適用されます。 増分更新は、スケジュールに従って手動でトリガーすることも、自動的にトリガーすることもできます。
連続
パイプラインは継続的に実行されます。 ソーステーブルに対する後続の変更は、リアルタイムストリーミングモードでオンラインテーブルに増分的に適用されます。 手動で更新する必要はありません。
注:
トリガー同期モードまたは継続同期モードをサポートするには、ソース テーブルでチェンジ データフィードが有効になっている必要があります。
完了したら、「 確認」をクリックします。 オンライン テーブル ページが表示されます。
新しいオンライン テーブルは、作成ダイアログで指定したカタログ、スキーマ、および名前の下に作成されます。 カタログエクスプローラーでは、オンラインテーブルは で 示されます。
UIを使用して状態を取得し、更新をトリガーする
オンライン テーブルのステータスを確認するには、カタログ内のテーブルの名前をクリックして開きます。 オンライン テーブル ページが開き、[ 概要 ] タブが開きます。 [ データの取り込み ] セクションには、最新の更新のステータスが表示されます。 更新をトリガーするには、[ 今すぐ同期] をクリックします。 [ データの取り込み ] セクションには、テーブルを更新する Delta Live Tables パイプラインへのリンクも含まれています。
APIを利用したオンライン テーブルの操作
Databricks SDK または REST API を使用して、オンライン テーブルを作成および管理することもできます。
参考情報については、 Databricks SDK for PythonまたはREST APIのリファレンス ドキュメントを参照してください。
APIs使用してオンラインテーブルを作成する
from pprint import pprint
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import *
w = WorkspaceClient(host='https://xxx.databricks.com', token='xxx')
# Create an online table
spec = OnlineTableSpec(
primary_key_columns=["pk_col"],
source_table_full_name="main.default.source_table",
run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({'triggered': 'true'})
)
w.online_tables.create(name='main.default.my_online_table', spec=spec)
curl --request POST "https://xxx.databricks.com/api/2.0/online-tables" \
--header "Authorization: Bearer xxx" \
--data '{
"name": "main.default.my_online_table",
"spec": {
"run_triggered": {},
"source_table_full_name": "main.default.source_table",
"primary_key_columns": ["a"]
}
}'
オンライン テーブルは、作成後に自動的に同期を開始します。
APIを利用してステータスを取得し、更新をトリガーする
オンラインテーブルのステータスと仕様は、以下の例に従って表示できます。 オンライン テーブルが連続的ではなく、そのデータの手動更新をトリガーしたい場合は、パイプライン API を使用してこれを行うことができます。
オンライン テーブル仕様でオンライン テーブルに関連付けられたパイプライン ID を使用し、パイプラインで新しい更新を開始して更新をトリガーします。 これは、カタログエクスプローラーのオンラインテーブル UI で 「今すぐ同期」(Sync now ) をクリックするのと同じです。
pprint(w.online_tables.get('main.default.my_online_table'))
# Sample response
OnlineTable(name='main.default.my_online_table',
spec=OnlineTableSpec(perform_full_copy=None,
pipeline_id='some-pipeline-id',
primary_key_columns=['pk_col'],
run_continuously=None,
run_triggered={},
source_table_full_name='main.default.source_table',
timeseries_key=None),
status=OnlineTableStatus(continuous_update_status=None,
detailed_state=OnlineTableState.PROVISIONING,
failed_status=None,
message='Online Table creation is '
'pending. Check latest status in '
'Delta Live Tables: '
'https://xxx.databricks.com/pipelines/some-pipeline-id',
provisioning_status=None,
triggered_update_status=None))
# Trigger an online table refresh by calling the pipeline API. To discard all existing data
# in the online table before refreshing, set "full_refresh" to "True". This is useful if your
# online table sync is stuck due to, for example, the source table being deleted and recreated
# with the same name while the sync was running.
w.pipelines.start_update(pipeline_id='some-pipeline-id', full_refresh=True)
curl --request GET \
"https://xxx.databricks.com/api/2.0/online-tables/main.default.my_online_table" \
--header "Authorization: Bearer xxx"
# Sample response
{
"name": "main.default.my_online_table",
"spec": {
"run_triggered": {},
"source_table_full_name": "main.default.source_table",
"primary_key_columns": ["pk_col"],
"pipeline_id": "some-pipeline-id"
},
"status": {
"detailed_state": "PROVISIONING",
"message": "Online Table creation is pending. Check latest status in Delta Live Tables: https://xxx.databricks.com#joblist/pipelines/some-pipeline-id"
}
}
# Trigger an online table refresh by calling the pipeline API. To discard all existing data
# in the online table before refreshing, set "full_refresh" to "True". This is useful if your
# online table sync is stuck due to, for example, the source table being deleted and recreated
# with the same name while the sync was running.
curl --request POST "https://xxx.databricks.com/api/2.0/pipelines/some-pipeline-id/updates" \
--header "Authorization: Bearer xxx" \
--data '{
"full_refresh": true
}'
特徴量サービングエンドポイントを使用したオンラインテーブルデータの提供
Databricks の外部でホストされているモデルとアプリケーションの場合は、オンライン テーブルから特徴量を提供する特徴量サービングエンドポイントを作成できます。 エンドポイントは、REST API を使用して低遅延で機能を利用できるようにします。
フィーチャ スペックを作成します。
機能仕様を作成するときは、ソース Delta テーブルを指定します。 これにより、機能仕様をオフラインとオンラインの両方のシナリオで使用できます。 オンライン ルックアップの場合、サービス エンドポイントは自動的にオンライン テーブルを使用して、待機時間の短い機能ルックアップを実行します。
ソース Delta テーブルとオンライン テーブルでは、同じ主キーを使用する必要があります。
フィーチャー仕様は、カタログエクスプローラーの 「機能」(Function ) タブで表示できます。
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup fe = FeatureEngineeringClient() fe.create_feature_spec( name="catalog.default.user_preferences_spec", features=[ FeatureLookup( table_name="user_preferences", lookup_key="user_id" ) ] )
特徴量サービングエンドポイントを作成します。
このステップでは、Delta テーブル
user_preferences
のデータを同期するuser_preferences_online_table
という名前のオンライン テーブルが作成されていることを前提としています。特徴量仕様を使用して、特徴量サービングエンドポイントを作成します。 エンドポイントは、関連付けられたオンライン テーブルを使用して、REST API を介してデータを利用できるようにします。注:
この操作を実行するユーザーは、オフライン テーブルとオンライン テーブルの両方の所有者である必要があります。
from databricks.sdk import WorkspaceClient from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput workspace = WorkspaceClient() # Create endpoint endpoint_name = "fse-location" workspace.serving_endpoints.create_and_wait( name=endpoint_name, config=EndpointCoreConfigInput( served_entities=[ ServedEntityInput( entity_name=feature_spec_name, scale_to_zero_enabled=True, workload_size="Small" ) ] ) )
from databricks.feature_engineering.entities.feature_serving_endpoint import ( ServedEntity, EndpointCoreConfig, ) fe.create_feature_serving_endpoint( name="user-preferences", config=EndpointCoreConfig( served_entities=ServedEntity( feature_spec_name="catalog.default.user_preferences_spec", workload_size="Small", scale_to_zero_enabled=True ) ) )
特徴量サービングエンドポイントからデータを取得します。
API エンドポイントにアクセスするには、HTTP GET リクエストをエンドポイント URL に送信します。 この例では、Python APIs使用してこれを行う方法を示します。 その他の言語とツールについては、「 特徴量サービング」を参照してください。
# Set up credentials export DATABRICKS_TOKEN=...
url = "https://{workspace_url}/serving-endpoints/user-preferences/invocations" headers = {'Authorization': f'Bearer {DATABRICKS_TOKEN}', 'Content-Type': 'application/json'} data = { "dataframe_records": [{"user_id": user_id}] } data_json = json.dumps(data, allow_nan=True) response = requests.request(method='POST', headers=headers, url=url, data=data_json) if response.status_code != 200: raise Exception(f'Request failed with status {response.status_code}, {response.text}') print(response.json()['outputs'][0]['hotel_preference'])
RAGアプリケーションでオンラインテーブルを使用する
RAGアプリケーションは、オンラインテーブルの一般的なユースケースです。 RAGアプリケーションが必要とする構造化データ用のオンラインテーブルを作成し、特徴量サービングエンドポイントでホストします。 RAGアプリケーションは、特徴量サービングエンドポイントを使用して、オンラインテーブルから関連データを検索します。
典型的なステップは次のとおりです。
特徴量サービングエンドポイントを作成します。
LangChain またはエンドポイントを使用して関連データを検索する同様のパッケージを使用してツールを作成します。
LangChain エージェントまたは同様のエージェントのツールを使用して、関連データを取得します。
アプリケーションをホストするためのモデルサーバー エンドポイントを作成します。
詳細な手順とサンプルノートブックについては、 「機能エンジニアリングの例: 構造化 RAG アプリケーション」を参照してください。
Mosaic AI Model Serving でオンライン テーブルを使用する
オンライン テーブルを使用して、Mosaic AI Model Serving の機能を検索できます。 特徴量テーブルをオンライン テーブルに同期すると、その特徴量テーブルの特徴を使用してトレーニングされたモデルは、推論中にオンライン テーブルから特徴値を自動的に検索します。 追加の設定は必要ありません。
FeatureLookup
を使用してモデルをトレーニングする。モデル トレーニングでは、次の例に示すように、モデル トレーニング セットでオフラインの特徴量テーブルの特徴を使用します。
training_set = fe.create_training_set( df=id_rt_feature_labels, label='quality', feature_lookups=[ FeatureLookup( table_name="user_preferences", lookup_key="user_id" ) ], exclude_columns=['user_id'], )
Mosaic AI Model Serving を使用してモデルを提供します。 モデルは、オンライン テーブルから特徴を自動的に検索します。 詳細については、 Databricksモデルサービングによる自動機能検索」を参照してください。
ユーザー権限
オンライン テーブルを作成するには、次のアクセス許可が必要です。
SELECT
ソース表に対する特権。USE_CATALOG
宛先カタログに対する特権。USE_SCHEMA
宛先スキーマに対するCREATE_TABLE
特権。
オンライン テーブルのデータ同期パイプラインを管理するには、オンライン テーブルの所有者であるか、オンライン テーブルに対する REFRESH 特権が付与されている必要があります。 カタログに対するUSE_CATALOGおよびUSE_SCHEMA権限を持たないユーザーには、カタログエクスプローラーにオンラインテーブルが表示されません。
Unity Catalog メタストアには、 特権モデル バージョン 1.0 が必要です。
エンドポイントのアクセス許可モデル
一意のサービスプリンシパルは、オンラインテーブルからのデータのクエリに必要なアクセス許可が制限された Feature Serving またはモデルサービングエンドポイントに対して自動的に作成されます。 このサービスプリンシパルは、エンドポイントがリソースを作成したユーザーとは無関係にデータにアクセスできるようにし、作成者がワークスペースを離れた場合でもエンドポイントが引き続き機能できるようにします。
このサービスプリンシパルの有効期間は、エンドポイントの有効期間です。 監査ログは、 Unity Catalog カタログの所有者に対してシステム生成レコードが示され、このサービスプリンシパルに必要な権限を付与していることを示している場合があります。
制限事項
ソース表ごとにサポートされるオンライン表は 1 つだけです。
オンライン テーブルとそのソース テーブルには最大 1000 列を含めることができます。
データ型 ARRAY、MAP、または STRUCT の列は、オンライン表の主キーとして使用できません。
オンライン テーブルで列を主キーとして使用する場合、ソース テーブルで列に NULL 値が含まれているすべての行が無視されます。
外部テーブル、システムテーブル、および内部テーブルは、ソーステーブルとしてサポートされていません。
Delta チェンジデータフィードが有効になっていないソーステーブルは、スナップショット同期モードのみをサポートします。
Delta Sharing テーブルは、 スナップショット 同期モードでのみサポートされます。
オンライン表のカタログ名、スキーマ名、および表名には、英数字と下線のみを含めることができ、数字で始めることはできません。 ダッシュ (
-
) は使用できません。文字列型の列の長さは 64 KB に制限されています。
列名の長さは 64 文字に制限されています。
行の最大サイズは 2 MB です。
パブリック プレビュー中の Unity Catalog メタストア内のすべてのオンライン テーブルの合計サイズは、非圧縮のユーザー データで 2 TB です。
最大クエリ/秒 (QPS) は 12,000 です。 Databricksアカウントチームに連絡して、制限を増やしてください。
トラブルシューティング
[オンライン テーブルの作成] オプションが表示されない
原因は、通常、同期元のテーブル (ソース テーブル) がサポートされている型ではないことです。 ソース テーブルの [セキュリティ保護可能な種類] (「カタログ エクスプローラ の詳細 」タブに表示) が、以下のサポートされているオプションのいずれかであることを確認します。
TABLE_EXTERNAL
TABLE_DELTA
TABLE_DELTA_EXTERNAL
TABLE_DELTASHARING
TABLE_DELTASHARING_MUTABLE
TABLE_STREAMING_LIVE_TABLE
TABLE_STANDARD
TABLE_FEATURE_STORE
TABLE_FEATURE_STORE_EXTERNAL
TABLE_VIEW
TABLE_VIEW_DELTASHARING
TABLE_MATERIALIZED_VIEW
オンライン テーブル の作成時に [トリガー] または[継続的 ] 同期モードを選択できない
これは、ソース テーブルで Delta チェンジデータフィードが有効になっていない場合、または View または materialized ビューである場合に発生します。 インクリメンタル同期モードを使用するには、ソース テーブルでチェンジデータフィードを有効にするか、非ビュー テーブルを使用します。
オンラインテーブルの更新が失敗するか、ステータスがオフラインと表示される
このエラーのトラブルシューティングを開始するには、カタログ エクスプローラーのオンライン テーブルの[概要]タブに表示されるパイプライン ID をクリックします。
表示されるパイプライン UI ページで、「フロー '__online_table を解決できませんでした」というエントリをクリックします。
ポップアップが表示され、[ エラーの詳細 ] セクションに詳細が表示されます。
エラーの一般的な原因には、次のようなものがあります。
オンライン テーブルの同期中に、ソース テーブルが削除されたか、削除されて同じ名前で再作成されました。 これは、連続したオンラインテーブルで常に同期しているため、特に一般的です。
ファイアウォールの設定により、サーバレスコンピュート経由でソーステーブルにアクセスできません。 この状況では、エラーの詳細セクションに「クラスター xxx で DLT サービスを開始できませんでした…」というエラー メッセージが表示される場合があります。
オンライン テーブルの合計サイズが、メタストア全体の 2 TB (非圧縮サイズ) の制限を超えています。 2 TB の制限は、Delta テーブルを行指向形式で拡張した後の非圧縮サイズを指します。 行形式のテーブルのサイズは、Catalog Explorer に表示される Delta テーブルのサイズ (列指向形式のテーブルの圧縮サイズを参照) よりも大幅に大きくなる可能性があります。 その差は、テーブルの内容によって 100 倍にもなります。
Deltaテーブルの非圧縮の行展開サイズを見積もるには、サーバレス SQL ウェアハウスの次のクエリを使用します。 このクエリは、拡張されたテーブルの推定サイズをバイト単位で返します。 このクエリが正常に実行されると、サーバレス コンピュートがソース テーブルにアクセスできることも確認されます。
SELECT sum(length(to_csv(struct(*)))) FROM `source_table`;