リアルタイムの特徴量サービングでのオンラインテーブルの利用

プレビュー

オンライン テーブルはパブリック プレビュー段階です。 プレビュー中に、オンライン テーブルにデータを取り込むと、SQL サーバレス DBU が消費されます。 オンライン テーブルの最終価格は将来公開される予定です。

オンライン テーブル プレビューは、 us-east-1us-west-2eu-west-1ap-southeast-2の各リージョンで利用できます。

オンライン テーブルは、オンライン アクセス用に最適化された行指向形式で格納されるDeltaテーブルの読み取り専用コピーです。 オンライン テーブルは完全にサーバレス テーブルであり、リクエストの負荷に応じてスループット容量を自動スケールし、あらゆる規模のデータへの低レイテンシーと高スループットのアクセスを提供します。 オンライン テーブルは、Databricks モデルビングサービング、Feature Serving、および高速データ検索に使用される検索拡張生成 (RAG) アプリケーションと連携するように設計されています。

レイクハウスフェデレーションを使用してクエリでオンライン テーブルを使用することもできます。 レイクハウスフェデレーションを使用する場合、オンラインテーブルにアクセスするには、サーバレス SQL ウェアハウスを使用する必要があります。 読み取り操作(SELECT)のみがサポートされています。 この機能は対話型またはデバッグの目的のみを目的としており、本番運用やミッション クリティカルなワークロードには使用しないでください。

Databricks UI を使用したオンライン テーブルの作成は、1 ステップのプロセスです。 カタログ エクスプローラーでDeltaテーブルを選択し、 [オンライン テーブルの作成]を選択するだけです。 REST API または Databricks SDK を使用してオンライン テーブルを作成および管理することもできます。 APIsを使用したオンライン テーブルの操作 」を参照してください。

要件

  • ワークスペースは、Unity Catalog に対して有効になっている必要があります。 ドキュメントに従って、Unity Catalog メタストアを作成し、ワークスペースで有効にして、カタログを作成します。

  • オンライン テーブルにアクセスするには、モデルをUnity Catalogに登録する必要があります。

  • Databricks 管理者は、アカウント コンソールでサーバレスの利用規約に同意する必要があります。

UI を使用したオンライン テーブルの操作

ここでは、オンラインテーブルを作成および削除する方法と、オンラインテーブルのステータスを確認して更新をトリガーする方法について説明します。

UI を使用したオンライン テーブルの作成

オンラインテーブルは、カタログエクスプローラーを使用して作成します。 必要な権限については、 「ユーザー権限」を参照してください。

  1. オンライン テーブルを作成するには、ソース Delta テーブルに主キーが必要です。 使用する Delta テーブルに主キーがない場合は、「 Unity Catalog の既存の Delta テーブルを特徴量テーブルとして使用する」の手順に従って作成します。

  2. カタログ エクスプローラーで、オンライン テーブルに同期するソース テーブルに移動します。 [作成] メニューから [オンライン テーブル] を選択します。

    [オンライン テーブルの作成] を選択します
  3. ダイアログのセレクターを使用して、オンライン テーブルを構成します。

    [オンライン テーブルの構成] ダイアログ

    名前: Unity Catalog のオンライン テーブルに使用する名前。

    主キー: オンライン表の主キーとして使用するソース表の列。

    時系列キー: (オプション)。 時系列キーとして使用するソーステーブルの列。 指定した場合、オンラインテーブルには、各主キーの最新の時系列キー値を持つ行のみが含まれます。

    同期モード: 同期パイプラインがオンライン テーブルを更新する方法を指定します。 「スナップショット」、「トリガー」、または「連続」のいずれかを選択します。

    ポリシー

    説明

    スナップショット

    パイプラインは 1 回実行され、ソース テーブルのスナップショットを取得し、それをオンライン テーブルにコピーします。 ソース テーブルに対する後続の変更は、ソースの新しいスナップショットを作成し、新しいコピーを作成することによって、オンライン テーブルに自動的に反映されます。 オンライン テーブルの内容はアトミックに更新されます。

    トリガー

    パイプラインは 1 回実行され、オンライン テーブルにソース テーブルの初期スナップショット コピーが作成されます。 スナップショット同期モードとは異なり、オンライン テーブルが更新されると、最後のパイプライン実行以降の変更のみが取得され、オンライン テーブルに適用されます。 増分更新は、スケジュールに従って手動でトリガーすることも、自動的にトリガーすることもできます。

    連続

    パイプラインは継続的に実行されます。 ソーステーブルに対する後続の変更は、リアルタイムストリーミングモードでオンラインテーブルに増分的に適用されます。 手動で更新する必要はありません。

