Databricks SQL のストリーミング テーブルを使用してデータを読み込む
プレビュー
この機能は パブリック プレビュー段階です。
Databricks では、ストリーミングテーブルを使用して Databricks SQLを使用してデータを取り込むことをお勧めします。 ストリーミングテーブル は、Unity Catalogに登録されたテーブルで、ストリーミングまたは増分データ処理の追加サポートがあります。DLT パイプラインは、ストリーミングテーブルごとに自動的に作成されます。 ストリーミングテーブルは、Kafka およびクラウドオブジェクトストレージからの増分データロードに使用できます。
この記事では、ストリーミング テーブルを使用して、Unity Catalog ボリューム (推奨) または外部ロケーションとして構成されたクラウドオブジェクトストレージからデータを読み込む方法について説明します。
Delta Lake テーブルをストリーミング ソースとシンクとして使用する方法については、「 Delta テーブル ストリーミングの読み取りと書き込み」を参照してください。
Databricks SQLで作成されたストリーミングテーブルは、サーバレス DLT パイプラインによって支えられています。この機能を使用するには、ワークスペースがサーバレス パイプラインをサポートしている必要があります。
始める前に
開始する前に、次の要件を満たす必要があります。
ワークスペースの要件 :
- サーバレスが有効になっている Databricks アカウント。 詳細については、 サーバレス SQLウェアハウスの有効化を参照してください。
- Unity Catalog が有効になっているワークスペース。 詳しくは、「Unity Catalogの設定と管理」を参照してください。
コンピュートの要件 :
次のいずれかを使用する必要があります。
Current
チャンネルを使った SQLウェアハウス。- Databricks Runtime 13.3 LTS 以上の標準アクセス モード (以前の共有アクセス モード) を使用したコンピュート。
権限の要件 :
- Unity Catalog外部ロケーションに対する
READ FILES
権限。情報については、クラウドストレージをDatabricksに接続するための外部ロケーションの作成を参照してください。 - ストリーミング・テーブルを作成するカタログに対する
USE CATALOG
権限。 - ストリーミングテーブルを作成するスキーマに対する
USE SCHEMA
権限。 - ストリーミングテーブルを作成するスキーマに対する
CREATE TABLE
権限。
その他の要件 :
-
ソース データへのパス。
ボリュームパスの例:
/Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>
外部ロケーション パスの例:
gs://myBucket/analysis
この記事では、ロードするデータが、アクセス権を持つ Unity Catalog ボリュームまたは外部ロケーションに対応するクラウドストレージの場所にあることを前提としています。
ソースデータの検出とプレビュー
-
ワークスペースのサイドバーで、「 クエリ」 をクリックし、「 クエリを作成 」をクリックします。
-
クエリ エディターで、
Current
チャンネルを使用する SQLウェアハウスをドロップダウン リストから選択します。 -
次のコードをエディタに貼り付け、山括弧 (
<>
) 内の値をソースデータを識別する情報に置き換えて、[ 実行 ] をクリックします。
read_files
table valued 関数の実行時に、その関数のデフォルトがデータを解析できない場合、スキーマ推論エラーが発生する可能性があります。たとえば、複数行の CSV ファイルや JSON ファイルに対して複数行モードを設定する必要がある場合があります。 パーサー・オプションのリストについては、read_files
テーブル値関数を参照してください。
/* Discover your data in a volume */
LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>"
/* Preview your data in a volume */
SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10
/* Discover your data in an external location */
LIST "gs://<bucket>/<path>/<folder>"
/* Preview your data in an external location */
SELECT * FROM read_files("gs://<bucket>/<path>/<folder>") LIMIT 10
ストリーミング テーブルへのデータの読み込み
クラウド・オブジェクト・ストレージ内のデータからストリーミング・テーブルを作成するには、以下をクエリ・エディターに貼り付けて、「 実行 」をクリックします。
/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')
/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('gs://<bucket>/<path>/<folder>')
ランタイム チャンネルを設定する
SQLウェアハウスを使用して作成されたストリーミングテーブルは、DLT パイプラインを使用して自動的に更新されます。DLT パイプラインは、デフォルトによって current
チャンネルのランタイムを使用します。 リリースプロセスについては 、DLT リリースノートとリリースアップグレードプロセス を参照してください。
Databricks 本番運用ワークロードには、current
チャンネルを使用することをお勧めします。 新機能は最初に preview
チャンネルにリリースされます。 プレビュー DLT チャンネルにパイプラインを設定して、 preview
をテーブル プロパティとして指定することで、新機能をテストできます。 このプロパティは、テーブルを作成するとき、または ALTER ステートメントを使用してテーブルを作成した後に指定できます。
次のコード例は、CREATE ステートメントでチャンネルをプレビューに設定する方法を示しています。
CREATE OR REPLACE MATERIALIZED VIEW foo.default.bar
TBLPROPERTIES ('pipelines.channel' = 'preview') as
SELECT
*
FROM
range(5)
DLT パイプラインを使用してストリーミング テーブルを更新する
このセクションでは、クエリで定義されたソースから利用可能な最新のデータでストリーミングテーブルを更新するためのパターンについて説明します。
ストリーミングテーブルを CREATE
または REFRESH
すると、更新はサーバレス DLT パイプラインを使用して処理されます。 定義する各ストリーミングテーブルには、DLT パイプラインが関連付けられています。
REFRESH
コマンドを実行すると、DLT パイプライン リンクが返されます。DLT パイプラインのリンクを使用して、更新のステータスを確認できます。
テーブルの所有者のみがストリーミング テーブルを更新して最新のデータを取得できます。 テーブルを作成したユーザーが所有者であり、所有者を変更することはできません。 タイムトラベルクエリを使用する前に、ストリーミングテーブルの更新が必要になる場合があります。
新しいデータのみを取り込む
デフォルトでは、 read_files
関数はテーブルの作成時にソースディレクトリ内のすべての既存のデータを読み取り、更新のたびに新しく到着したレコードを処理します。
テーブルの作成時にソースディレクトリにすでに存在するデータを取り込まないようにするには、 includeExistingFiles
オプションを false
に設定します。 つまり、テーブルの作成後にディレクトリに到着したデータのみが処理されます。 例えば:
CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files('gs://mybucket/analysis/*/*/*.json', includeExistingFiles => false)
ストリーミング テーブルを完全に更新する
完全更新 ソースで使用可能なすべてのデータを最新の定義で再処理します。 データの全履歴を保持しないソースや、保持期間が短いソース ( Kafkaなど) で完全更新を呼び出すことは、既存のデータが切り捨てられるため、お勧めしません。 ソースでデータが使用できなくなった場合、古いデータを回復できない場合があります。
例えば:
REFRESH STREAMING TABLE my_bronze_table FULL
ストリーミング テーブルの自動更新をスケジュールする
定義されたスケジュールに基づいて自動的に更新されるようにストリーミング テーブルを構成するには、クエリ エディターに次のものを貼り付け、[ 実行 ] をクリックします。
ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
CRON '<cron-string>'
[ AT TIME ZONE '<timezone-id>' ]];
更新スケジュールクエリの例については、 ALTER STREAMING TABLEを参照してください。
更新の状態を追跡する
ストリーミングテーブルの更新のステータスを表示するには、DLT UI でストリーミングテーブルを管理するパイプラインを表示するか、ストリーミングテーブルの DESCRIBE EXTENDED
コマンドによって返される 更新情報 を表示します。
DESCRIBE EXTENDED <table-name>
Kafka からのストリーミング取り込み
Kafka からのストリーミング取り込みの例については、 read_kafkaを参照してください。
ストリーミングテーブルへのアクセス権をユーザーに付与する
ストリーミングテーブルに対する SELECT
権限をユーザーに付与してクエリを実行できるようにするには、以下をクエリエディタに貼り付けて、[ 実行 ] をクリックします。
GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>
Unity Catalogセキュリティ保護可能なオブジェクトに対する特権の付与の詳細については、「特権とセキュリティ保護可能なオブジェクトUnity Catalog」を参照してください。
ストリーミング テーブルからレコードを完全に削除する
プレビュー
ストリーミング テーブルでの REORG
ステートメントのサポートは 、パブリック プレビュー段階です。
- ストリーミング テーブルで
REORG
ステートメントを使用するには、Databricks Runtime 15.4 以降が必要です。 REORG
ステートメントは任意のストリーミングテーブルで使用できますが、削除ベクトルが有効になっているストリーミングテーブルからレコードを削除する場合にのみ必要です。このコマンドは、削除ベクトルが有効になっていないストリーミングテーブルと一緒に使用しても効果がありません。
GDPR コンプライアンスなど、削除ベクトルが有効になっているストリーミングテーブルの基盤となるストレージからレコードを物理的に削除するには、ストリーミングテーブルのデータに対してvacuum操作が実行されるように追加の手順を実行する必要があります。
以下では、これらの手順について詳しく説明します。
- レコードを更新するか、ストリーミング テーブルからレコードを削除します。
- ストリーミング・テーブルに対して
REORG
ステートメントを実行し、APPLY (PURGE)
パラメーターを指定します。 たとえば、REORG TABLE <streaming-table-name> APPLY (PURGE);
. - ストリーミング テーブルのデータ保持期間が経過するのを待ちます。 デフォルトのデータ保持期間は 7 日間ですが、
delta.deletedFileRetentionDuration
テーブル プロパティを使用して構成できます。 タイムトラベル クエリのデータ保持の構成を参照してください。 REFRESH
the ストリーミングテーブル. DLT パイプラインを使用したストリーミングテーブルの更新を参照してください。REFRESH
操作から24時間以内に、レコードを完全に削除するために必要なVACUUM
操作を含むDLTメンテナンスタスクが自動的に実行されます。DLT によって実行されるメンテナンス タスクを参照してください。
クエリ履歴を使用した実行の監視
クエリ履歴ページを使用して、クエリの詳細とクエリプロファイルにアクセスできるため、ストリーミングテーブルの更新を実行するために使用される DLT パイプラインのパフォーマンスの低いクエリやボトルネックを特定できます。クエリ履歴とクエリ プロファイルで使用できる情報の種類の概要については、「 クエリ履歴 」と 「クエリ プロファイル」を参照してください。
プレビュー
この機能は パブリック プレビュー段階です。 ワークスペース管理者は、 プレビュー ページからこの機能を有効にできます。 「Databricks プレビューの管理」を参照してください。
ストリーミングテーブルに関連するすべてのステートメントがクエリ履歴に表示されます。 [ステートメント] ドロップダウン フィルターを使用して、任意のコマンドを選択し、関連するクエリを検査できます。すべての CREATE
ステートメントの後には、DLT パイプラインで非同期に実行される REFRESH
ステートメントが続きます。通常、 REFRESH
ステートメントには、パフォーマンスの最適化に関する知見を提供する詳細なクエリ プランが含まれています。
クエリ履歴 UI で REFRESH
ステートメントにアクセスするには、次の手順を使用します。
- 左側のサイドバーで [
] をクリックして、 クエリー履歴 UI を開きます。
- 「ステートメント 」ドロップダウン・フィルターから「 REFRESH 」チェック・ボックスを選択します。
- クエリステートメントの名前をクリックすると、クエリの実行時間や集計されたメトリクスなどの概要の詳細が表示されます。
- [ クエリ プロファイルの表示 ] をクリックして、クエリ プロファイルを開きます。 クエリ プロファイルのナビゲーションの詳細については、 クエリ プロファイル を参照してください。
- 必要に応じて、[ クエリ ソース ] セクションのリンクを使用して、関連するクエリまたはパイプラインを開くことができます。
また、 SQL エディターのリンクを使用して、または SQLウェアハウスに添付されたノートブックからクエリの詳細にアクセスすることもできます。