メインコンテンツまでスキップ

CREATE STREAMING TABLE (DLT)

ストリーミングテーブル は、ストリーミングまたはインクリメンタルデータ処理をサポートするテーブルです。パイプライン ノートブックで定義されたストリーミングテーブルは、 DLT パイプラインによってサポートされます。 ストリーミングテーブルが更新されるたびに、ソーステーブルに追加されたデータがストリーミングテーブルに追加されます。 ストリーミングテーブルは、手動またはスケジュールに従って更新できます。

更新を実行またはスケジュールする方法の詳細については、「 DLT パイプラインで更新を実行する」を参照してください。

構文

CREATE [OR REFRESH] [PRIVATE] STREAMING TABLE
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]

table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ column_constraint ] [, ...]
[ , table_constraint ] [...] )

column_properties
{ NOT NULL | COMMENT column_comment | column_constraint | MASK clause } [ ... ]

table_clauses
{ USING DELTA
PARTITIONED BY (col [, ...]) |
CLUSTER BY clause |
LOCATION path |
COMMENT view_comment |
TBLPROPERTIES clause |
WITH { ROW FILTER clause } } [ ... ]

パラメーター

  • REFRESH

    指定した場合、テーブルを作成するか、既存のテーブルとその内容を更新します。

  • プライベート

    プライベートストリーミングテーブルを作成します。

    • カタログには追加されず、定義パイプライン内でのみアクセスできます
    • カタログ内の既存のオブジェクトと同じ名前を持つことができます。パイプライン内で、プライベートストリーミングテーブルとカタログ内のオブジェクトの名前が同じ場合、名前への参照はプライベートストリーミングテーブルに解決されます。
    • プライベートストリーミングテーブルは、1 回の更新だけでなく、パイプラインの有効期間全体にわたってのみ保持されます。

    Private ストリーミングテーブルは、以前は TEMPORARY パラメーターを使用して作成されていました。

  • table_name

    新しく作成されたテーブルの名前。完全修飾テーブル名は一意である必要があります。

  • table_specification

    このオプションの句は、列のリスト、その型、プロパティ、説明、および列の制約を定義します。

    • column_identifier

      列名は一意で、クエリの出力列にマップする必要があります。

    • column_type

      列のデータ型を指定します。Databricksでサポートされているすべてのデータ型がストリーミングテーブルでサポートされているわけではありません。

    • column_comment

      列を記述する省略可能な STRING リテラル。このオプションは、 column_typeと共に指定する必要があります。列タイプが指定されていない場合、列コメントはスキップされます。

    • column_constraint

      データがテーブルに流入するときにデータを検証する制約を追加します。「パイプラインの期待値を使用してデータ品質を管理する」を参照してください。

    • MASK 句

備考

プレビュー

この機能は パブリック プレビュー段階です。

機密データを匿名化するための列マスク機能を追加します。

行フィルタと列マスクを使用した機密テーブル・データのフィルタを参照してください。

  • table_constraint
備考

プレビュー

この機能は パブリック プレビュー段階です。

スキーマを指定するときに、プライマリ・キーと外部キーを定義できます。制約は情報提供を目的としており、強制されません。SQL 言語リファレンスの CONSTRAINT 句 を参照してください。

注記

テーブルの制約を定義するには、パイプラインが Unity Catalog 対応パイプラインである必要があります。

  • table_clauses

    オプションで、テーブルのパーティション化、コメント、およびユーザー定義プロパティを指定します。各 sub 句は一度だけ指定できます。

    • デルタの使用

      データ形式を指定します。唯一のオプションは DELTA です。

      この句はオプションで、デフォルトは DELTA です。

    • パーティション分割

      テーブルでのパーティション分割に使用する 1 つ以上の列のオプションのリスト。CLUSTER BYと相互に排他的です。

      リキッドクラスタリングは、クラスタリングのための柔軟で最適化されたソリューションを提供します。 DLT には PARTITIONED BY の代わりに CLUSTER BY を使用することを検討してください。

    • クラスター BY

      テーブルでリキッドクラスタリングを有効にし、クラスタリングキーとして使用する列を定義します。PARTITIONED BYと相互に排他的です。

      Deltaテーブルにリキッドクラスタリングを使用する」を参照してください。

    • 場所

      テーブルデータのオプションの格納場所。設定されていない場合、システムはデフォルトでパイプラインの保存場所を使用します。

    • comment

      テーブルを記述する省略可能な STRING リテラル。

    • TBLプロパティ

      テーブルの テーブルプロパティ のオプションのリスト。

    • 行フィルター付き