注:

トリガー同期モードまたは継続同期モードをサポートするには、ソース テーブルでチェンジ データフィードが有効になっている必要があります。

  1. 完了したら、「 確認」をクリックします。 オンライン テーブル ページが表示されます。

  2. 新しいオンライン テーブルは、作成ダイアログで指定したカタログ、スキーマ、および名前の下に作成されます。 カタログエクスプローラーでは、オンラインテーブルは で オンライン テーブル アイコン示されます。

UI を使用して状態を取得し、更新をトリガーする

オンライン テーブルのステータスを確認するには、カタログ内のテーブルの名前をクリックして開きます。 オンライン テーブル ページが開き、[ 概要 ] タブが開きます。 [ データの取り込み ] セクションには、最新の更新のステータスが表示されます。 更新をトリガーするには、[ 今すぐ同期] をクリックします。 [ データの取り込み ] セクションには、テーブルを更新する Delta Live Tables パイプラインへのリンクも含まれています。

カタログ内のオンラインテーブルページの表示

UI を使用したオンライン テーブルの削除

オンラインテーブルページで、ケバブメニューから[削除ケバブメニュー]を選択します。

APIs使用してオンライン テーブルを操作する

Databricks SDK または REST API を使用して、オンライン テーブルを作成および管理することもできます。

参考情報については、 Databricks SDK for PythonまたはREST APIのリファレンス ドキュメントを参照してください。

要件

Databricks SDK バージョン 0.20 以降。

APIs使用してオンラインテーブルを作成する

from pprint import pprint
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import *

w = WorkspaceClient(host='https://xxx.databricks.com', token='xxx')

# Create an online table
spec = OnlineTableSpec(
  primary_key_columns=["pk_col"],
  source_table_full_name="main.default.source_table",
  run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({'triggered': 'true'})
)

w.online_tables.create(name='main.default.my_online_table', spec=spec)
curl --request POST "https://xxx.databricks.com/api/2.0/online-tables" \
--header "Authorization: Bearer xxx" \
--data '{
    "name": "main.default.my_online_table",
    "spec": {
        "run_triggered": {},
        "source_table_full_name": "main.default.source_table",
        "primary_key_columns": ["a"]
    }
  }'

オンライン テーブルは、作成後に自動的に同期を開始します。

APIs使用してステータスを取得し、更新をトリガーする

オンラインテーブルのステータスと仕様は、以下の例に従って表示できます。 オンライン テーブルが連続的ではなく、そのデータの手動更新をトリガーしたい場合は、パイプライン API を使用してこれを行うことができます。

オンライン テーブル仕様でオンライン テーブルに関連付けられたパイプライン ID を使用し、パイプラインで新しい更新を開始して更新をトリガーします。 これは、カタログエクスプローラーのオンラインテーブル UI で 「今すぐ同期」(Sync now ) をクリックするのと同じです。

pprint(w.online_tables.get('main.default.my_online_table'))

# Sample response
OnlineTable(name='main.default.my_online_table',
    spec=OnlineTableSpec(perform_full_copy=None,
        pipeline_id='some-pipeline-id',
        primary_key_columns=['pk_col'],
        run_continuously=None,
        run_triggered={},
        source_table_full_name='main.default.source_table',
        timeseries_key=None),
    status=OnlineTableStatus(continuous_update_status=None,
        detailed_state=OnlineTableState.PROVISIONING,
        failed_status=None,
        message='Online Table creation is '
            'pending. Check latest status in '
            'Delta Live Tables: '
            'https://xxx.databricks.com/pipelines/some-pipeline-id',
        provisioning_status=None,
        triggered_update_status=None))

# Trigger an online table refresh by calling the pipeline API. To discard all existing data
# in the online table before refreshing, set "full_refresh" to "True". This is useful if your
# online table sync is stuck due to, for example, the source table being deleted and recreated
# with the same name while the sync was running.
w.pipelines.start_update(pipeline_id='some-pipeline-id', full_refresh=True)
curl --request GET \
  "https://xxx.databricks.com/api/2.0/online-tables/main.default.my_online_table" \
  --header "Authorization: Bearer xxx"

