メインコンテンツまでスキップ

CREATE STREAMING TABLE (Lakeflow 宣言型パイプライン)

ストリーミングテーブル は、ストリーミングまたはインクリメンタルデータ処理をサポートするテーブルです。ストリーミングテーブルは Lakeflow 宣言型パイプラインによって支えられています。 ストリーミングテーブルが更新されるたびに、ソーステーブルに追加されたデータがストリーミングテーブルに追加されます。 ストリーミングテーブルは、手動またはスケジュールに従って更新できます。

更新を実行またはスケジュールする方法の詳細については、「Lakeflow 宣言型パイプラインで更新を実行する」を参照してください。

構文

CREATE [OR REFRESH] [PRIVATE] STREAMING TABLE
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]

table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ column_constraint ] [, ...]
[ , table_constraint ] [...] )

column_properties
{ NOT NULL | COMMENT column_comment | column_constraint | MASK clause } [ ... ]

table_clauses
{ USING DELTA
PARTITIONED BY (col [, ...]) |
CLUSTER BY clause |
LOCATION path |
COMMENT view_comment |
TBLPROPERTIES clause |
WITH { ROW FILTER clause } } [ ... ]

問題

  • REFRESH

    指定すると、テーブルが作成されるか、既存のテーブルとその内容が更新されます。

  • プライベート

    プライベートストリーミングテーブルを作成します。

    • これらはカタログに追加されず、定義パイプライン内でのみアクセスできます。
    • カタログ内の既存のオブジェクトと同じ名前を付けることができます。パイプライン内で、プライベート ストリーミング テーブルとカタログ内のオブジェクトが同じ名前を持つ場合、その名前への参照はプライベート ストリーミング テーブルに解決されます。
    • プライベート ストリーミング テーブルは、1 回の更新だけではなく、パイプラインの存続期間中のみ保持されます。

    プライベート ストリーミング テーブルは、以前にTEMPORARY問題を使用して作成されました。

  • テーブル名

    新しく作成されたテーブルの名前。完全修飾テーブル名は一意である必要があります。

  • テーブル仕様

    このオプション句は、列のリスト、そのタイプ、プロパティ、説明、および列の制約を定義します。

    • 列識別子

      列名は一意であり、クエリの出力列にマップされる必要があります。

    • 列タイプ

      列のデータ型を指定します。Databricksでサポートされているすべてのデータ型がストリーミング テーブルでサポートされているわけではありません。

    • 列コメント

      列を説明するオプションのSTRINGリテラル。このオプションはcolumn_typeと一緒に指定する必要があります。列タイプが指定されていない場合、列コメントはスキップされます。

    • 列制約

      テーブルに流入するデータを検証する制約を追加します。「パイプラインの期待値によるデータ品質の管理」を参照してください。

    • MASK句

備考

プレビュー

この機能は パブリック プレビュー段階です。

機密データを匿名化するための列マスク機能を追加します。

行フィルターと列マスクを参照してください。

  • テーブル制約
備考

プレビュー

この機能は パブリック プレビュー段階です。

スキーマを指定するときに、主キーと外部キーを定義できます。制約は情報提供であり、強制されるものではありません。SQL 言語リファレンスのCONSTRAINT 句を参照してください。

注記

テーブル制約を定義するには、パイプラインが Unity Catalog 対応のパイプラインである必要があります。

  • テーブル句

    オプションで、テーブルのパーティション、コメント、およびユーザー定義のプロパティを指定します。各サブ句は 1 回だけ指定できます。

    • デルタの使用

      データ形式を指定します。唯一の選択肢はDELTAです。

      この句はオプションであり、デフォルトは DELTA になります。

    • パーティション分割

      テーブルのパーティション分割に使用する 1 つ以上の列のオプションのリスト。CLUSTER BYと相互に排他的です。

      リキッドクラスタリングは、クラスタリングのための柔軟で最適化されたソリューションを提供します。 宣言型パイプラインでは、PARTITIONED BY の代わりに CLUSTER BY を使用することを検討してくださいLakeflow。

    • クラスター BY

      テーブルで流動クラスタリングを有効にし、クラスタリング キーとして使用する列を定義します。 CLUSTER BY AUTOで自動流体クラスタリングを使用すると、 Databricksインテリジェントにクラスタリング キーを選択してクエリのパフォーマンスを最適化します。 PARTITIONED BYと相互に排他的です。

      テーブルにリキッドクラスタリングを使用するを参照してください。

    • 場所

      テーブルデータのオプションの格納場所。設定されていない場合、システムはデフォルトでパイプラインの保存場所を使用します。

    • comment

      テーブルを説明するオプションのSTRINGリテラル。

    • テーブルプロパティ

      テーブルのテーブル プロパティのオプション リスト。

    • 行フィルター付き

