Databricks SQLのストリーミング テーブルを使用してデータを読み込む

プレビュー

この機能はパブリックプレビュー段階です。

Databricks では、Databricks SQL を使用してデータを取り込むためにストリーミング テーブルを使用することをお勧めします。 ストリーミング テーブルは、ストリーミングまたは増分データ処理の追加サポートを備えた Unity Catalog 管理テーブルです。 Delta Live Tables パイプラインは、ストリーミング テーブルごとに自動的に作成されます。 ストリーミング テーブルを使用して、Kafka およびクラウド オブジェクト ストレージからの増分データ ロードを行うことができます。

この記事では、ストリーミング テーブルを使用して、Unity Catalog ボリューム (推奨) または外部ロケーションとして構成されたクラウドオブジェクトストレージからデータを読み込む方法について説明します。

注:

Delta Lake テーブルをストリーミング ソースおよびシンクとして使用する方法については、 「Delta テーブル ストリーミングの読み取りと書き込み」を参照してください。

始める前に

開始する前に、次のものがあることを確認してください。

  • サーバーレスが有効になっているDatabricks 。 詳細については、 「サーバーレスSQLウェアハウスの有効化」を参照してください。

  • Unity Catalog が有効になっているワークスペース。 詳細については、「 Unity Catalog の設定と管理」を参照してください。

  • Current チャンネルを使用するSQLウェアハウス。

  • Delta Live Tablesパイプラインによって作成されたストリーミング テーブルをクエリするには、 Databricks Runtime 13.3 LTS以降を使用する共有パイプラインまたはSQLパイプラインを使用する必要があります。 Unity Catalog有効になっているパイプラインで作成されたストリーミング テーブルは、割り当てられたクラスターまたは分離されていないクラスターからはクエリできません。

  • Unity Catalog外部ロケーションに対する READ FILES 権限。 詳細については、 「クラウド ストレージをDatabricksに接続するための外部ロケーションの作成」を参照してください。

  • ストリーミング テーブルを作成するカタログに対するUSE CATALOG権限。

  • ストリーミング テーブルを作成するスキーマに対するUSE SCHEMA権限。

  • ストリーミング テーブルを作成するスキーマに対するCREATE TABLE権限。

  • ソース データへのパス。

    ボリューム・パスの例: /Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>

    外部ロケーションパスの例: s3://myBucket/analysis

    注:

    この記事では、読み込むデータが、Unity Catalog ボリュームまたはアクセスできる外部ロケーションに対応するクラウド ストレージの場所にあることを前提としています。

ソース データの検出とプレビュー

  1. ワークスペースのサイドバーで、 「クエリ」をクリックし、 「クエリの作成」をクリックします。

  2. クエリ エディターで、ドロップダウン リストから Current チャンネルを使用するSQLウェアハウスを選択します。

  3. 次の内容をエディターに貼り付け、ソース データを識別する情報の代わりに山括弧 ( <> ) 内の値を使用して、 [実行]をクリックします。

    注:

    関数のデフォルトがデータを解析できない場合、 read_filesテーブル値関数を実行するとスキーマ推論エラーが発生する可能性があります。 たとえば、複数行の CSV または JSON ファイルの場合は、複数行モードを構成する必要がある場合があります。 パーサー オプションの一覧については、「 テーブル値関数read_files」を参照してください。

    /* Discover your data in a volume */
    LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>"
    
    /* Preview your data in a volume */
    SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10
    
    /* Discover your data in an external location */
    LIST "s3://<bucket>/<path>/<folder>"
    
    /* Preview your data */
    SELECT * FROM read_files("s3://<bucket>/<path>/<folder>") LIMIT 10
    

ストリーミングテーブルへのデータのロード

クラウド オブジェクト ストレージのデータからストリーミング テーブルを作成するには、次の内容をクエリ エディターに貼り付けて、 [実行]をクリックします。

/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')

/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('s3://<bucket>/<path>/<folder>')

DLT パイプラインを使用してストリーミングテーブルを更新する

このセクションでは、クエリで定義されたソースから入手可能な最新のデータを使用してストリーミング テーブルを更新するためのパターンについて説明します。

CREATE ストリーミング テーブルの操作では、ストリーミング テーブルへのデータの初期作成と読み込みに Databricks SQL ウェアハウスが使用されます。 ストリーミング テーブルのREFRESH操作では Delta Live Tables (DLT) が使用されます。 ストリーミング テーブルごとに DLT パイプラインが自動的に作成されます。 ストリーミング テーブルが更新されると、更新を処理するために DLT パイプラインの更新が開始されます。

REFRESHコマンドを実行すると、DLT パイプライン リンクが返されます。 DLT パイプライン リンクを使用して、更新のステータスを確認できます。

注:

ストリーミング テーブルを更新して最新のデータを取得できるのは、テーブル所有者だけです。 テーブルを作成したユーザーが所有者であり、所有者は変更できません。

Delta Live Tables とは」を参照してください。

新しいデータのみを取り込む

デフォルトでは、 read_files関数はテーブルの作成中にソース ディレクトリ内の既存のデータをすべて読み取り、更新ごとに新しく到着するレコードを処理します。

テーブルの作成時にソース ディレクトリに既に存在するデータを取り込まないようにするには、 includeExistingFilesオプションをfalseに設定します。 つまり、テーブルの作成後にディレクトリに到着したデータのみが処理されます。 例えば:

CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files('s3://mybucket/analysis/*/*/*.json', includeExistingFiles => false)

ストリーミングテーブルを完全に更新する

完全更新では、ソースで使用可能なすべてのデータが最新の定義で再処理されます。 データの履歴全体を保持しない、または保持期間が短いソース ( Kafkaなど) で完全な更新を呼び出すことは、完全な更新によって既存のデータが切り捨てられるため、推奨されません。 ソースでデータが利用できなくなった場合、古いデータを回復できない可能性があります。

例:

REFRESH STREAMING TABLE my_bronze_table FULL

ストリーミング テーブルの自動更新をスケジュールする

定義されたスケジュールに基づいてストリーミング テーブルが自動的に更新されるように構成するには、次の内容をクエリ エディターに貼り付けて、 [実行]をクリックします。

ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
        CRON '<cron-string>'
                [ AT TIME ZONE '<timezone-id>' ]];

更新スケジュール クエリの例については、 「ALTER STREAMING TABLE」を参照してください。

更新の状態を追跡する

ストリーミング テーブルの更新のステータスを確認するには、 Delta Live Tables UI でストリーミング テーブルを管理するパイプラインを表示するか、ストリーミング テーブルの DESCRIBE EXTENDED コマンドによって返される更新情報を表示します。

DESCRIBE EXTENDED <table-name>

Kafkaからのストリーミング インジェスト

Kafka からのストリーミング取り込みの例については、 read_kafkaを参照してください。

ストリーミングテーブルへのアクセス権をユーザーに付与する

ユーザーにストリーミング テーブルに対するSELECT権限を付与してクエリを実行できるようにするには、次の内容をクエリ エディターに貼り付けて、 [実行]をクリックします。

GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>

Unity Catalogのセキュリティ保護可能なオブジェクトに対する権限の付与の詳細については、 Unity Catalog権限とセキュリティ保護可能なオブジェクト」を参照してください。