# Sample response
{
  "name": "main.default.my_online_table",
  "spec": {
    "run_triggered": {},
    "source_table_full_name": "main.default.source_table",
    "primary_key_columns": ["pk_col"],
    "pipeline_id": "some-pipeline-id"
  },
  "status": {
    "detailed_state": "PROVISIONING",
    "message": "Online Table creation is pending. Check latest status in Delta Live Tables: https://xxx.databricks.com#joblist/pipelines/some-pipeline-id"
  }
}

# Trigger an online table refresh by calling the pipeline API. To discard all existing data
# in the online table before refreshing, set "full_refresh" to "True". This is useful if your
# online table sync is stuck due to, for example, the source table being deleted and recreated
# with the same name while the sync was running.
curl --request POST "https://xxx.databricks.com/api/2.0/pipelines/some-pipeline-id/updates" \
  --header "Authorization: Bearer xxx" \
  --data '{
    "full_refresh": true
  }'

APIs使用してオンラインテーブルを削除する

w.online_tables.delete('main.default.my_online_table')
curl --request DELETE \
  "https://xxx.databricks.com/api/2.0/online-tables/main.default.my_online_table" \
  --header "Authorization: Bearer xxx"

オンライン テーブルを削除すると、進行中のデータ同期が停止され、そのすべてのリソースが解放されます。

特徴量サービングエンドポイントを使用したオンラインテーブルデータの提供

Databricks の外部でホストされているモデルとアプリケーションの場合は、オンライン テーブルから特徴量を提供する特徴量サービングエンドポイントを作成できます。 エンドポイントは、REST API を使用して低遅延で機能を利用できるようにします。

  1. フィーチャ スペックを作成します。

    機能仕様を作成するときは、ソース Delta テーブルを指定します。 これにより、機能仕様をオフラインとオンラインの両方のシナリオで使用できます。 オンライン ルックアップの場合、サービス エンドポイントは自動的にオンライン テーブルを使用して、待機時間の短い機能ルックアップを実行します。

    ソース Delta テーブルとオンライン テーブルでは、同じ主キーを使用する必要があります。

    フィーチャー仕様は、カタログエクスプローラーの 「機能」(Function ) タブで表示できます。

    from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
    
    fe = FeatureEngineeringClient()
    fe.create_feature_spec(
      name="catalog.default.user_preferences_spec",
      features=[
        FeatureLookup(
          table_name="user_preferences",
          lookup_key="user_id"
        )
      ]
    )
    
  2. 特徴量サービングエンドポイントを作成します。

    このステップでは、Delta テーブル user_preferencesのデータを同期する user_preferences_online_table という名前のオンライン テーブルが作成されていることを前提としています。特徴量仕様を使用して、特徴量サービングエンドポイントを作成します。 エンドポイントは、関連付けられたオンライン テーブルを使用して、REST API を介してデータを利用できるようにします。

    注:

    この操作を実行するユーザーは、オフライン テーブルとオンライン テーブルの両方の所有者である必要があります。

    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput
    
    workspace = WorkspaceClient()
    
    # Create endpoint
    endpoint_name = "fse-location"
    
    workspace.serving_endpoints.create_and_wait(
      name=endpoint_name,
      config=EndpointCoreConfigInput(
        served_entities=[
          ServedEntityInput(
            entity_name=feature_spec_name,
            scale_to_zero_enabled=True,
            workload_size="Small"
          )
        ]
      )
    )
    
    fe.create_feature_serving_endpoint(
      name="user-preferences",
      config=EndpointCoreConfig(
        served_entities=ServedEntity(
          feature_spec_name="catalog.default.user_preferences_spec",
          workload_size="Small",
          scale_to_zero_enabled=True
        )
      )
    )
    
  3. 特徴量サービングエンドポイントからデータを取得します。

    API エンドポイントにアクセスするには、HTTP GET リクエストをエンドポイント URL に送信します。 この例では、Python APIs使用してこれを行う方法を示します。 その他の言語とツールについては、「 特徴量サービング」を参照してください。

    # Set up credentials
    export DATABRICKS_TOKEN=...
    
    url = "https://{workspace_url}/serving-endpoints/user-preferences/invocations"
    
    headers = {'Authorization': f'Bearer {DATABRICKS_TOKEN}', 'Content-Type': 'application/json'}
    
    data = {
      "dataframe_records": [{"user_id": user_id}]
    }
    data_json = json.dumps(data, allow_nan=True)
    
    response = requests.request(method='POST', headers=headers, url=url, data=data_json)
    if response.status_code != 200:
      raise Exception(f'Request failed with status {response.status_code}, {response.text}')
    
    print(response.json()['outputs'][0]['hotel_preference'])
    

