Databricks SQLでストリーミングテーブルを使用する
プレビュー
この機能は パブリック プレビュー段階です。
Databricks 、 Databricks SQLを使用してデータを取り込むためにストリーミング テーブルを使用することをお勧めします。 ストリーミング テーブル は、ストリーミングまたは増分データ処理の追加サポートを備えたUnity Catalogに登録されたテーブルです。 パイプラインはストリーミングテーブルごとに自動的に作成されます。 ストリーミング テーブルを使用して、 Kafkaおよびクラウド オブジェクト ストレージから増分データをロードできます。
Delta Lake テーブルをストリーミング ソースとシンクとして使用する方法については、Delta テーブル ストリーミングの読み取りと書き込みを参照してください。
要件
ストリーミングテーブルを使用するには、以下の要件を満たす必要があります。
ワークスペースの要件 :
Databricks SQLで作成されたストリーミングテーブルは、サーバレス Lakeflow 宣言型パイプラインによって支えられています。この機能を使用するには、ワークスペースがサーバレス パイプラインをサポートしている必要があります。
- サーバーレスが有効になっているDatabricksアカウント。 詳細については、 「サーバレスSQLウェアハウスを有効にする」を参照してください。
- Unity Catalog が有効になっているワークスペース。詳細については、 Unity Catalogの使用を開始する」を参照してください。
コンピュート要件 :
次のいずれかを使用する必要があります。
Current
チャンネルを使用するSQLウェアハウス。- Databricks Runtime 13.3 LTS以降の標準アクセス モード (以前の共有アクセス モード) でコンピュート。
権限要件 :
USE CATALOG
およびストリーミング テーブルを作成するカタログとスキーマに対するUSE SCHEMA
権限。- ストリーミング テーブルを作成するスキーマに対する
CREATE TABLE
権限。 - ストリーミング テーブルのソース データを提供するテーブルまたは場所にアクセスする権限。
ストリーミングテーブルの作成
ストリーミングテーブルは、Databricks SQLのSQLクエリによって定義されます。ストリーミングテーブルを作成すると、ソーステーブルに現在存在するデータを使用してストリーミングテーブルが作成されます。 その後、通常はスケジュールに従ってテーブルを更新し、ソース テーブルに追加されたデータをプルしてストリーミングテーブルに追加します。
ストリーミング テーブルを作成すると、テーブルの所有者とみなされます。
既存のテーブルからストリーミング テーブルを作成するには、次の例のようにCREATE STREAMING TABLEステートメントを使用します。
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT product, price FROM STREAM raw_data;
この場合、ストリーミング テーブルsales
は、1 時間ごとに更新されるスケジュールで、 raw_data
テーブルの特定の列から作成されます。 使用するクエリは ストリーミング クエリである必要があります。ストリーミング セマンティクスを使用してソースから読み取るには、 STREAM
キーワードを使用します。
CREATE OR REFRESH STREAMING TABLE
ステートメントを使用してストリーミングテーブルを作成すると、初期データの更新とデータ作成がすぐに開始されます。これらの操作では、DBSQL ウェアハウス コンピュートは消費されません。 代わりに、ストリーミングテーブルは、作成と更新の両方をサーバレス Lakeflow 宣言型パイプラインに依存しています。 専用のサーバレス パイプラインは、ストリーミングテーブルごとにシステムによって自動的に作成および管理されます。
Auto Loaderでファイルをロードする
ボリューム内のファイルからストリーミングテーブルを作成するには、 Auto Loaderを使用します。 Auto Loader とLakeflow 宣言型パイプラインを併用して、クラウド オブジェクト ストレージからのほとんどのデータ取り込みタスクに対応します。Auto Loader と Lakeflow 宣言型パイプラインは、増え続けるデータがクラウド ストレージに到着するときに、増分的かつべき等に読み込むように設計されています。
Databricks SQLでAuto Loaderを使用するには、read_files
関数を使用します。次の例は、 Auto Loader を使用して大量の JSON ファイルをストリーミングテーブルに読み込む方法を示しています。
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/my_schema/my_volume/path/to/data",
format => "json"
);
クラウド ストレージからデータを読み取るには、Auto Loader を使用することもできます。
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM read_files(
'gs://mybucket/analysis/*/*/*.json',
format => "json"
);
Auto Loaderの詳細については、Auto Loaderとはを参照してください。Auto LoaderでのSQL の使用について、例を挙げて詳しく知りたい場合は、オブジェクトストレージからのデータの読み込み を参照してください。
他のソースからのストリーミング取り込み
他のソース (Kafkaを含む) からの取り込みの例については、Lakeflow宣言型パイプラインを使用したデータのロードを参照してください。
新しいデータのみを取り込む
デフォルトでは、 read_files
関数はテーブルの作成中にソース ディレクトリ内の既存のデータをすべて読み取り、更新ごとに新しく到着するレコードを処理します。
テーブルの作成時にソース ディレクトリに既に存在するデータを取り込まないようにするには、 includeExistingFiles
オプションをfalse
に設定します。つまり、テーブルの作成後にディレクトリに到着したデータのみが処理されます。例えば:
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM read_files(
'/path/to/files',
includeExistingFiles => false
);
ランタイムチャンネルを設定する
SQLウェアハウスを使用して作成されたストリーミングテーブルは、パイプラインを使用して自動的に更新されます。Lakeflow 宣言型パイプラインは、 current
チャンネル by デフォルト のランタイムを使用します。 リリース プロセスの詳細については Lakeflow 宣言型パイプライン リリースノート と リリース アップグレード プロセス を参照してください。
Databricksでは本番運用ワークロードには current
チャンネルを使用することをお勧めします。 新機能は最初に preview
チャンネルにリリースされます。 プレビュー Lakeflow 宣言型パイプライン チャンネルにパイプラインを設定して、 preview
をテーブル プロパティとして指定することで新機能をテストできます。 このプロパティは、テーブルを作成するとき、または ALTER ステートメントを使用してテーブルを作成した後に指定できます。
次のコード例は、CREATE ステートメントでプレビューするチャンネルを設定する方法を示しています。
CREATE OR REFRESH STREAMING TABLE sales
TBLPROPERTIES ('pipelines.channel' = 'preview')
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM raw_data;
ストリーミングテーブルの更新をスケジュールする
Databricks SQLストリーミング テーブルを、定義されたスケジュールに基づいて自動的に更新するように構成したり、アップストリーム データが変更されたときにトリガーするように構成したりできます。
ベータ版
TRIGGER ON UPDATE
機能はベータ版です。ワークスペースでこの機能を有効にするには、Databricks の担当者に問い合わせてください。
スケジュールまたはトリガーを設定するには、次のいずれかを実行します。
- ストリーミング テーブルを作成するときに、
SCHEDULE
句を使用してスケジュールを構成します。 - テーブルを作成するときに、
TRIGGER ON UPDATE
句を使用してトリガーを構成します。 - ALTER STREAMING TABLEステートメントを使用して、スケジュールまたはトリガーを追加または変更します。
または、 CREATE OR REFRESH STREAMING TABLE
またはREFRESH
ステートメントのいずれかを含むジョブ内にタスクを作成し、他のジョブと同じようにオーケストレーションします。LakeFlowジョブを参照してください。
スケジュールを作成すると、Databricks は更新を処理するための新しいジョブを自動的に作成します。
スケジュールを表示するには、次のいずれかを実行します。
- Databricks UI の SQL エディターから
DESCRIBE EXTENDED
ステートメントを実行します。「DESCRIBE TABLE」を参照してください。 - カタログ エクスプローラーを使用してストリーミング テーブルを表示します。 スケジュールは、 [概要] タブの [更新ステータス] に表示されます。「カタログ エクスプローラーとは何ですか?」を参照してください。
更新スケジュールがあっても、更新されたデータが必要な場合はいつでも手動で更新を実行できます。
機密データを非表示にする
プレビュー
この機能は パブリック プレビュー段階です。
ストリーミング テーブルを使用すると、テーブルにアクセスするユーザーから機密データを隠すことができます。 1 つの方法は、機密性の高い列または行を完全に除外するようにクエリを定義することです。あるいは、クエリを実行するユーザーの権限に基づいて、列マスクまたは行フィルターを適用することもできます。たとえば、グループHumanResourcesDept
に属していないユーザーに対してtax_id
列を非表示にすることができます。これを行うには、ストリーミング テーブルの作成時にROW FILTER
およびMASK
構文を使用します。 詳細については、 「行フィルターと列マスク」を参照してください。
ストリーミングテーブルの更新
ストリーミングテーブルの作成時に更新を自動的にスケジュールできます。 ストリーミング テーブルを手動で更新することもできます。 更新がスケジュールされている場合でも、いつでも手動更新を呼び出すことができます。更新は、ストリーミング テーブルとともに自動的に作成された同じパイプラインによって処理されます。
ストリーミング テーブルを更新するには:
REFRESH STREAMING TABLE sales;
DESCRIBE TABLE EXTENDEDで最新の更新のステータスを確認できます。
テーブル所有者のみがストリーミング テーブルを更新して最新のデータを取得できます。 テーブルを作成したユーザーが所有者となり、所有者を変更することはできません。タイムトラベルクエリを使用する前に、ストリーミング テーブルを更新する必要がある場合があります。
更新の仕組み
ストリーミング テーブルの更新では、最後の更新以降に到着した新しい行のみが評価され、新しいデータのみが追加されます。
各更新では、ストリーミング テーブルの現在の定義を使用して、この新しいデータを処理します。 ストリーミング テーブル定義を変更しても、既存のデータは自動的に再計算されません。 変更が既存のデータと互換性がない場合は (たとえば、データ型の変更など)、次の更新はエラーで失敗します。
次の例は、ストリーミング テーブル定義への変更が更新動作にどのような影響を与えるかを説明しています。
- フィルターを削除しても、以前にフィルターされた行は再処理されません。
- 列の投影を変更しても、既存のデータの処理方法には影響しません。
- 静的スナップショットによる結合は、初期処理時のスナップショット状態を使用します。 更新されたスナップショットと一致する遅れて到着したデータは無視されます。これにより、ディメンションが遅れると、ファクトが削除される可能性があります。
- 既存の列の CAST を変更するとエラーが発生します。
既存のストリーミング テーブルでサポートできない方法でデータが変更された場合は、完全な更新を実行できます。
ストリーミングテーブルのフルリフレッシュ
完全更新では、ソースで利用可能なすべてのデータが最新の定義で再処理されます。完全な 更新 によって既存のデータが切り捨てられるため、データの履歴全体が保持されない、または保持期間が短い ソース ( Kafkaなど) で完全な 更新 を呼び出すことは推奨されません。 ソース内でデータが利用できなくなった場合、古いデータを回復できない可能性があります。
例えば:
REFRESH STREAMING TABLE sales FULL;
ストリーミングテーブルのスケジュールを変更する
ストリーミング テーブルの自動更新スケジュールを変更 (または設定) できます。 次の例は、 ALTER STREAMING TABLEを使用してスケジュールを設定する方法を示しています。
ALTER STREAMING TABLE sales
ADD SCHEDULE every 1 hour;
更新スケジュール クエリの例については、 「ALTER STREAMING TABLE」を参照してください。
更新のステータスを追跡する
ストリーミングテーブルの更新のステータスを表示するには、Lakeflow宣言型パイプライン UI でストリーミングテーブルを管理するパイプラインを表示するか、ストリーミングテーブルの DESCRIBE EXTENDED
コマンドによって返される 更新情報 を表示します。
DESCRIBE TABLE EXTENDED <table-name>;
または、カタログエクスプローラ でストリーミングテーブルを表示し、そこで更新ステータスを確認することもできます。
- クリック
サイドバーの カタログ 。
- 左側の [カタログエクスプローラ] ツリーでカタログを開き、ストリーミングテーブルが配置されているスキーマを選択します。
- 選択したスキーマの下にある テーブル アイテムを開き、ストリーミングテーブルをクリックします。
ここから、ストリーミング テーブル名の下のタブを使用して、ストリーミング テーブルに関する以下の情報を表示および編集できます。
- ステータスと履歴を更新する
- テーブルスキーマ
- サンプルデータ (アクティブなコンピュートが必要です)
- 権限
- このストリーミングテーブルが依存するテーブルやパイプラインを含むリネージ
- 使い方を知る
- このストリーミングテーブル用に作成したモニター
更新のタイムアウト
長時間実行される更新はタイムアウトする可能性があります。2025 年 8 月 14 日以降に作成または更新されたストリーミング テーブルは、更新の実行に使用されたSQLウェアハウスに関連付けられたタイムアウトを使用します。 ウェアハウスにタイムアウトが設定されていない場合は、デフォルトの 2 日が使用されます。
ストリーミング テーブルは、 CREATE OR REFRESH
ステートメントを手動で実行する場合にのみタイムアウトを同期します。 スケジュールされた更新では、最新のCREATE OR REFRESH
からのタイムアウトが保持されます。
更新の SQL でSTATEMENT_TIMEOUT
構成を使用してタイムアウトを明示的に設定できます。STATEMENT_TIMEOUTを参照してください。
ストリーミングテーブルへのアクセスを制御する
ストリーミング テーブルは、潜在的なプライベート データの公開を回避しながら、データ共有をサポートするための豊富なアクセス制御をサポートしています。 ストリーミング テーブルの所有者またはMANAGE
権限を持つユーザーは、他のユーザーにSELECT
権限を付与できます。 ストリーミング テーブルへのSELECT
アクセス権を持つユーザーは、ストリーミング テーブルによって参照されるテーブルへのSELECT
アクセス権を必要としません。 このアクセス制御により、基盤となるデータへのアクセスを制御しながらデータ共有が可能になります。
ストリーミングテーブルに権限を付与する
ストリーミング テーブルへのアクセスを許可するには、 GRANTステートメントを使用します。
GRANT <privilege_type> ON <st_name> TO <principal>;
privilege_type
は次のいずれかになります。
SELECT
- ユーザーはストリーミング テーブルをSELECT
できます。REFRESH
- ユーザーはストリーミング テーブルをREFRESH
できます。 更新は所有者の権限を使用して実行されます。
次の例では、ストリーミング テーブルを作成し、ユーザーに選択権限と更新権限を付与します。
CREATE MATERIALIZED VIEW st_name AS SELECT * FROM source_table;
-- Grant read-only access:
GRANT SELECT ON st_name TO read_only_user;
-- Grand read and refresh access:
GRANT SELECT ON st_name TO refresh_user;
GRANT REFRESH ON st_name TO refresh_user;
Unity Catalogセキュリティ保護可能なオブジェクトに対する権限の付与の詳細については、 Unity Catalog権限とセキュリティ保護可能なオブジェクト」を参照してください。
ストリーミングテーブルから権限を取り消す
ストリーミング テーブルからのアクセスを取り消すには、 REVOKEステートメントを使用します。
REVOKE privilege_type ON <st_name> FROM principal;
ソース テーブルのSELECT
権限が、ストリーミング テーブルの所有者、またはストリーミング テーブルでMANAGE
またはSELECT
権限を付与されている他のユーザーから取り消された場合、またはソース テーブルが削除された場合でも、ストリーミング テーブルの所有者またはアクセスを許可されたユーザーは引き続きストリーミング テーブルをクエリできます。 ただし、次の動作が発生します。
- ストリーミング テーブルの所有者またはストリーミング テーブルにアクセスできなくなった他の人は、そのストリーミング テーブルを
REFRESH
できなくなり、ストリーミング テーブルは無効になります。 - スケジュールを使用して自動化されている場合、次にスケジュールされている
REFRESH
失敗するか、実行されません。
次の例では、 read_only_user
からSELECT
権限を取り消します。
REVOKE SELECT ON st_name FROM read_only_user;
ストリーミングテーブルからレコードを完全に削除する
プレビュー
ストリーミング テーブルでのREORG
ステートメントのサポートはパブリック プレビュー段階です。
- ストリーミング テーブルで
REORG
ステートメントを使用するには、 Databricks Runtime 15.4 以降が必要です。 REORG
ステートメントはどのストリーミング テーブルでも使用できますが、削除が有効になっているストリーミング テーブルからレコードを削除する場合にのみ必要です。 コマンドは、投下が有効になっていないストリーミング テーブルで使用した場合には効果がありません。
GDPRコンプライアンスなどの削除を有効にしたストリーミング テーブルの基盤となるストレージからレコードを物理的に削除するには、ストリーミング テーブルのデータに対してvacuum操作を確実に実行するための追加のステップを実行する必要があります。
基礎となるストレージからレコードを物理的に削除するには:
- ストリーミング テーブルのレコードを更新または削除します。
APPLY (PURGE)
問題を指定して、ストリーミング テーブルに対してREORG
ステートメントを実行します。 たとえばREORG TABLE <streaming-table-name> APPLY (PURGE);
。- ストリーミングテーブルのデータ保持期間が経過するまで待ちます。 デフォルトのデータ保持期間は 7 日間ですが、
delta.deletedFileRetentionDuration
テーブル プロパティを使用して構成できます。「タイムトラベルクエリのデータ保持を構成する」を参照してください。 REFRESH
ストリーミングテーブル。ストリーミングテーブルの更新を参照してください。REFRESH
操作から 24 時間以内に、レコードが完全に削除されるようにするために必要なVACUUM
操作を含む、Lakeflow宣言型パイプライン メンテナンス タスクが自動的に実行されます。
クエリ履歴を使用して実行を監視する
クエリ履歴ページを使用して、クエリの詳細とクエリ プロファイルにアクセスできるため、ストリーミングテーブルの更新を実行するために使用される Lakeflow 宣言型パイプラインでパフォーマンスの低いクエリやボトルネックを特定するのに役立ちます。 クエリ履歴とクエリ プロファイルで使用できる情報の種類の概要については、クエリ履歴 と クエリ プロファイルを参照してください。
プレビュー
この機能はパブリック プレビュー段階です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。「Databricks プレビューの管理」を参照してください。
ストリーミング テーブルに関連するすべてのステートメントはクエリ履歴に表示されます。 ステートメント ドロップダウン フィルターを使用して、任意のコマンドを選択し、関連するクエリを検査できます。すべてのCREATE
ステートメントの後には、パイプラインで非同期に実行されるREFRESH
ステートメントが続きます。REFRESH
ステートメントには通常、パフォーマンスの最適化に関する情報を提供する詳細なクエリ プランが含まれます。
クエリ履歴 UI でREFRESH
ステートメントにアクセスするには、次のステップを使用します。
- クリック
左側のサイドバーにある をクリックして、 書き込みー履歴 UIを開きます。
- ステートメント ドロップダウン・フィルターから REFRESH チェック・ボックスを選択します。
- クエリ ステートメントの名前をクリックすると、クエリの実行時間や集計されたメトリックなどの概要の詳細が表示されます。
- クエリ プロファイルを開くには、[クエリ プロファイルを表示 ] をクリックします。クエリ プロファイルのナビゲートの詳細については、「クエリ プロファイル」を参照してください。
- 必要に応じて、 [クエリ ソース] セクションのリンクを使用して、関連するクエリまたはパイプラインを開くことができます。
SQLエディターのリンクを使用するか、 SQLウェアハウスに接続されているノートブックからクエリの詳細にアクセスすることもできます。