Databricks SQLのストリーミング テーブルを使用してデータを読み込む

Databricks では、ストリーミング テーブルを使用して Databricks SQL を使用してデータを取り込むことをお勧めします。 ストリーミング テーブルは、Unity Catalog に登録されたテーブルで、ストリーミングまたは増分データ処理の追加サポートがあります。Delta Live Tables パイプラインは、ストリーミング テーブルごとに自動的に作成されます。 ストリーミングテーブルは、Kafka およびクラウドオブジェクトストレージからの増分データロードに使用できます。

この記事では、ストリーミング テーブルを使用して、Unity Catalog ボリューム (推奨) または外部ロケーションとして構成されたクラウドオブジェクトストレージからデータを読み込む方法について説明します。

注:

Delta Lake テーブルをストリーミング ソースおよびシンクとして使用する方法については、 「Delta テーブル ストリーミングの読み取りと書き込み」を参照してください。

重要

Databricks SQL で作成されたストリーミング テーブルは、サーバレス Delta Live Tables パイプラインによってサポートされます。この機能を使用するには、ワークスペースがサーバレス パイプラインをサポートしている必要があります。

始める前に

開始する前に、次の要件を満たす必要があります。

ワークスペースの要件:

コンピュート要件:

次のいずれかを使用する必要があります。

  • Current チャンネルを使用するSQLウェアハウス。

  • Databricks Runtime 13.3 LTS以降の共有アクセス モードを使用したコンピュート。

  • Databricks Runtime 15.4 LTS 以上のシングル ユーザー アクセス モードを備えたコンピュート。

    Databricks Runtime 15.3 以前では、シングル ユーザー コンピュートを使用して、他のユーザーが所有するストリーミング テーブルをクエリすることはできません。シングルユーザーコンピュートを Databricks Runtime 15.3以下で使用できるのは、ストリーミングテーブルを所有している場合のみです。 テーブルの作成者が所有者です。

    Databricks Runtime 15.4 LTS 以降では、テーブルの所有権に関係なく、シングル ユーザー コンピュートで Delta Live Tables で生成されたテーブルに対するクエリがサポートされています。 15.4 以降で提供されているデータDatabricks RuntimeLTS フィルタリングを利用するには、 Live Tables で生成されたテーブルをサポートするデータ フィルタリング機能がサーバレス コンピュートで実行されるため、 Deltaワークスペースがサーバレス コンピュートに対して有効になっている ことを確認する必要があります。シングル ユーザー コンピュートを使用してデータ フィルタリング操作を実行すると、サーバレス コンピュート リソースに対して課金される場合があります。 「シングル ユーザー コンピュートに対するきめ細かなアクセス制御」を参照してください。

権限の要件:

その他の要件:

  • ソース データへのパス。

    ボリューム・パスの例: /Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>

    外部ロケーションパスの例: s3://myBucket/analysis

    注:

    この記事では、ロードするデータが、アクセスできるUnity Catalogボリュームまたは外部ロケーションに対応するクラウド ストレージの場所にあることを前提としています。

ソース データの検出とプレビュー

  1. ワークスペースのサイドバーで、 「クエリ」をクリックし、 「クエリの作成」をクリックします。

  2. クエリ エディターで、ドロップダウン リストから Current チャンネルを使用するSQLウェアハウスを選択します。

  3. 次の内容をエディターに貼り付け、ソース データを識別する情報の代わりに山括弧 ( <> ) 内の値を使用して、 [実行]をクリックします。

    注:

    関数のデフォルトがデータを解析できない場合、 read_filesテーブル値関数を実行するとスキーマ推論エラーが発生する可能性があります。 たとえば、複数行の CSV または JSON ファイルの場合は、複数行モードを構成する必要がある場合があります。 パーサー オプションの一覧については、「 テーブル値関数read_files」を参照してください。

    /* Discover your data in a volume */
    LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>"
    
    /* Preview your data in a volume */
    SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10
    
    /* Discover your data in an external location */
    LIST "s3://<bucket>/<path>/<folder>"
    
    /* Preview your data */
    SELECT * FROM read_files("s3://<bucket>/<path>/<folder>") LIMIT 10
    

ストリーミングテーブルへのデータのロード

クラウド オブジェクト ストレージのデータからストリーミング テーブルを作成するには、次の内容をクエリ エディターに貼り付けて、 [実行]をクリックします。

/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')

/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('s3://<bucket>/<path>/<folder>')

ランタイム チャンネルを設定する

SQLウェアハウスを使用して作成されたストリーミング テーブルは、Delta Live Tables パイプラインを使用して自動的に更新されます。Delta Live Tables パイプラインは、current チャンネルの by デフォルト のランタイムを使用します。 リリースプロセスの詳細については 、Delta Live Tables リリースノートとリリースアップグレードプロセス を参照してください。

Databricks 本番運用ワークロードには、current チャンネルを使用することをお勧めします。 新機能は最初に preview チャンネルにリリースされます。 プレビュー Delta Live Tables チャンネルにパイプラインを設定して、preview をテーブル プロパティとして指定することで、新機能をテストできます。 このプロパティは、テーブルを作成するとき、または ALTER ステートメントを使用してテーブルを作成した後に指定できます。

次のコード例は、CREATE ステートメントでチャンネルをプレビューに設定する方法を示しています。

