メインコンテンツまでスキップ

Databricks SQL のストリーミング テーブルを使用してデータを読み込む

備考

プレビュー

この機能は パブリック プレビュー段階です。

Databricks では、ストリーミングテーブルを使用して Databricks SQLを使用してデータを取り込むことをお勧めします。 ストリーミングテーブル は、Unity Catalogに登録されたテーブルで、ストリーミングまたは増分データ処理の追加サポートがあります。DLT パイプラインは、ストリーミングテーブルごとに自動的に作成されます。 ストリーミングテーブルは、Kafka およびクラウドオブジェクトストレージからの増分データロードに使用できます。

この記事では、ストリーミング テーブルを使用して、Unity Catalog ボリューム (推奨) または外部ロケーションとして構成されたクラウドオブジェクトストレージからデータを読み込む方法について説明します。

注記

Delta Lake テーブルをストリーミング ソースとシンクとして使用する方法については、「 Delta テーブル ストリーミングの読み取りと書き込み」を参照してください。

important

Databricks SQLで作成されたストリーミングテーブルは、サーバレス DLT パイプラインによって支えられています。この機能を使用するには、ワークスペースがサーバレス パイプラインをサポートしている必要があります。

始める前に

開始する前に、次の要件を満たす必要があります。

ワークスペースの要件 :

コンピュートの要件 :

次のいずれかを使用する必要があります。

  • Currentチャンネルを使った SQLウェアハウス。
  • Databricks Runtime 13.3 LTS 以上の標準アクセス モード (以前の共有アクセス モード) を使用したコンピュート。

権限の要件 :

  • Unity Catalog外部ロケーションに対するREAD FILES権限。情報については、クラウドストレージをDatabricksに接続するための外部ロケーションの作成を参照してください。
  • ストリーミング・テーブルを作成するカタログに対する USE CATALOG 権限。
  • ストリーミングテーブルを作成するスキーマに対する USE SCHEMA 権限。
  • ストリーミングテーブルを作成するスキーマに対する CREATE TABLE 権限。

その他の要件 :

  • ソース データへのパス。

    ボリュームパスの例: /Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>

    外部ロケーション パスの例: gs://myBucket/analysis

注記

この記事では、ロードするデータが、アクセス権を持つ Unity Catalog ボリュームまたは外部ロケーションに対応するクラウドストレージの場所にあることを前提としています。

ソースデータの検出とプレビュー

  1. ワークスペースのサイドバーで、「 クエリ」 をクリックし、「 クエリを作成 」をクリックします。

  2. クエリ エディターで、Current チャンネルを使用する SQLウェアハウスをドロップダウン リストから選択します。

  3. 次のコードをエディタに貼り付け、山括弧 (<>) 内の値をソースデータを識別する情報に置き換えて、[ 実行 ] をクリックします。

注記

read_files table valued 関数の実行時に、その関数のデフォルトがデータを解析できない場合、スキーマ推論エラーが発生する可能性があります。たとえば、複数行の CSV ファイルや JSON ファイルに対して複数行モードを設定する必要がある場合があります。 パーサー・オプションのリストについてはread_filesテーブル値関数を参照してください。

SQL
/* Discover your data in a volume */
LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>"

/* Preview your data in a volume */
SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10

/* Discover your data in an external location */
LIST "gs://<bucket>/<path>/<folder>"

/* Preview your data in an external location */
SELECT * FROM read_files("gs://<bucket>/<path>/<folder>") LIMIT 10

ストリーミング テーブルへのデータの読み込み

クラウド・オブジェクト・ストレージ内のデータからストリーミング・テーブルを作成するには、以下をクエリ・エディターに貼り付けて、「 実行 」をクリックします。

SQL
/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')

/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('gs://<bucket>/<path>/<folder>')

ランタイム チャンネルを設定する

SQLウェアハウスを使用して作成されたストリーミングテーブルは、DLT パイプラインを使用して自動的に更新されます。DLT パイプラインは、デフォルトによって current チャンネルのランタイムを使用します。 リリースプロセスについては 、DLT リリースノートとリリースアップグレードプロセス を参照してください。