RAGアプリケーションでオンラインテーブルを使用する

RAGアプリケーションは、オンラインテーブルの一般的なユースケースです。 RAGアプリケーションが必要とする構造化データ用のオンラインテーブルを作成し、特徴量サービングエンドポイントでホストします。 RAGアプリケーションは、特徴量サービングエンドポイントを使用して、オンラインテーブルから関連データを検索します。

典型的なステップは次のとおりです。

  1. 特徴量サービングエンドポイントを作成します。

  2. エンドポイントを使用して関連データを検索する LangChainTool を作成します。

  3. LangChainエージェントのツールを使用して、関連データを取得します。

  4. LangChainアプリケーションをホストするモデルサービングエンドポイントを作成します。

詳細な手順については、次のノートブックの例を参照してください。

ノートブックの例

次のノートブックは、リアルタイム サービスと自動フィーチャ ルックアップのために、フィーチャをオンライン テーブルに公開する方法を示しています。

オンライン テーブルのデモ ノートブック

ノートブックを新しいタブで開く

次のノートブックは、Databricks オンライン テーブルと特徴量サービングエンドポイントを RAG (Augmented Generation) 取得アプリケーションに使用する方法を示しています。

RAGアプリケーションのデモノートブックを使用したオンラインテーブル

ノートブックを新しいタブで開く

Databricks でオンライン テーブルを使用する モデルサービング

オンライン テーブルを使用して、Databricks モデルサービングの特徴を検索できます。 特徴量テーブルをオンライン テーブルに同期すると、その特徴量テーブルの特徴を使用してトレーニングされたモデルは、推論中にオンライン テーブルから特徴値を自動的に検索します。 追加の設定は必要ありません。

  1. FeatureLookupを使用してモデルをトレーニングする。

    モデル トレーニングでは、次の例に示すように、モデル トレーニング セットでオフラインの特徴量テーブルの特徴を使用します。

    training_set = fe.create_training_set(
      df=id_rt_feature_labels,
      label='quality',
      feature_lookups=[
          FeatureLookup(
              table_name="user_preferences",
              lookup_key="user_id"
          )
      ],
      exclude_columns=['user_id'],
    )
    
  2. Databricks モデルサービングを使用してモデルを提供します。 モデルは、オンライン テーブルから特徴を自動的に検索します。 詳細については、「 Databricks の MLflow モデルを使用した自動機能検索 」を参照してください。

ユーザー権限

オンライン テーブルを作成するには、次のアクセス許可が必要です。

  • SELECT ソース表に対する特権。

  • USE_CATALOG 宛先カタログに対する特権。

  • USE_SCHEMA 宛先スキーマに対する CREATE_TABLE 特権。

オンライン テーブルのデータ同期パイプラインを管理するには、オンライン テーブルの所有者であるか、オンライン テーブルに対する REFRESH 特権が付与されている必要があります。 カタログに対するUSE_CATALOGおよびUSE_SCHEMA権限を持たないユーザーには、カタログエクスプローラーにオンラインテーブルが表示されません。

Unity Catalog メタストアには、 特権モデル バージョン 1.0 が必要です。

エンドポイントのアクセス許可モデル

一意のシステム サービスプリンシパルは、データのクエリーと関数の実行に必要な制限されたアクセス許可を持つ、特徴量サービングまたはモデルサービングエンドポイントに対して自動的に作成されます。 このサービスプリンシパルにより、エンドポイントはリソースを作成したユーザーとは無関係にデータおよび関数リソースにアクセスでき、作成者がワークスペースを離れてもエンドポイントが引き続き機能できるようにします。

このシステム サービスプリンシパルの有効期間は、エンドポイントの有効期間です。 監査ログには、このシステムに必要な特権を付与する Unity Catalog カタログの所有者に対してシステムが生成したレコードが示されている場合があります サービスプリンシパル。

