クエリー ストリーミングデータ

Databricks を使用して、構造化ストリーミングを使用してストリーミング データソースをクエリーできます。 Databricks は、Python と Scala でのストリーミング ワークロードを幅広くサポートし、SQL を使用したほとんどの構造化ストリーミング機能をサポートしています。

次の例は、ノートブックでの対話型開発中にストリーミング データを手動で検査するためにメモリ シンクを使用する方法を示しています。 ノートブック UI の行出力制限により、ストリーミングクエリーによって読み取られるすべてのデータが監視されない場合があります。 本番運用ワークロードでは、ストリーミングクエリーは、ターゲットテーブルまたは外部システムに書き込むことによってのみトリガーする必要があります。

手記

ストリーミングデータに対する対話型クエリーの SQL サポートは、汎用コンピュートで実行されているノートブックに限定されます。 また、Databricks SQL または Delta Live Tables でストリーミング テーブルを宣言するときにも SQL を使用できます。 「 Databricks SQL でストリーミング テーブルを使用してデータを読み込む 」および「 Delta Live Tables とは」を参照してください。

クエリー ストリーミング・システムからのデータ

Databricks には、次のストリーミング システム用のストリーミング データ リーダーが用意されています。

  • Kafka

  • Kinesis

  • PubSub (英語)

  • Pulsar

これらのシステムに対してクエリーを初期化するときには、構成済みの環境と読み取り元として選択したシステムによって異なる構成の詳細を指定する必要があります。 「ストリーミング データソースの構成」を参照してください。

ストリーミング システムに関連する一般的なワークロードには、レイクハウスへのデータ取り込みや、外部システムにデータをシンクするためのストリーム処理などがあります。 ストリーミング ワークロードの詳細については、「 Databricks でのストリーミング」を参照してください。

次の例は、Kafka からのインタラクティブなストリーミング読み取りを示しています。

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);

ストリーミング読み込みとしてのテーブルへのクエリー

Databricks では、Delta Lake を使用してすべてのテーブルがデフォルトによって作成されます。 Delta テーブルに対してストリーミング クエリを実行すると、テーブルのバージョンがコミットされたときに、クエリによって新しいレコードが自動的に取得されます。 既定では、ストリーミング クエリは、ソース テーブルに追加されたレコードのみが含まれることを想定しています。 更新と削除を含むストリーミング データを操作する必要がある場合、Databricks では Delta Live Tables と APPLY CHANGES INTOを使用することをお勧めします。 「Delta Live Tables の APPLY CHANGES API を使用した簡略化されたチェンジデータキャプチャ」を参照してください。

次の例は、テーブルからの対話型ストリーミング読み取りの実行を示しています。

display(spark.readStream.table("table_name"))
SELECT * FROM STREAM table_name

Auto Loaderによるクラウドオブジェクトのデータへのクエリー

Auto Loader(Databricks クラウド データ コネクタ) を使用して、クラウド オブジェクト ストレージからデータをストリーミングできます。コネクタは、Unity Catalog ボリュームまたはその他のクラウド オブジェクト ストレージの場所に格納されているファイルで使用できます。 Databricks では、ボリュームを使用してクラウド オブジェクト ストレージ内のデータへのアクセスを管理することをお勧めします。 「データソースへの接続」を参照してください。

Databricks は、一般的な構造化形式、半構造化形式、非構造化形式で格納されているクラウド オブジェクト ストレージ内のデータのストリーミング インジェスト用にこのコネクタを最適化します。 Databricks では、スループットを最大化し、レコードの破損やスキーマの変更による潜在的なデータ損失を最小限に抑えるために、取り込まれたデータをほぼ生の形式で保存することをお勧めします。

クラウド オブジェクト ストレージからのデータの取り込みに関するその他の推奨事項については、 「Databricks レイクハウスへのデータの取り込み」を参照してください。

次の例は、ボリューム内の JSON ファイルのディレクトリから読み取られる対話型ストリーミングを示しています。

display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))
SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')