CREATE OR REPLACE MATERIALIZED VIEW foo.default.bar
TBLPROPERTIES ('pipelines.channel' = 'preview') as
SELECT
  *
FROM
  range(5)

## <a id="refresh"></a> Refresh a <st> using a DLT pipeline

This section describes patterns for refreshing a <st> with the latest data available from the sources defined in the query.

When you `CREATE` or `REFRESH` a <st>, the update processes using a serverless <DLT> pipeline. Each <st> you define has an associated <DLT> pipeline.

After you run the `REFRESH` command, the DLT pipeline link is returned. You can use the DLT pipeline link to check the status of the refresh.

.. note:: Only the table owner can refresh a <st> to get the latest data. The user that creates the table is the owner, and the owner can't be changed. You might need to refresh your <st> before using [time travel](/delta/history.md#time-travel) queries.

See [_](/delta-live-tables/index.md).

### Ingest new data only

By default, the `read_files` function reads all existing data in the source directory during table creation, and then processes newly arriving records with each refresh.

To avoid ingesting data that already exists in the source directory at the time of table creation, set the `includeExistingFiles` option to `false`. This means that only data that arrives in the directory after table creation is processed. For example:

.. azure::

  ```sql
  CREATE OR REFRESH STREAMING TABLE my_bronze_table
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
    includeExistingFiles => false)
CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files('s3://mybucket/analysis/*/*/*.json', includeExistingFiles => false)

ストリーミングテーブルを完全に更新する

完全更新では、ソースで使用可能なすべてのデータが最新の定義で再処理されます。 データの履歴全体を保持しない、または保持期間が短いソース ( Kafkaなど) で完全な更新を呼び出すことは、完全な更新によって既存のデータが切り捨てられるため、推奨されません。 ソースでデータが利用できなくなった場合、古いデータを回復できない可能性があります。

例:

REFRESH STREAMING TABLE my_bronze_table FULL

ストリーミング テーブルの自動更新をスケジュールする

定義されたスケジュールに基づいてストリーミング テーブルが自動的に更新されるように構成するには、次の内容をクエリ エディターに貼り付けて、 [実行]をクリックします。

ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
        CRON '<cron-string>'
                [ AT TIME ZONE '<timezone-id>' ]];

更新スケジュール クエリの例については、 「ALTER STREAMING TABLE」を参照してください。

更新の状態を追跡する

ストリーミング テーブルの更新のステータスを確認するには、 Delta Live Tables UI でストリーミング テーブルを管理するパイプラインを表示するか、ストリーミング テーブルの DESCRIBE EXTENDED コマンドによって返される更新情報を表示します。

DESCRIBE EXTENDED <table-name>

Kafkaからのストリーミング インジェスト

Kafka からのストリーミング取り込みの例については、 read_kafkaを参照してください。

ストリーミングテーブルへのアクセス権をユーザーに付与する

ユーザーにストリーミング テーブルに対するSELECT権限を付与してクエリを実行できるようにするには、次の内容をクエリ エディターに貼り付けて、 [実行]をクリックします。

GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>

Unity Catalogのセキュリティ保護可能なオブジェクトに対する権限の付与の詳細については、 Unity Catalog権限とセキュリティ保護可能なオブジェクト」を参照してください。

クエリ履歴を使用した実行の監視

クエリ履歴ページを使用して、クエリの詳細とクエリ プロファイルにアクセスできるため、ストリーミング テーブルの更新を実行するために使用される Delta Live Tables パイプラインでパフォーマンスの低いクエリやボトルネックを特定するのに役立ちます。 クエリ履歴とクエリ プロファイルで使用できる情報の種類の概要については、「 クエリ履歴 」と 「クエリ プロファイル」を参照してください。

プレビュー

この機能はパブリックプレビュー段階です。ワークスペース管理者は、プレビューページからこの機能を有効にできます。「Databricksプレビューの管理」を参照してください。

ストリーミングテーブルに関連するすべてのステートメントがクエリ履歴に表示されます。 [ステートメント] ドロップダウン フィルターを使用して、任意のコマンドを選択し、関連するクエリを検査できます。すべての CREATE ステートメントの後には、Delta Live Tables パイプラインで非同期的に実行される REFRESH ステートメントが続きます。 通常、 REFRESH ステートメントには、パフォーマンスの最適化に関する知見を提供する詳細なクエリ プランが含まれています。

クエリ履歴 UI で REFRESH ステートメントにアクセスするには、次の手順を使用します。

  1. 左側のサイドバーで [ 履歴アイコン ] をクリックして、 クエリー履歴 UI を開きます。

  2. 「ステートメント」ドロップダウン・フィルターから「REFRESH」チェック・ボックスを選択します。

  3. クエリステートメントの名前をクリックすると、クエリの実行時間や集計されたメトリクスなどの概要の詳細が表示されます。

  4. [ クエリ プロファイルの表示 ] をクリックして、クエリ プロファイルを開きます。 クエリ プロファイルのナビゲーションの詳細については、 クエリ プロファイル を参照してください。

  5. 必要に応じて、[ クエリ ソース ] セクションのリンクを使用して、関連するクエリまたはパイプラインを開くことができます。

また、 SQL エディターのリンクを使用して、または SQLウェアハウスに添付されたノートブックからクエリの詳細にアクセスすることもできます。

注:

ストリーミングテーブルは、 preview チャンネルを使用して実行するように設定する必要があります。 「ランタイム チャンネルを設定する」を参照してください。