Databricks SQL のストリーミング テーブルを使用してデータを読み込む

プレビュー

この機能はパブリック プレビュー段階です。

Databricks では、ストリーミング テーブルを使用して Databricks SQL を使用してデータを取り込むことをお勧めします。 ストリーミング テーブル は、ストリーミングまたは増分データ処理の追加サポートを備えた Unity Catalog マネージド テーブルです。 DLT パイプラインは、ストリーミングテーブルごとに自動的に作成されます。 ストリーミングテーブルは、Kafka およびクラウドオブジェクトストレージからの増分データロードに使用できます。

この記事では、ストリーミング テーブルを使用して、Unity Catalog ボリューム (推奨) または外部ロケーションとして構成されたクラウド オブジェクト ストレージからデータを読み込む方法について説明します。

Delta Lake テーブルをストリーミング ソースおよびシンクとして使用する方法については、「 Delta テーブル ストリーミングの読み取りと書き込み」を参照してください。

始める前に

開始する前に、次のものがあることを確認してください。

  • サーバーレスが有効になっている Databricks アカウント。 詳細については、 「サーバレス SQL ウェアハウスを有効にする」を参照してください。

  • Unity Catalog が有効になっているワークスペース。 詳細については、「 Unity Catalog の設定と管理」を参照してください。

  • Current チャンネルを使用する SQLウェアハウス。

  • Delta Live Tablesパイプラインによって作成されたストリーミング テーブルをクエリするには、 Databricks Runtime 13.3 LTS以降を使用する共有パイプラインまたは SQL パイプラインを使用する必要があります。 Unity Catalog有効になっているパイプラインで作成されたストリーミング テーブルは、割り当てられたクラスターまたは分離されていないクラスターからはクエリできません。

  • Unity Catalogの外部ロケーションに対する READ FILES 権限。 詳細については、 「クラウド ストレージを Databricks に接続するための外部ロケーションを作成する」を参照してください。

  • ストリーミングテーブルを作成するカタログに対する USE CATALOG 権限。

  • ストリーミングテーブルを作成するスキーマに対する USE SCHEMA 権限。

  • ストリーミングテーブルを作成するスキーマに対する CREATE TABLE 権限。

  • ソース データへのパス。

    ボリューム・パスの例: /Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>

    外部ロケーションパスの例: s3://myBucket/analysis

    この記事では、読み込むデータが、Unity Catalog ボリュームまたはアクセスできる外部ロケーションに対応するクラウド ストレージの場所にあることを前提としています。

ソース データの 検出とプレビュー

  1. ワークスペースのサイドバーで、[ クエリー] をクリックし、[ クエリーの作成] をクリックします。

  2. クエリー エディターで、 Current チャンネルを使用する SQLウェアハウスをドロップダウン リストから選択します。

  3. 以下をエディターに貼り付け、ソース データを識別する情報を山かっこ (<>) で囲んだ値に置き換えて、[ 実行] をクリックします。

    read_files テーブル値関数の実行時に、関数のデフォルトがデータを解析できない場合、スキーマ推論エラーが発生することがあります。たとえば、複数行の CSV または JSON ファイルの複数行モードを構成する必要がある場合があります。 パーサー オプションの一覧については、「 テーブル値関数read_files」を参照してください。

    /* Discover your data in a volume */
    LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>"
    
    /* Preview your data in a volume */
    SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10
    
    /* Discover your data in an external location */
    LIST "s3://<bucket>/<path>/<folder>"
    
    /* Preview your data */
    SELECT * FROM read_files("s3://<bucket>/<path>/<folder>") LIMIT 10
    

ストリーミングテーブル へのデータのロード

クラウドオブジェクトストレージのデータからストリーミングテーブルを作成するには、クエリーエディタに次のコードを貼り付け、[ 実行] をクリックします。

/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')

/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('s3://<bucket>/<path>/<folder>')

DLT パイプライン を使用してストリーミングテーブルを更新する

このセクションでは、クエリで定義されたソースから入手できる最新のデータでストリーミングテーブルを更新するためのパターンについて説明します。

CREATE ストリーミング テーブルの操作では、ストリーミング テーブルへのデータの初期作成と読み込みに Databricks SQLウェアハウスを使用します。 ストリーミング テーブルのREFRESH 操作では、 Delta Live Tables (DLT) を使用します。 DLT パイプラインは、ストリーミングテーブルごとに自動的に作成されます。 ストリーミングテーブルが更新されると、更新を処理するために DLT パイプラインの更新が開始されます。

REFRESH コマンドを実行すると、DLT パイプライン リンクが返されます。DLT パイプライン リンクを使用して、更新の状態を確認できます。

テーブルの所有者のみがストリーミングテーブルを更新して最新のデータを取得できます。 テーブルを作成するユーザーが所有者であり、所有者を変更することはできません。

Delta Live Tablesとは」を参照してください。

新しいデータのみ を取り込む

デフォルトにより、 read_files 関数はテーブルの作成時にソースディレクトリ内のすべての既存のデータを読み取り、更新のたびに新しく到着したレコードを処理します。

テーブルの作成時にソース ディレクトリにすでに存在するデータが取り込まれないようにするには、 includeExistingFilesオプションをfalseに設定します。 つまり、テーブルの作成後にディレクトリに到着したデータのみが処理されます。 例えば:

CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files('s3://mybucket/analysis/*/*/*.json', includeExistingFiles => false)

ストリーミングテーブル を完全に更新する

完全更新では、ソースで使用可能なすべてのデータが最新の定義で再処理されます。 データの履歴全体を保持しないソースや保持期間が短いソース (Kafka など) では、完全更新によって既存のデータが切り捨てられるため、完全更新を呼び出すことはお勧めしません。 データがソースで使用できなくなった場合、古いデータを回復できない可能性があります。

例:

REFRESH STREAMING TABLE my_bronze_table FULL

ストリーミング テーブルの自動更新 をスケジュールする

定義されたスケジュールに基づいて自動的に更新するようにストリーミング テーブルを構成するには、以下をクエリー エディターに貼り付け、[ 実行] をクリックします。

ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
        CRON '<cron-string>'
                [ AT TIME ZONE '<timezone-id>' ]];

更新スケジュールのクエリーの例については、「 ストリーミング テーブルの変更」を参照してください。

更新 の状態を追跡する

ストリーミング テーブルの更新の状態を表示するには、Delta Live Tables UI でストリーミング テーブルを管理するパイプラインを表示するか、ストリーミング テーブルの DESCRIBE EXTENDED コマンドによって返される 更新情報 を表示します。

DESCRIBE EXTENDED <table-name>

Kafka からのストリーミング インジェスト

Kafka からのストリーミング インジェストの例については、「 read_kafka」を参照してください。

ストリーミングテーブル へのアクセス権をユーザーに付与する

ストリーミング テーブルに対する SELECT 権限をユーザーに付与してクエリーできるようにするには、クエリー エディターに以下を貼り付け、[ 実行] をクリックします。

GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>

セキュリティ保護可能なオブジェクトに対する特権の付与の詳細については、「 Unity Catalog セキュリティ保護 可能なオブジェクトUnity Catalog 」を参照してください。