Databricks SQLでストリーミングテーブルを使用する
Databricks では、ストリーミングテーブルを使用して Databricks SQLを使用してデータを取り込むことをお勧めします。 ストリーミングテーブル は、Unity Catalogに登録されたテーブルで、ストリーミングまたは増分データ処理の追加サポートがあります。DLT パイプラインは、ストリーミングテーブルごとに自動的に作成されます。ストリーミングテーブルは、Kafka およびクラウドオブジェクトストレージからの増分データロードに使用できます。
Delta Lake テーブルをストリーミング ソースとシンクとして使用する方法については、「 Delta テーブル ストリーミングの読み取りと書き込み」を参照してください。
必要条件
ストリーミングテーブルを使用するには、次の要件を満たす必要があります。
ワークスペースの要件 :
Databricks SQLで作成されたストリーミングテーブルは、サーバレス DLT パイプラインによってサポートされます。この機能を使用するには、ワークスペースがサーバレス パイプラインをサポートしている必要があります。
- サーバレスが有効になっている Databricks アカウント。 詳細については、 サーバレス SQLウェアハウスの有効化を参照してください。
- Unity Catalog が有効になっているワークスペース。詳しくは、「Unity Catalogの設定と管理」を参照してください。
コンピュートの要件 :
次のいずれかを使用する必要があります。
-
Current
チャンネルを利用したSQLウェアハウスです。 -
Databricks Runtime 13.3 LTS 以上の標準アクセス モード (以前の共有アクセス モード) を使用したコンピュート。
-
Databricks Runtime 15.4 LTS 以上の専用アクセスモード(旧シングルユーザーアクセスモード)を搭載したコンピュート。
Databricks Runtime 15.3 以下では、専用のコンピュートを使用して 、他のユーザー が所有するストリーミングテーブルをクエリすることはできません。Databricks Runtime 15.3以下で専用のコンピュートを利用できるのは、ストリーミングテーブルを所有している場合のみです。テーブルの作成者が所有者です。
Databricks Runtime 15.4 LTS 以降では、テーブルの所有者でなくても、専用のコンピュートで DLT で生成されたテーブルのクエリがサポートされています。 専用のコンピュートを使用してデータ フィルタリング操作を実行すると、サーバレス コンピュート リソースの料金が請求される場合があります。 「専用コンピュート (旧称 single user コンピュート) のきめ細かなアクセス制御」を参照してください。
権限の要件 :
USE CATALOG
USE SCHEMA
ストリーミングテーブルを作成するカタログとスキーマに対する権限。- ストリーミングテーブルを作成するスキーマに対する
CREATE TABLE
権限。 - ストリーミングテーブルのソースデータを提供するテーブルまたは場所にアクセスするための権限。
Create ストリーミングテーブル
ストリーミングテーブルは、SQL Databricks SQLの クエリによって定義されます。ストリーミングテーブルを作成すると、ソーステーブルに現在存在するデータを使用してストリーミングテーブルが作成されます。 その後、通常はスケジュールに従ってテーブルを更新し、ソース テーブルに追加されたデータをプルしてストリーミングテーブルに追加します。
ストリーミングテーブルを作成すると、テーブルの所有者と見なされます。
既存のテーブルからストリーミングテーブルを作成するには、次の例のように CREATE STREAMING TABLE ステートメントを使用します。
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT product, price FROM STREAM raw_data;
この場合、ストリーミングテーブル sales
は raw_data
テーブルの特定の列から作成され、1 時間ごとに更新するようにスケジュールされます。 使用するクエリは 、ストリーミング クエリである必要があります。STREAM
キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。
CREATE OR REFRESH STREAMING TABLE
ステートメントを使用してストリーミングテーブルを作成すると、初期データの更新とデータ作成がすぐに開始されます。これらの操作では、DBSQL ウェアハウス コンピュートは消費されません。 代わりに、ストリーミングテーブルは作成と更新の両方をサーバレス DLT に依存しています。 専用のサーバレス DLT パイプラインは、ストリーミングテーブルごとにシステムによって自動的に作成および管理されます。
Auto Loader でファイルを読み込む
ボリューム内のファイルからストリーミングテーブルを作成するには、 Auto Loaderを使用します。 Auto Loader with DLTT は、クラウド・オブジェクト・ストレージからのほとんどのデータ取り込みタスクに使用します。Auto Loader と DLT は、増え続けるデータがクラウド ストレージに到着するときに、増分的かつべき等に読み込むように設計されています。
Auto LoaderDatabricks SQLで を使用するには、read_files
関数を使用します。次の例は、 Auto Loader を使用して大量の JSON ファイルをストリーミングテーブルに読み込む方法を示しています。
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/my_schema/my_volume/path/to/data",
format => "json"
);
クラウドストレージからデータを読み取るには、次の Auto Loaderを使用することもできます。
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM read_files(
's3://mybucket/analysis/*/*/*.json',
format => "json"
);
Auto Loaderの詳細については、「Auto Loaderとは」を参照してください。Auto LoaderでのSQL の使用について、例を挙げて詳しく知りたい場合は、オブジェクトストレージからのデータの読み込み を参照してください。
他のソースからのストリーミング取り込み
Kafka などの他のソースからのインジェストの例については、「 DLT を使用したデータのロード」を参照してください。
新しいデータのみを取り込む
デフォルトでは、 read_files
関数はテーブルの作成時にソースディレクトリ内のすべての既存のデータを読み取り、更新のたびに新しく到着したレコードを処理します。
テーブルの作成時にソースディレクトリにすでに存在するデータを取り込まないようにするには、 includeExistingFiles
オプションを false
に設定します。つまり、テーブルの作成後にディレクトリに到着したデータのみが処理されます。例えば:
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM read_files(
'/path/to/files',
includeExistingFiles => false
);
ランタイム チャンネルを設定する
SQLウェアハウスを使用して作成されたストリーミングテーブルは、DLT パイプラインを使用して自動的に更新されます。DLT パイプラインは、 current
チャンネル by デフォルト のランタイムを使用します。 リリースプロセスについては 、DLT リリースノートとリリースアップグレードプロセス を参照してください。
Databricks 本番運用ワークロードには current
チャンネルを使用することをお勧めします。 新機能は最初に preview
チャンネルにリリースされます。 プレビュー DLT チャンネルにパイプラインを設定して、 preview
をテーブル プロパティとして指定することで、新機能をテストできます。 このプロパティは、テーブルを作成するとき、または ALTER ステートメントを使用してテーブルを作成した後に指定できます。
次のコード例は、CREATE ステートメントでチャンネルをプレビューに設定する方法を示しています。
CREATE OR REFRESH STREAMING TABLE sales
TBLPROPERTIES ('pipelines.channel' = 'preview')
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM raw_data;
機密データを隠す
プレビュー
この機能は パブリック プレビュー段階です。
ストリーミングテーブルを使用すると、テーブルにアクセスするユーザーから機密データを隠すことができます。1 つの方法は、機密性の高い列または行を完全に除外するようにクエリを定義することです。または、クエリを実行するユーザーの権限に基づいて列マスクまたは行フィルターを適用することもできます。たとえば、 HumanResourcesDept
グループに属さないユーザーの tax_id
列を非表示にできます。これを行うには、ストリーミングテーブルの作成時に ROW FILTER
構文と MASK
構文を使用します。 詳細については、「 行フィルターと列マスクを使用した機密性の高いテーブル データのフィルター処理」を参照してください。
更新 a ストリーミングテーブル
更新は、ストリーミングテーブルの作成時に自動的にスケジュールできます。 ストリーミングテーブルを手動で更新することもできます。更新がスケジュールされている場合でも、いつでも手動更新を呼び出すことができます。更新は、ストリーミングテーブルと共に自動的に作成されたのと同じパイプラインによって処理されます。
ストリーミングテーブルを更新するには:
REFRESH STREAMING TABLE sales;
最新の更新状況は DESCRIBE TABLE EXTENDEDで確認できます。
テーブルの所有者のみがストリーミングテーブルを更新して最新のデータを取得できます。 テーブルを作成したユーザーが所有者であり、所有者を変更することはできません。タイムトラベル クエリを使用する前に、ストリーミングテーブルを更新する必要がある場合があります。
更新のしくみ
ストリーミングテーブルの更新では、前回の更新以降に到着した新しい行のみが評価され、新しいデータのみが追加されます。
各更新では、ストリーミングテーブルの現在の定義を使用して、この新しいデータを処理します。 ストリーミングテーブル定義を変更しても、既存のデータは自動的に再計算されません。 変更が既存のデータと互換性がない場合 (データ型の変更など)、次の更新はエラーで失敗します。
次の例は、ストリーミングテーブル定義の変更が更新動作にどのように影響するかを示しています。
- フィルターを削除しても、以前にフィルターされた行は再処理されません。
- 列の投影を変更しても、既存のデータの処理方法には影響しません。
- 静的スナップショットとの結合は、初期処理時のスナップショット状態を使用します。 更新されたスナップショットと一致する遅延到着データは無視されます。これにより、ディメンションが遅れると、ファクトがドロップされる可能性があります。
- 既存のカラムの CAST を変更すると、エラーが発生します。
既存のストリーミングテーブルでサポートできない方法でデータが変更された場合は、完全更新を実行できます。
Fully 更新 a ストリーミングテーブル
完全更新 ソースで使用可能なすべてのデータを最新の定義で再処理します。 データの全履歴を保持しないソースや、保持期間が短いソース ( Kafkaなど) で完全更新を呼び出すことは、既存のデータが切り捨てられるため、お勧めしません。 ソースでデータが使用できなくなった場合、古いデータを回復できない場合があります。
例えば:
REFRESH STREAMING TABLE sales FULL;
ストリーミングテーブルのスケジュールを変更する
ストリーミングテーブルの自動更新スケジュールを変更 (または設定) できます。次の例は、 ALTER STREAMING TABLE を使用してスケジュールを設定する方法を示しています。
ALTER STREAMING TABLE sales
ADD SCHEDULE every 1 hour;
更新スケジュールクエリの例については、 ALTER STREAMING TABLEを参照してください。
更新の状態を追跡する
ストリーミングテーブルの更新のステータスを表示するには、DLT UI でストリーミングテーブルを管理するパイプラインを表示するか、ストリーミングテーブルの DESCRIBE EXTENDED
コマンドによって返される 更新情報 を表示します。
DESCRIBE TABLE EXTENDED <table-name>;
または、Catalog Explorer でストリーミングテーブルを表示し、そこで更新ステータスを確認することもできます。
- サイドバー
カタログ をクリックします。
- 左側の [Catalog Explorer] ツリーでカタログを開き、ストリーミングテーブルが配置されているスキーマを選択します。
- 選択したスキーマの下にある [テーブル ] アイテムを開き、ストリーミングテーブルをクリックします。
ここから、ストリーミングテーブル名の下にあるタブを使用して、次のようなストリーミングテーブルに関する情報を表示および編集できます。
- 更新ステータスと履歴
- テーブルスキーマ
- サンプルデータ(アクティブなコンピュートが必要)
- 権限
- リネージ、このストリーミングテーブルが依存するテーブルとパイプラインを含む
- 使い方の知見
- このストリーミングテーブル用に作成したモニター
ストリーミングテーブルへのアクセスを制御する
ストリーミングテーブルは、プライベートなデータの公開を回避しながら、データ共有をサポートするための豊富なアクセス制御をサポートしています。 ストリーミングテーブルの所有者または MANAGE
権限を持つユーザーは、他のユーザーに SELECT
権限を付与できます。ストリーミングテーブルへの SELECT
アクセス権を持つユーザーは、ストリーミングテーブルによって参照されるテーブルへの SELECT
アクセス権は必要ありません。このアクセス制御により、基になるデータへのアクセスを制御しながら、データ共有が可能になります。
ストリーミングテーブルへの権限の付与
ストリーミングテーブルへのアクセスを許可するには、 GRANT ステートメントを使用します。
GRANT <privilege_type> ON <st_name> TO <principal>;
privilege_type
は次のとおりです。
SELECT
- ユーザーはストリーミングテーブルをSELECT
できます。REFRESH
- ユーザーはストリーミングテーブルをREFRESH
できます。更新は、所有者の権限を使用して実行されます。
次の例では、ストリーミングテーブルを作成し、ユーザーに選択権限と更新権限を付与します。
CREATE MATERIALIZED VIEW st_name AS SELECT * FROM source_table;
-- Grant read-only access:
GRANT SELECT ON st_name TO read_only_user;
-- Grand read and refresh access:
GRANT SELECT ON st_name TO refresh_user;
GRANT REFRESH ON st_name TO refresh_user;
Unity Catalogセキュリティ保護可能なオブジェクトに対する特権の付与の詳細については、「特権とセキュリティ保護可能なオブジェクトUnity Catalog」を参照してください。
ストリーミングテーブルからの権限の取り消し
ストリーミングテーブルからのアクセスを取り消すには、 REVOKE ステートメントを使用します:
REVOKE privilege_type ON <st_name> FROM principal;
ソーステーブルに対する SELECT
権限が、ストリーミングテーブルの所有者またはストリーミングテーブルに対する MANAGE
権限または SELECT
権限を付与された他のユーザーから取り消された場合、またはソーステーブルが削除された場合、ストリーミングテーブルの所有者またはアクセス権を付与されたユーザーは引き続きストリーミングテーブルをクエリできます。ただし、次の動作が発生します。
- ストリーミングテーブルの所有者またはストリーミングテーブルへのアクセスを失った他のユーザーは、そのストリーミングテーブルを
REFRESH
できなくなり、ストリーミングテーブルは古くなります。 - スケジュールを使用して自動化されている場合、次のスケジュール
REFRESH
失敗するか、実行されません。
次の例では、read_only_user
から SELECT
権限を取り消します。
REVOKE SELECT ON st_name FROM read_only_user;
ストリーミングテーブルからレコードを完全に削除する
プレビュー
ストリーミングテーブルでの REORG
ステートメントのサポートは 、パブリック プレビュー段階です。
- ストリーミングテーブルで
REORG
ステートメントを使用するには、 Databricks Runtime 15.4 以降が必要です。 REORG
ステートメントは任意のストリーミングテーブルで使用できますが、削除ベクトルが有効になっているストリーミングテーブルからレコードを削除する場合にのみ必要です。このコマンドは、削除ベクトルが有効になっていないストリーミングテーブルと一緒に使用しても効果がありません。
GDPR コンプライアンスなど、削除ベクトルが有効になっているストリーミングテーブルの基になるストレージからレコードを物理的に削除するには、ストリーミングテーブルのデータに対してvacuum操作が実行されるように追加の手順を実行する必要があります。
基盤となるストレージからレコードを物理的に削除するには:
- レコードを更新するか、ストリーミングテーブルからレコードを削除します。
- ストリーミングテーブルに対して
REORG
ステートメントを実行し、APPLY (PURGE)
パラメーターを指定します。 たとえば、REORG TABLE <streaming-table-name> APPLY (PURGE);
. - ストリーミングテーブルのデータ保持期間が経過するのを待ちます。 デフォルトのデータ保持期間は 7 日間ですが、
delta.deletedFileRetentionDuration
テーブル プロパティを使用して構成できます。タイムトラベル クエリのデータ保持の構成を参照してください。 REFRESH
the ストリーミングテーブル. 「ストリーミングテーブルの更新」を参照してください。REFRESH
操作から24時間以内に、レコードを完全に削除するために必要なVACUUM
操作を含むDLTメンテナンスタスクが自動的に実行されます。
クエリ履歴を使用した実行の監視
クエリ履歴ページを使用して、クエリの詳細とクエリプロファイルにアクセスできるため、ストリーミングテーブルの更新を実行するために使用される DLT パイプラインのパフォーマンスの低いクエリやボトルネックを特定できます。クエリ履歴とクエリ プロファイルで使用できる情報の種類の概要については、「 クエリ履歴 」と 「クエリ プロファイル」を参照してください。
プレビュー
この機能は パブリック プレビュー段階です。ワークスペース管理者は、 プレビュー ページからこの機能を有効にできます。「Databricks プレビューの管理」を参照してください。
ストリーミングテーブルに関連するすべてのステートメントがクエリ履歴に表示されます。 [ステートメント] ドロップダウン フィルターを使用して、任意のコマンドを選択し、関連するクエリを検査できます。すべての CREATE
ステートメントの後には、DLT パイプラインで非同期に実行される REFRESH
ステートメントが続きます。通常、 REFRESH
ステートメントには、パフォーマンスの最適化に関する知見を提供する詳細なクエリ プランが含まれています。
クエリ履歴 UI で REFRESH
ステートメントにアクセスするには、次の手順を使用します。
- 左側のサイドバーで [
] をクリックして、 クエリー履歴 UI を開きます。
- 「ステートメント 」ドロップダウン・フィルターから「 REFRESH 」チェック・ボックスを選択します。
- クエリステートメントの名前をクリックすると、クエリの実行時間や集計されたメトリクスなどの概要の詳細が表示されます。
- [ クエリ プロファイルの表示 ] をクリックして、クエリ プロファイルを開きます。クエリ プロファイルのナビゲーションの詳細については、 クエリ プロファイル を参照してください。
- 必要に応じて、[ クエリ ソース ] セクションのリンクを使用して、関連するクエリまたはパイプラインを開くことができます。
また、 SQL エディターのリンクを使用して、または SQLウェアハウスに添付されたノートブックからクエリの詳細にアクセスすることもできます。