備考

プレビュー

この機能は パブリック プレビュー段階です。

テーブルに行フィルター機能を追加します。そのテーブルに対する今後のクエリでは、関数が TRUE と評価された行のサブセットが返されます。これは、関数が呼び出し元ユーザーの ID とグループ メンバーシップを検査して、特定の行をフィルター処理するかどうかを決定できるため、きめ細かなアクセス制御に役立ちます。

ROW FILTERを参照してください。

  • クエリー

    この句は、 queryのデータを使用してテーブルにデータを入力します。このクエリは ストリーミング クエリである必要があります。ストリーム キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取り中に既存のレコードの変更または削除が検出されると、エラーがスローされます。静的ソースまたは追加専用のソースから読み取るのが最も安全です。変更コミットを含むデータを取り込むには、Python とSkipChangeCommitsオプションを使用してエラーを処理できます。

    querytable_specification一緒に指定する場合、 table_specificationで指定されたテーブル スキーマにはqueryによって返されるすべての列が含まれている必要があります。含まれていない場合は、エラーが発生します。table_specificationで指定されているがqueryによって返されない列は、クエリ時にnull値を返します。

    ストリーミング データの詳細については、 「パイプラインを使用したデータの変換」を参照してください。

必要な権限

パイプラインの実行ユーザーには、次の権限が必要です。

  • SELECT ストリーミング テーブルによって参照されるベース テーブルに対する権限。
  • USE CATALOG 親カタログに対する権限と親スキーマに対するUSE SCHEMA権限。
  • CREATE MATERIALIZED VIEW ストリーミングテーブルのスキーマに対する権限。

ユーザーがストリーミング テーブルが定義されているパイプラインを更新できるようにするには、次のものが必要です。

  • USE CATALOG 親カタログに対する権限と親スキーマに対するUSE SCHEMA権限。
  • ストリーミング テーブルの所有者、またはストリーミング テーブルに対するREFRESH権限。
  • ストリーミング テーブルの所有者は、ストリーミング テーブルによって参照されるベース テーブルに対するSELECT権限を持っている必要があります。

ユーザーが結果のストリーミング テーブルをクエリできるようにするには、次のものが必要です。

  • USE CATALOG 親カタログに対する権限と親スキーマに対するUSE SCHEMA権限。
  • SELECT ストリーミング テーブルに対する権限。

制限事項

  • テーブルの所有者のみがストリーミング テーブルを更新して最新のデータを取得できます。

  • ALTER TABLE コマンドはストリーミング テーブルでは禁止されています。 テーブルの定義とプロパティは、 CREATE OR REFRESHまたはALTER STREAMING TABLEステートメントを通じて変更する必要があります。

  • INSERT INTOMERGEなどの DML コマンドによるテーブル スキーマの展開はサポートされていません。

  • 次のコマンドはストリーミング テーブルではサポートされていません。

    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • テーブルの名前変更や所有者の変更はサポートされていません。

  • ジェネレーテッドカラム、ID 列、およびデフォルト列はサポートされていません。

SQL
-- Define a streaming table from a volume of files:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/*", format => "csv")

-- Define a streaming table from a streaming source table:
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

-- Define a table with a row filter and column mask:
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)

-- Define a streaming table that you can add flows into:
CREATE OR REFRESH STREAMING TABLE orders;