Databricks SQLでストリーミングテーブルを使用する
プレビュー
この機能は パブリック プレビュー段階です。
Databricks 、 Databricks SQLを使用してデータを取り込むためにストリーミング テーブルを使用することをお勧めします。 ストリーミング テーブル は、ストリーミングまたは増分データ処理の追加サポートを備えたUnity Catalogに登録されたテーブルです。 パイプラインはストリーミングテーブルごとに自動的に作成されます。 ストリーミング テーブルを使用して、 Kafkaおよびクラウド オブジェクト ストレージから増分データをロードできます。
Delta Lake テーブルをストリーミング ソースとシンクとして使用する方法については、Delta テーブル ストリーミングの読み取りと書き込みを参照してください。
要件
ストリーミングテーブルを使用するには、以下の要件を満たす必要があります。
ワークスペースの要件 :
Databricks SQLで作成されたストリーミング テーブルは、サーバレス パイプラインによってサポートされています。 この機能を使用するには、ワークスペースがサーバレス パイプラインをサポートしている必要があります。
- サーバーレスが有効になっているDatabricksアカウント。 詳細については、 「サーバレスSQLウェアハウスを有効にする」を参照してください。
- Unity Catalog が有効になっているワークスペース。詳細については、 Unity Catalogの使用を開始する」を参照してください。
コンピュート要件 :
次のいずれかを使用する必要があります。
Currentチャンネルを使用するSQLウェアハウス。- Databricks Runtime 13.3 LTS以降の標準アクセス モード (以前の共有アクセス モード) でコンピュート。
権限要件 :
USE CATALOGおよびストリーミング テーブルを作成するカタログとスキーマに対するUSE SCHEMA権限。- ストリーミング テーブルを作成するスキーマに対する
CREATE TABLE権限。 - ストリーミング テーブルのソース データを提供するテーブルまたは場所にアクセスする権限。
ストリーミングテーブルの作成
ストリーミングテーブルは、Databricks SQLのSQLクエリによって定義されます。ストリーミングテーブルを作成すると、ソーステーブルに現在存在するデータを使用してストリーミングテーブルが作成されます。 その後、通常はスケジュールに従ってテーブルを更新し、ソース テーブルに追加されたデータをプルしてストリーミングテーブルに追加します。
ストリーミング テーブルを作成すると、テーブルの所有者とみなされます。
既存のテーブルからストリーミング テーブルを作成するには、次の例のようにCREATE STREAMING TABLEステートメントを使用します。
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT product, price FROM STREAM raw_data;
この場合、ストリーミング テーブルsalesは、1 時間ごとに更新されるスケジュールで、 raw_dataテーブルの特定の列から作成されます。 使用するクエリは ストリーミング クエリである必要があります。ストリーミング セマンティクスを使用してソースから読み取るには、 STREAMキーワードを使用します。
CREATE OR REFRESH STREAMING TABLEステートメントを使用してストリーミング テーブルを作成すると、初期データの更新と作成がすぐに開始されます。 これらの操作では、DBSQL ウェアハウス コンピュートは消費されません。 代わりに、ストリーミング テーブルは作成と更新の両方をサーバレス パイプラインに依存します。 専用のサーバレス パイプラインは、ストリーミング テーブルごとにシステムによって自動的に作成され、管理されます。
Auto Loaderでファイルをロードする
ボリューム内のファイルからストリーミング テーブルを作成するには、 Auto Loader使用します。 クラウド オブジェクト ストレージからのほとんどのデータ取り込みタスクには、Auto Loader を使用します。Auto Loader とパイプラインは、クラウド ストレージに到着するデータが増え続けるにつれて、それを増分的かつべき等的にロードするように設計されています。
Databricks SQLでAuto Loaderを使用するには、read_files 関数を使用します。次の例は、 Auto Loader を使用して大量の JSON ファイルをストリーミングテーブルに読み込む方法を示しています。
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/my_schema/my_volume/path/to/data",
format => "json"
);
クラウド ストレージからデータを読み取るには、Auto Loader を使用することもできます。
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM read_files(
'gs://mybucket/analysis/*/*/*.json',
format => "json"
);
Auto Loaderの詳細については、Auto Loaderとはを参照してください。Auto LoaderでのSQL の使用について、例を挙げて詳しく知りたい場合は、オブジェクトストレージからのデータの読み込み を参照してください。
他のソースからのストリーミング取り込み
Kafka を含む他のソースからの取り込みの例については、 「パイプラインでのデータのロード」を参照してください。
新しいデータのみを取り込む
デフォルトでは、 read_files関数はテーブルの作成中にソース ディレクトリ内の既存のデータをすべて読み取り、更新ごとに新しく到着するレコードを処理します。
テーブルの作成時にソース ディレクトリに既に存在するデータを取り込まないようにするには、 includeExistingFilesオプションをfalseに設定します。つまり、テーブルの作成後にディレクトリに到着したデータのみが処理されます。例えば:
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM read_files(
'/path/to/files',
includeExistingFiles => false
);
ランタイムチャンネルを設定する
SQLウェアハウスで作成したストリーミングテーブルはパイプラインを使用して自動更新されます。 ラインパイプは当然currentチャンネルのランタイムを使用します。 リリース プロセスについては、 Lakeflow Spark宣言型パイプライン リリース ノートとリリース アップグレード プロセスを参照してください。
Databricks本番運用ワークロードにはcurrentチャンネルを使用することをおすすめします。 新機能は最初にpreviewチャンネルにリリースされます。 テーブル プロパティとしてpreview指定することで、パイプラインをプレビュー チャンネルに設定して新機能をテストできます。 このプロパティは、テーブルを作成するとき、または ALTER ステートメントを使用してテーブルが作成された後に指定できます。
次のコード例は、CREATE ステートメントでプレビューするチャンネルを設定する方法を示しています。
CREATE OR REFRESH STREAMING TABLE sales
TBLPROPERTIES ('pipelines.channel' = 'preview')
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM raw_data;
ストリーミングテーブルの更新をスケジュールする
Databricks SQLストリーミング テーブルを、定義されたスケジュールに基づいて自動的に更新するように構成したり、アップストリーム データが変更されたときにトリガーするように構成したりできます。
ベータ版
TRIGGER ON UPDATE機能はベータ版です。
スケジュールまたはトリガーを設定するには、次のいずれかを実行します。
- ストリーミング テーブルを作成するときに、
SCHEDULE句を使用してスケジュールを構成します。 - テーブルを作成するときに、
TRIGGER ON UPDATE句を使用してトリガーを構成します。 - ALTER STREAMING TABLEステートメントを使用して、スケジュールまたはトリガーを追加または変更します。
または、 CREATE OR REFRESH STREAMING TABLEまたはREFRESHステートメントのいずれかを含むジョブ内にタスクを作成し、他のジョブと同じようにオーケストレーションします。Lakeflowジョブを参照してください。
スケジュールを作成すると、Databricks は更新を処理するための新しいジョブを自動的に作成します。
スケジュールを表示するには、次のいずれかを実行します。
- Databricks UI の SQL エディターから
DESCRIBE EXTENDEDステートメントを実行します。「DESCRIBE TABLE」を参照してください。 - カタログ エクスプローラーを使用してストリーミング テーブルを表示します。 スケジュールは、 [概要] タブの [更新ステータス] に表示されます。「カタログ エクスプローラーとは何ですか?」を参照してください。
更新スケジュールがあっても、更新されたデータが必要な場合はいつでも手動で更新を実行できます。
機密データを非表示にする
ストリーミング テーブルを使用すると、テーブルにアクセスするユーザーから機密データを隠すことができます。 1 つの方法は、機密性の高い列または行を完全に除外するようにクエリを定義することです。あるいは、クエリを実行するユーザーの権限に基づいて、列マスクまたは行フィルターを適用することもできます。たとえば、グループHumanResourcesDeptに属していないユーザーに対してtax_id列を非表示にすることができます。これを行うには、ストリーミング テーブルの作成時にROW FILTERおよびMASK構文を使用します。 詳細については、 「行フィルターと列マスク」を参照してください。
ストリーミングテーブルの更新
ストリーミングテーブルの作成時に更新を自動的にスケジュールできます。 ストリーミング テーブルを手動で更新することもできます。 更新がスケジュールされている場合でも、いつでも手動更新を呼び出すことができます。更新は、ストリーミング テーブルとともに自動的に作成された同じパイプラインによって処理されます。
ストリーミング テーブルを更新するには:
REFRESH STREAMING TABLE sales;
DESCRIBE TABLE EXTENDEDで最新の更新のステータスを確認できます。
タイムトラベルクエリを使用する前に、ストリーミング テーブルを更新する必要がある場合があります。
更新の仕組み
ストリーミング テーブルの更新では、最後の更新以降に到着した新しい行のみが評価され、新しいデータのみが追加されます。
各更新では、ストリーミング テーブルの現在の定義を使用して、この新しいデータを処理します。 ストリーミング テーブル定義を変更しても、既存のデータは自動的に再計算されません。 変更が既存のデータと互換性がない場合は (たとえば、データ型の変更など)、次の更新はエラーで失敗します。
次の例は、ストリーミング テーブル定義への変更が更新動作にどのような影響を与えるかを説明しています。
- フィルターを削除しても、以前にフィルターされた行は再処理されません。
- 列の投影を変更しても、既存のデータの処理方法には影響しません。
- 静的スナップショットによる結合は、初期処理時のスナップショット状態を使用します。 更新されたスナップショットと一致する遅れて到着したデータは無視されます。これにより、ディメンションが遅れると、ファクトが削除される可能性があります。
- 既存の列の CAST を変更するとエラーが発生します。
既存のストリーミング テーブルでサポートできない方法でデータが変更された場合は、完全な更新を実行できます。
ストリーミングテーブルのフルリフレッシュ
完全更新では、ソースで利用可能なすべてのデータが最新の定義で再処理されます。完全な 更新 によって既存のデータが切り捨てられるため、データの履歴全体が保持されない、または保持期間が短い ソース ( Kafkaなど) で完全な 更新 を呼び出すことは推奨されません。 ソース内でデータが利用できなくなった場合、古いデータを回復できない可能性があります。
例えば:
REFRESH STREAMING TABLE sales FULL;
ストリーミングテーブルのスケジュールを変更する
ストリーミング テーブルの自動更新スケジュールを変更 (または設定) できます。 次の例は、 ALTER STREAMING TABLEを使用してスケジュールを設定する方法を示しています。
ALTER STREAMING TABLE sales
ADD SCHEDULE EVERY 1 HOUR;
-- Alters the schedule to refresh the streaming table when its upstream data
-- gets updated.
ALTER STREAMING TABLE sales
ALTER TRIGGER ON UPDATE;
更新スケジュール クエリの例については、 「ALTER STREAMING TABLE」を参照してください。
更新のステータスを追跡する
ストリーミング テーブルの更新ステータスを確認するには、パイプライン UI でストリーミング テーブルを管理するパイプラインを表示するか、ストリーミング テーブルのDESCRIBE EXTENDEDコマンドによって返される 更新情報 を表示します。
DESCRIBE TABLE EXTENDED <table-name>;
または、カタログエクスプローラ でストリーミングテーブルを表示し、そこで更新ステータスを確認することもできます。
- クリック
サイドバーの カタログ 。
- 左側の [カタログエクスプローラ] ツリーでカタログを開き、ストリーミングテーブルが配置されているスキーマを選択します。
- 選択したスキーマの下にある テーブル アイテムを開き、ストリーミングテーブルをクリックします。
ここから、ストリーミング テーブル名の下のタブを使用して、ストリーミング テーブルに関する以下の情報を表示および編集できます。
- ステータスと履歴を更新する
- テーブルスキーマ
- サンプルデータ (アクティブなコンピュートが必要です)
- 権限
- このストリーミングテーブルが依存するテーブルやパイプラインを含むリネージ
- 使い方を知る
- このストリーミングテーブル用に作成したモニター
更新のタイムアウト
長時間実行される更新はタイムアウトする可能性があります。2025 年 8 月 14 日以降に作成または更新されたストリーミング テーブルは、更新の実行に使用されたSQLウェアハウスに関連付けられたタイムアウトを使用します。 ウェアハウスにタイムアウトが設定されていない場合は、デフォルトの 2 日が使用されます。
ストリーミング テーブルは、 CREATE OR REFRESHステートメントを手動で実行する場合にのみタイムアウトを同期します。 スケジュールされた更新では、最新のCREATE OR REFRESHからのタイムアウトが保持されます。
更新の SQL でSTATEMENT_TIMEOUT構成を使用してタイムアウトを明示的に設定できます。STATEMENT_TIMEOUTを参照してください。
ストリーミングテーブルへのアクセスを制御する
ストリーミング テーブルは、潜在的なプライベート データの公開を回避しながら、データ共有をサポートするための豊富なアクセス制御をサポートしています。 ストリーミング テーブルの所有者またはMANAGE権限を持つユーザーは、他のユーザーにSELECT権限を付与できます。 ストリーミング テーブルへのSELECTアクセス権を持つユーザーは、ストリーミング テーブルによって参照されるテーブルへのSELECTアクセス権を必要としません。 このアクセス制御により、基盤となるデータへのアクセスを制御しながらデータ共有が可能になります。
ストリーミングテーブルに権限を付与する
ストリーミング テーブルへのアクセスを許可するには、 GRANTステートメントを使用します。
GRANT <privilege_type> ON <st_name> TO <principal>;
privilege_typeは次のいずれかになります。
SELECT- ユーザーはストリーミング テーブルをSELECTできます。REFRESH- ユーザーはストリーミング テーブルをREFRESHできます。 更新は所有者の権限を使用して実行されます。
次の例では、ストリーミング テーブルを作成し、ユーザーに選択権限と更新権限を付与します。
CREATE MATERIALIZED VIEW st_name AS SELECT * FROM source_table;
-- Grant read-only access:
GRANT SELECT ON st_name TO read_only_user;
-- Grand read and refresh access:
GRANT SELECT ON st_name TO refresh_user;
GRANT REFRESH ON st_name TO refresh_user;
Unity Catalogセキュリティ保護可能なオブジェクトに対する権限の付与の詳細については、 Unity Catalog権限とセキュリティ保護可能なオブジェクト」を参照してください。
ストリーミングテーブルから権限を取り消す
ストリーミング テーブルからのアクセスを取り消すには、 REVOKEステートメントを使用します。
REVOKE privilege_type ON <st_name> FROM principal;
ソース テーブルのSELECT権限が、ストリーミング テーブルの所有者、またはストリーミング テーブルでMANAGEまたはSELECT権限を付与されている他のユーザーから取り消された場合、またはソース テーブルが削除された場合でも、ストリーミング テーブルの所有者またはアクセスを許可されたユーザーは引き続きストリーミング テーブルをクエリできます。 ただし、次の動作が発生します。
- ストリーミング テーブルの所有者またはストリーミング テーブルにアクセスできなくなった他の人は、そのストリーミング テーブルを
REFRESHできなくなり、ストリーミング テーブルは無効になります。 - スケジュールを使用して自動化されている場合、次にスケジュールされている
REFRESH失敗するか、実行されません。
次の例では、 read_only_userからSELECT権限を取り消します。
REVOKE SELECT ON st_name FROM read_only_user;
ストリーミングテーブルからレコードを完全に削除する
プレビュー
ストリーミング テーブルでのREORGステートメントのサポートはパブリック プレビュー段階です。
- ストリーミング テーブルで
REORGステートメントを使用するには、 Databricks Runtime 15.4 以降が必要です。 REORGステートメントはどのストリーミング テーブルでも使用できますが、削除が有効になっているストリーミング テーブルからレコードを削除する場合にのみ必要です。 コマンドは、投下が有効になっていないストリーミング テーブルで使用した場合には効果がありません。
GDPRコンプライアンスなどの削除を有効にしたストリーミング テーブルの基盤となるストレージからレコードを物理的に削除するには、ストリーミング テーブルのデータに対してvacuum操作を確実に実行するための追加のステップを実行する必要があります。
基礎となるストレージからレコードを物理的に削除するには:
- ストリーミング テーブルのレコードを更新または削除します。
APPLY (PURGE)パラメーターを指定して、ストリーミング テーブルに対してREORGステートメントを実行します。 たとえばREORG TABLE <streaming-table-name> APPLY (PURGE);。- ストリーミングテーブルのデータ保持期間が経過するまで待ちます。 デフォルトのデータ保持期間は 7 日間ですが、
delta.deletedFileRetentionDurationテーブル プロパティを使用して構成できます。「タイムトラベルクエリのデータ保持を構成する」を参照してください。 REFRESHストリーミングテーブル。「ストリーミング テーブルの更新」を参照してください。REFRESH操作から 24 時間以内に、レコードが完全に削除されるようにするために必要なVACUUM操作を含むパイプライン メンテナンス タスクが自動的に実行されます。
クエリ履歴を使用して実行を監視する
クエリ履歴ページを使用すると、クエリの詳細とクエリ プロファイルにアクセスできます。これらは、パフォーマンスの悪いクエリや、ストリーミング テーブルの更新を実行するために使用されるパイプラインのボトルネックを特定するのに役立ちます。 クエリ履歴とクエリ プロファイルで利用できる情報の種類の概要については、 「クエリ履歴」と「クエリ プロファイル」を参照してください。
プレビュー
この機能はパブリック プレビュー段階です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。「Databricks プレビューの管理」を参照してください。
ストリーミング テーブルに関連するすべてのステートメントはクエリ履歴に表示されます。 ステートメント ドロップダウン フィルターを使用して、任意のコマンドを選択し、関連するクエリを検査できます。すべてのCREATEステートメントの後には、パイプラインで非同期に実行されるREFRESHステートメントが続きます。REFRESHステートメントには通常、パフォーマンスの最適化に関する情報を提供する詳細なクエリ プランが含まれます。
クエリ履歴 UI でREFRESHステートメントにアクセスするには、次のステップを使用します。
- クリック
左側のサイドバーにある をクリックして、 書き込みー履歴 UIを開きます。
- ステートメント ドロップダウン・フィルターから REFRESH チェック・ボックスを選択します。
- クエリ ステートメントの名前をクリックすると、クエリの実行時間や集計されたメトリックなどの概要の詳細が表示されます。
- クエリ プロファイルを開くには、[クエリ プロファイルを表示 ] をクリックします。クエリ プロファイルのナビゲートの詳細については、「クエリ プロファイル」を参照してください。
- 必要に応じて、 [クエリ ソース] セクションのリンクを使用して、関連するクエリまたはパイプラインを開くことができます。
SQLエディターのリンクを使用するか、 SQLウェアハウスに接続されているノートブックからクエリの詳細にアクセスすることもできます。
外部クライアントからストリーミングテーブルにアクセスする
オープンAPIsをサポートしていない外部のDelta LakeまたはIcebergクライアントからストリーミング テーブルにアクセスするには、互換Modeを使用できます。 互換Modeは、 Delta LakeまたはIcebergクライアントからアクセスできる読み取り専用バージョンのストリーミング テーブルが作成されます。