CREATE STREAMING TABLE (DLT)
ストリーミングテーブル は、ストリーミングまたはインクリメンタルデータ処理をサポートするテーブルです。パイプライン ノートブックで定義されたストリーミングテーブルは、 DLT パイプラインによってサポートされます。 ストリーミングテーブルが更新されるたびに、ソーステーブルに追加されたデータがストリーミングテーブルに追加されます。 ストリーミングテーブルは、手動またはスケジュールに従って更新できます。
更新を実行またはスケジュールする方法の詳細については、「 DLT パイプラインで更新を実行する」を参照してください。
構文
CREATE [OR REFRESH] [PRIVATE] STREAMING TABLE
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ column_constraint ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL | COMMENT column_comment | column_constraint | MASK clause } [ ... ]
table_clauses
{ USING DELTA
PARTITIONED BY (col [, ...]) |
CLUSTER BY clause |
LOCATION path |
COMMENT view_comment |
TBLPROPERTIES clause |
WITH { ROW FILTER clause } } [ ... ]
パラメーター
-
REFRESH
指定した場合、テーブルを作成するか、既存のテーブルとその内容を更新します。
-
プライベート
プライベートストリーミングテーブルを作成します。
- カタログには追加されず、定義パイプライン内でのみアクセスできます
- カタログ内の既存のオブジェクトと同じ名前を持つことができます。パイプライン内で、プライベートストリーミングテーブルとカタログ内のオブジェクトの名前が同じ場合、名前への参照はプライベートストリーミングテーブルに解決されます。
- プライベートストリーミングテーブルは、1 回の更新だけでなく、パイプラインの有効期間全体にわたってのみ保持されます。
Private ストリーミングテーブルは、以前は
TEMPORARY
パラメーターを使用して作成されていました。 -
table_name
新しく作成されたテーブルの名前。完全修飾テーブル名は一意である必要があります。
-
table_specification
このオプションの句は、列のリスト、その型、プロパティ、説明、および列の制約を定義します。
-
列名は一意で、クエリの出力列にマップする必要があります。
-
列のデータ型を指定します。Databricksでサポートされているすべてのデータ型がストリーミングテーブルでサポートされているわけではありません。
-
column_comment
列を記述する省略可能な
STRING
リテラル。このオプションは、column_type
と共に指定する必要があります。列タイプが指定されていない場合、列コメントはスキップされます。 -
データがテーブルに流入するときにデータを検証する制約を追加します。「パイプラインの期待値を使用してデータ品質を管理する」を参照してください。
-
プレビュー
この機能は パブリック プレビュー段階です。
機密データを匿名化するための列マスク機能を追加します。
行フィルタと列マスクを使用した機密テーブル・データのフィルタを参照してください。
- table_constraint
プレビュー
この機能は パブリック プレビュー段階です。
スキーマを指定するときに、プライマリ・キーと外部キーを定義できます。制約は情報提供を目的としており、強制されません。SQL 言語リファレンスの CONSTRAINT 句 を参照してください。
テーブルの制約を定義するには、パイプラインが Unity Catalog 対応パイプラインである必要があります。
-
table_clauses
オプションで、テーブルのパーティション化、コメント、およびユーザー定義プロパティを指定します。各 sub 句は一度だけ指定できます。
-
デルタの使用
データ形式を指定します。唯一のオプションは DELTA です。
この句はオプションで、デフォルトは DELTA です。
-
パーティション分割
テーブルでのパーティション分割に使用する 1 つ以上の列のオプションのリスト。
CLUSTER BY
と相互に排他的です。リキッドクラスタリングは、クラスタリングのための柔軟で最適化されたソリューションを提供します。 DLT には
PARTITIONED BY
の代わりにCLUSTER BY
を使用することを検討してください。 -
クラスター BY
テーブルでリキッドクラスタリングを有効にし、クラスタリングキーとして使用する列を定義します。
PARTITIONED BY
と相互に排他的です。「Deltaテーブルにリキッドクラスタリングを使用する」を参照してください。
-
場所
テーブルデータのオプションの格納場所。設定されていない場合、システムはデフォルトでパイプラインの保存場所を使用します。
-
comment
テーブルを記述する省略可能な
STRING
リテラル。 -
TBLプロパティ
テーブルの テーブルプロパティ のオプションのリスト。
-
行フィルター付き
-
プレビュー
この機能は パブリック プレビュー段階です。
テーブルに行フィルター関数を追加します。そのテーブルに対する今後のクエリは、関数が TRUE と評価される行のサブセットを受け取ります。これは、関数が呼び出し元のユーザーの ID とグループメンバーシップを検査して、特定の行をフィルタリングするかどうかを決定できるため、きめ細かなアクセス制御に役立ちます。
ROW FILTER
節を参照してください。
-
この句は、
query
のデータを使用してテーブルにデータを入力します。このクエリは ストリーミング クエリである必要があります。ストリーム キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取りで既存のレコードに対する変更または削除が検出されると、エラーがスローされます。静的なソースまたは追加専用のソースから読み取るのが最も安全です。変更コミットのあるデータを取り込むには、Python とSkipChangeCommits
オプションを使用してエラーを処理できます。query
とtable_specification
を一緒に指定する場合、table_specification
で指定するテーブルスキーマには、query
によって返されるすべてのカラムが含まれている必要があります。含まれていないと、エラーが発生します。table_specification
で指定されているが、query
によって返されない列は、クエリ時にnull
値を返します。ストリーミング データの詳細については、「 パイプラインを使用したデータの変換」を参照してください。
必要な権限
パイプラインの実行ユーザーには、次のアクセス許可が必要です。
SELECT
ストリーミングテーブルによって参照されるベーステーブルに対する権限。USE CATALOG
親カタログに対する権限と、親スキーマに対するUSE SCHEMA
権限。CREATE MATERIALIZED VIEW
ストリーミングテーブルのスキーマに対する権限。
ユーザーがストリーミングテーブルが定義されているパイプラインを更新できるようにするには、次のものが必要です。
USE CATALOG
親カタログに対する権限と、親スキーマに対するUSE SCHEMA
権限。- ストリーミングテーブルの所有権、またはストリーミングテーブルに対する
REFRESH
権限。 - ストリーミングテーブルの所有者は、ストリーミングテーブルが参照するベーステーブルに対する
SELECT
権限を持っている必要があります。
ユーザーが結果のストリーミングテーブルをクエリできるようにするには、次のものが必要です。
USE CATALOG
親カタログに対する権限と、親スキーマに対するUSE SCHEMA
権限。SELECT
ストリーミングテーブルに対する特権。
制限
-
テーブルの所有者のみがストリーミングテーブルを更新して最新のデータを取得できます。
-
ALTER TABLE
ストリーミングテーブルではコマンドは許可されません。テーブルの定義とプロパティは、CREATE OR REFRESH
または ALTER STREAMING TABLE ステートメントを使用して変更する必要があります。 -
INSERT INTO
やMERGE
などの DML コマンドを使用したテーブルスキーマの進化はサポートされていません。 -
次のコマンドは、ストリーミングテーブルではサポートされていません。
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
-
Delta Sharing はサポートされていません。
-
テーブルの名前変更や所有者の変更はサポートされていません。
-
ジェネレーテッドカラム、ID 列、およびデフォルト列はサポートされていません。
例
-- Define a streaming table from a volume of files:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/*", format => "csv")
-- Define a streaming table from a streaming source table:
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
-- Define a table with a row filter and column mask:
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)
-- Define a streaming table that you can add flows into:
CREATE OR REFRESH STREAMING TABLE orders;