制限事項

  • ソース表ごとにサポートされるオンライン表は 1 つだけです。

  • オンライン テーブルとそのソース テーブルには最大 1000 列を含めることができます。

  • データ型 ARRAY、MAP、または STRUCT の列は、オンライン表の主キーとして使用できません。

  • オンライン テーブルで列を主キーとして使用する場合、ソース テーブルで列に NULL 値が含まれているすべての行が無視されます。

  • 外部テーブル、システムテーブル、および内部テーブルは、ソーステーブルとしてサポートされていません。

  • Delta チェンジデータフィードが有効になっていないソーステーブルは、スナップショット同期モードのみをサポートします。

  • Delta Sharing テーブルは、 スナップショット 同期モードでのみサポートされます。

  • オンライン表のカタログ名、スキーマ名、および表名には、英数字と下線のみを含めることができ、数字で始めることはできません。 ダッシュ (-) は使用できません。

  • 文字列型の列の長さは 64 KB に制限されています。

  • 列名の長さは 64 文字に制限されています。

  • 行の最大サイズは 2 MB です。

  • ゲート パブリック プレビュー中のオンライン テーブルの最大サイズは、200 GB の非圧縮ユーザー データです。

  • ゲート パブリック プレビュー中の Unity Catalog メタストア内のすべてのオンライン テーブルの合計サイズは、1 TB の非圧縮ユーザー データです。

  • 最大クエリ/秒 (QPS) は 200 です。 この制限は 25,000 以上に増やすことができます。 制限を増やすには、Databricks アカウント チームにお問い合わせください。

トラブルシューティング

「オンラインテーブルを作成」(Create online table) はカタログエクスプローラーに表示されません。

原因は、通常、同期元のテーブル (ソース テーブル) がサポートされている型ではないことです。 ソース テーブルの [セキュリティ保護可能な種類] (「カタログ エクスプローラ の詳細 」タブに表示) が、以下のサポートされているオプションのいずれかであることを確認します。

  • TABLE_EXTERNAL

  • TABLE_DELTA

  • TABLE_DELTA_EXTERNAL

  • TABLE_DELTASHARING

  • TABLE_DELTASHARING_MUTABLE

  • TABLE_STREAMING_LIVE_TABLE

  • TABLE_STANDARD

  • TABLE_FEATURE_STORE

  • TABLE_FEATURE_STORE_EXTERNAL

  • TABLE_VIEW

  • TABLE_VIEW_DELTASHARING

  • TABLE_MATERIALIZED_VIEW

オンラインテーブルの作成時に「トリガー」または「継続」の同期モードを選択できません。<a class="headerlink" href="#i-cannot-select-either-"triggered"-or-"continuous"-sync-modes-when-creating-an-online-table" title="この見出しへのパーマリンク">

これは、ソーステーブルで Delta チェンジデータフィードが有効になっていない場合、またはソーステーブルがビューまたはマテリアライズドビューである場合に発生します。 増分同期モードを使用するには、ソース テーブルでチェンジ データ フィードを有効にするか、ビュー以外のテーブルを使用します。

オンラインでのテーブル更新が失敗するか、ステータスがオフラインと表示される

このエラーのトラブルシューティングを開始するには、カタログ エクスプローラーのオンライン テーブルの[概要]タブに表示されるパイプライン ID をクリックします。

オンラインテーブルのパイプライン障害

表示されるパイプライン UI ページで、「フロー '__online_table を解決できませんでした」というエントリをクリックします。

オンライン テーブル パイプライン エラー メッセージ

ポップアップが表示され、[ エラーの詳細 ] セクションに詳細が表示されます。

オンラインテーブル エラーの詳細

エラーの一般的な原因には、次のようなものがあります。

  • オンライン テーブルの同期中に、ソース テーブルが削除されたか、削除されて同じ名前で再作成されました。 これは、連続したオンラインテーブルで常に同期しているため、特に一般的です。

  • ファイアウォールの設定により、サーバレスコンピュート経由でソーステーブルにアクセスできません。 この状況では、エラーの詳細セクションに「クラスター xxx で DLT サービスを開始できませんでした…」というエラー メッセージが表示される場合があります。

  • オンライン テーブルの集計サイズが、メタストア全体の 1 TiB (非圧縮サイズ) の制限を超えています。 1 TiB 制限は、 Deltaテーブルを行指向形式で拡張した後の非圧縮サイズを指します。 行形式のテーブルのサイズは、カタログ エクスプローラーに表示されるDeltaテーブルのサイズより大幅に大きくなる場合があります。これは、列指向形式のテーブルの圧縮サイズを指します。 この差は、テーブルの内容によっては 100 倍にもなります。

    Deltaテーブルの非圧縮の行展開サイズを見積もるには、サーバレス SQL ウェアハウスの次のクエリを使用します。 このクエリは、拡張されたテーブルの推定サイズをバイト単位で返します。 このクエリが正常に実行されると、サーバレス コンピュートがソース テーブルにアクセスできることも確認されます。

    SELECT sum(length(to_csv(struct(*)))) FROM `source_table`;