メインコンテンツまでスキップ

Unity Catalog と構造化ストリーミングの併用

Unity Catalogを使用した構造化ストリーミングを使用して、Databricks上の増分およびストリーミングワークロードのデータガバナンスを管理します。このドキュメントでは、サポートされる機能の概要を説明し、Unity Catalogと構造化ストリーミングを併用するためのベストプラクティスを提案します。

Unity Catalog はどのような構造化ストリーミング機能をサポートしていますか?

Unity Catalog では、Databricks で使用できる構造化ストリーミング ソースとシンクに明示的な制限は追加されません。Unity Catalog データ ガバナンス モデルを使用すると、 Unity Catalogのマネージド テーブルと外部テーブルからデータをストリーミングできます。Unity Catalog によって管理される外部ロケーションを使用して、オブジェクト・ストレージURIを使用してデータを操作することもできます。外部テーブルに書き込むには、テーブル名またはファイル パスを使用します。テーブル名を使用して、 Unity Catalog 上のマネージ テーブルを操作する必要があります。

構造化ストリーミング チェックポイントのパスを指定する場合は、 Unity Catalog が管理する外部ロケーションを使用します。 Unity Catalog を使用してストレージを安全に接続する方法の詳細については、「 Unity Catalog を使用してクラウド オブジェクト ストレージに接続する」を参照してください。

Unity Catalog でサポートされていない構造化ストリーミング機能は何ですか?

Apache Spark の連続処理モードはサポートされていません。 Spark 構造化ストリーミング プログラミング ガイドの 連続処理 を参照してください。

コンピュート アクセス モードに基づく Unity Catalog でサポートされていない構造化ストリーミング機能の一覧については、「 ストリーミングの制限 事項」および「 専用コンピュートのストリーミングとマテリアライズドビューの要件」を参照してください。

Unity Catalogビューをストリームとして読み取る

Databricks Runtime 14.1 以降では、構造化ストリーミングを使用して、Unity Catalog に登録されているビューからストリーミング読み取りを実行できます。 Databricks では、Delta テーブルに対して定義されたビューからの読み取りのストリーミングのみがサポートされています。

構造化ストリーミングでビューを読み取るには、次の例のように、ビューの識別子を .table() メソッドに指定します。

Python
df = (spark.readStream
.table("demoView")
)

ユーザーは、ターゲット ビューに対する SELECT 権限を持っている必要があります。

ビューに対するストリーミング読み取りを構成するためのサポートされているオプション

ビューに対するストリーミング読み取りを構成する場合は、次のオプションがサポートされています。

  • maxFilesPerTrigger
  • maxBytesPerTrigger
  • ignoreDeletes
  • skipChangeCommits
  • withEventTimeOrder
  • startingTimestamp
  • startingVersion

ストリーミング リーダーは、基になる Delta テーブルを定義するファイルとメタデータにこれらのオプションを適用します。

important

UNION ALL で定義されたビューに対する読み取りは、オプション withEventTimeOrderstartingVersionをサポートしません。

ソース ビューでサポートされている操作

すべてのビューがストリーミング読み取りをサポートしているわけではありません。 ソース ビューでサポートされていない操作には、集計と並べ替えがあります。

次のリストは、サポートされている操作の説明とビュー定義の例を示しています。

  • プロジェクト

    • 説明: 列レベルの権限を制御します。

    • 演算子: SELECT... FROM...

    • ステートメントの例:

      SQL
      CREATE VIEW project_view AS
      SELECT id, value
      FROM source_table
  • フィルタ

    • 説明: 行レベルの権限を制御します

    • 演算子: WHERE...

    • ステートメントの例:

      SQL
      CREATE VIEW filter_view AS
      SELECT * FROM source_table
      WHERE value > 100
  • ユニオンオール

    • 説明: Results from multiple tables

    • 演算子: UNION ALL

    • ステートメントの例:

      SQL
      CREATE VIEW union_view AS
      SELECT id, value FROM source_table1
      UNION ALL
      SELECT * FROM source_table2
注記

ビュー定義を変更して、ビューで参照されるテーブルを追加または変更し、同じストリーミング・チェックポイントを使用することはできません。

制限

次の制限が適用されます:

  • ストリームできるのは、 Delta テーブルに支えられたビューからのみです。 他のデータソースに対して定義されたビューはサポートされていません。

  • Unity Catalog にビューを登録する必要があります。

  • サポートされていない演算子を持つビューからストリームする場合は、次の例外が表示されます。

    UnsupportedOperationException: [UNEXPECTED_OPERATOR_IN_STREAMING_VIEW] Unexpected operator <operator> in the CREATE VIEW statement as a streaming source. A streaming view query must consist only of SELECT, WHERE, and UNION ALL operations.
  • サポートされていないオプションを指定した場合は、次の例外が表示されます。

    AnalysisException: [UNSUPPORTED_STREAMING_OPTIONS_FOR_VIEW.UNSUPPORTED_OPTION] Unsupported for streaming a view. Reason: option <option> is not supported.