Databricks 本番運用ワークロードには、current チャンネルを使用することをお勧めします。 新機能は最初に preview チャンネルにリリースされます。 プレビュー DLT チャンネルにパイプラインを設定して、 preview をテーブル プロパティとして指定することで、新機能をテストできます。 このプロパティは、テーブルを作成するとき、または ALTER ステートメントを使用してテーブルを作成した後に指定できます。

次のコード例は、CREATE ステートメントでチャンネルをプレビューに設定する方法を示しています。

SQL
CREATE OR REPLACE MATERIALIZED VIEW foo.default.bar
TBLPROPERTIES ('pipelines.channel' = 'preview') as
SELECT
*
FROM
range(5)

DLT パイプラインを使用してストリーミング テーブルを更新する

このセクションでは、クエリで定義されたソースから利用可能な最新のデータでストリーミングテーブルを更新するためのパターンについて説明します。

ストリーミングテーブルを CREATE または REFRESH すると、更新はサーバレス DLT パイプラインを使用して処理されます。 定義する各ストリーミングテーブルには、DLT パイプラインが関連付けられています。

REFRESH コマンドを実行すると、DLT パイプライン リンクが返されます。DLT パイプラインのリンクを使用して、更新のステータスを確認できます。

注記

テーブルの所有者のみがストリーミング テーブルを更新して最新のデータを取得できます。 テーブルを作成したユーザーが所有者であり、所有者を変更することはできません。 タイムトラベルクエリを使用する前に、ストリーミングテーブルの更新が必要になる場合があります。

DLTとはを参照してください。

新しいデータのみを取り込む

デフォルトでは、 read_files 関数はテーブルの作成時にソースディレクトリ内のすべての既存のデータを読み取り、更新のたびに新しく到着したレコードを処理します。

テーブルの作成時にソースディレクトリにすでに存在するデータを取り込まないようにするには、 includeExistingFiles オプションを falseに設定します。 つまり、テーブルの作成後にディレクトリに到着したデータのみが処理されます。 例えば:

SQL
CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files('gs://mybucket/analysis/*/*/*.json', includeExistingFiles => false)

ストリーミング テーブルを完全に更新する

完全更新 ソースで使用可能なすべてのデータを最新の定義で再処理します。 データの全履歴を保持しないソースや、保持期間が短いソース ( Kafkaなど) で完全更新を呼び出すことは、既存のデータが切り捨てられるため、お勧めしません。 ソースでデータが使用できなくなった場合、古いデータを回復できない場合があります。

例えば:

SQL
REFRESH STREAMING TABLE my_bronze_table FULL

ストリーミング テーブルの自動更新をスケジュールする

定義されたスケジュールに基づいて自動的に更新されるようにストリーミング テーブルを構成するには、クエリ エディターに次のものを貼り付け、[ 実行 ] をクリックします。

SQL
ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
CRON '<cron-string>'
[ AT TIME ZONE '<timezone-id>' ]];

更新スケジュールクエリの例については、 ALTER STREAMING TABLEを参照してください。

更新の状態を追跡する

ストリーミングテーブルの更新のステータスを表示するには、DLT UI でストリーミングテーブルを管理するパイプラインを表示するか、ストリーミングテーブルの DESCRIBE EXTENDED コマンドによって返される 更新情報 を表示します。

SQL
DESCRIBE EXTENDED <table-name>

Kafka からのストリーミング取り込み

Kafka からのストリーミング取り込みの例については、 read_kafkaを参照してください。

ストリーミングテーブルへのアクセス権をユーザーに付与する

ストリーミングテーブルに対する SELECT 権限をユーザーに付与してクエリを実行できるようにするには、以下をクエリエディタに貼り付けて、[ 実行 ] をクリックします。

SQL
GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>

Unity Catalogセキュリティ保護可能なオブジェクトに対する特権の付与の詳細については、「特権とセキュリティ保護可能なオブジェクトUnity Catalog」を参照してください。