備考

プレビュー

この機能は パブリック プレビュー段階です。

テーブルに行フィルター関数を追加します。そのテーブルに対する今後のクエリは、関数が TRUE と評価される行のサブセットを受け取ります。これは、関数が呼び出し元のユーザーの ID とグループメンバーシップを検査して、特定の行をフィルタリングするかどうかを決定できるため、きめ細かなアクセス制御に役立ちます。

ROW FILTERを参照してください。

  • クエリー

    この句は、 queryのデータを使用してテーブルにデータを入力します。このクエリは ストリーミング クエリである必要があります。ストリーム キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取りで既存のレコードに対する変更または削除が検出されると、エラーがスローされます。静的なソースまたは追加専用のソースから読み取るのが最も安全です。変更コミットのあるデータを取り込むには、Python と SkipChangeCommits オプションを使用してエラーを処理できます。

    querytable_specification を一緒に指定する場合、 table_specification で指定するテーブルスキーマには、queryによって返されるすべてのカラムが含まれている必要があります。含まれていないと、エラーが発生します。table_specification で指定されているが、query によって返されない列は、クエリ時に null 値を返します。

    ストリーミング データの詳細については、「 パイプラインを使用したデータの変換」を参照してください。

必要な権限

パイプラインの実行ユーザーには、次のアクセス許可が必要です。

  • SELECT ストリーミングテーブルによって参照されるベーステーブルに対する権限。
  • USE CATALOG 親カタログに対する権限と、親スキーマに対する USE SCHEMA 権限。
  • CREATE MATERIALIZED VIEW ストリーミングテーブルのスキーマに対する権限。

ユーザーがストリーミングテーブルが定義されているパイプラインを更新できるようにするには、次のものが必要です。

  • USE CATALOG 親カタログに対する権限と、親スキーマに対する USE SCHEMA 権限。
  • ストリーミングテーブルの所有権、またはストリーミングテーブルに対する REFRESH 権限。
  • ストリーミングテーブルの所有者は、ストリーミングテーブルが参照するベーステーブルに対する SELECT 権限を持っている必要があります。

ユーザーが結果のストリーミングテーブルをクエリできるようにするには、次のものが必要です。

  • USE CATALOG 親カタログに対する権限と、親スキーマに対する USE SCHEMA 権限。
  • SELECT ストリーミングテーブルに対する特権。

制限

  • テーブルの所有者のみがストリーミングテーブルを更新して最新のデータを取得できます。

  • ALTER TABLE ストリーミングテーブルではコマンドは許可されません。テーブルの定義とプロパティは、 CREATE OR REFRESH または ALTER STREAMING TABLE ステートメントを使用して変更する必要があります。

  • INSERT INTOMERGE などの DML コマンドを使用したテーブルスキーマの進化はサポートされていません。

  • 次のコマンドは、ストリーミングテーブルではサポートされていません。

    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Delta Sharing はサポートされていません。

  • テーブルの名前変更や所有者の変更はサポートされていません。

  • ジェネレーテッドカラム、ID 列、およびデフォルト列はサポートされていません。

SQL
-- Define a streaming table from a volume of files:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/*", format => "csv")

-- Define a streaming table from a streaming source table:
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

-- Define a table with a row filter and column mask:
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)

-- Define a streaming table that you can add flows into:
CREATE OR REFRESH STREAMING TABLE orders;