ストリーミング テーブルからレコードを完全に削除する

備考

プレビュー

ストリーミング テーブルでの REORG ステートメントのサポートは 、パブリック プレビュー段階です。

注記
  • ストリーミング テーブルで REORG ステートメントを使用するには、Databricks Runtime 15.4 以降が必要です。
  • REORG ステートメントは任意のストリーミングテーブルで使用できますが、削除ベクトルが有効になっているストリーミングテーブルからレコードを削除する場合にのみ必要です。このコマンドは、削除ベクトルが有効になっていないストリーミングテーブルと一緒に使用しても効果がありません。

GDPR コンプライアンスなど、削除ベクトルが有効になっているストリーミングテーブルの基盤となるストレージからレコードを物理的に削除するには、ストリーミングテーブルのデータに対してvacuum操作が実行されるように追加の手順を実行する必要があります。

以下では、これらの手順について詳しく説明します。

  1. レコードを更新するか、ストリーミング テーブルからレコードを削除します。
  2. ストリーミング・テーブルに対して REORG ステートメントを実行し、 APPLY (PURGE) パラメーターを指定します。 たとえば、 REORG TABLE <streaming-table-name> APPLY (PURGE);.
  3. ストリーミング テーブルのデータ保持期間が経過するのを待ちます。 デフォルトのデータ保持期間は 7 日間ですが、 delta.deletedFileRetentionDuration テーブル プロパティを使用して構成できます。 タイムトラベル クエリのデータ保持の構成を参照してください。
  4. REFRESH the ストリーミングテーブル. DLT パイプラインを使用したストリーミングテーブルの更新を参照してください。REFRESH操作から24時間以内に、レコードを完全に削除するために必要なVACUUM操作を含むDLTメンテナンスタスクが自動的に実行されます。DLT によって実行されるメンテナンス タスクを参照してください。

クエリ履歴を使用した実行の監視

クエリ履歴ページを使用して、クエリの詳細とクエリプロファイルにアクセスできるため、ストリーミングテーブルの更新を実行するために使用される DLT パイプラインのパフォーマンスの低いクエリやボトルネックを特定できます。クエリ履歴とクエリ プロファイルで使用できる情報の種類の概要については、「 クエリ履歴 」と 「クエリ プロファイル」を参照してください。

備考

プレビュー

この機能は パブリック プレビュー段階です。 ワークスペース管理者は、 プレビュー ページからこの機能を有効にできます。 「Databricks プレビューの管理」を参照してください。

ストリーミングテーブルに関連するすべてのステートメントがクエリ履歴に表示されます。 [ステートメント] ドロップダウン フィルターを使用して、任意のコマンドを選択し、関連するクエリを検査できます。すべての CREATE ステートメントの後には、DLT パイプラインで非同期に実行される REFRESH ステートメントが続きます。通常、 REFRESH ステートメントには、パフォーマンスの最適化に関する知見を提供する詳細なクエリ プランが含まれています。

クエリ履歴 UI で REFRESH ステートメントにアクセスするには、次の手順を使用します。

  1. 左側のサイドバーで [ 履歴アイコン ] をクリックして、 クエリー履歴 UI を開きます。
  2. 「ステートメント 」ドロップダウン・フィルターから「 REFRESH 」チェック・ボックスを選択します。
  3. クエリステートメントの名前をクリックすると、クエリの実行時間や集計されたメトリクスなどの概要の詳細が表示されます。
  4. [ クエリ プロファイルの表示 ] をクリックして、クエリ プロファイルを開きます。 クエリ プロファイルのナビゲーションの詳細については、 クエリ プロファイル を参照してください。
  5. 必要に応じて、[ クエリ ソース ] セクションのリンクを使用して、関連するクエリまたはパイプラインを開くことができます。

また、 SQL エディターのリンクを使用して、または SQLウェアハウスに添付されたノートブックからクエリの詳細にアクセスすることもできます。

追加のリソース