ストリーミングデータのクエリ
Databricks を使用して、構造化ストリーミングを使用してストリーミングデータソースをクエリーできます。 Databricks は、Python と Scala でのストリーミング ワークロードを幅広くサポートし、SQL を使用したほとんどの構造化ストリーミング機能をサポートしています。
次の例は、ノートブックでの対話型開発中にストリーミング データを手動で検査するためにメモリ シンクを使用する方法を示しています。 ノートブック UI の行出力制限により、ストリーミング クエリによって読み取られたすべてのデータが観察されない場合があります。 本番運用ワークロードでは、ストリーミングクエリをトリガーするには、ターゲットテーブルまたは外部システムに書き込む必要があります。
SQL 、ストリーミング データに対する対話型クエリのサポートは、汎用コンピュートにアタッチされたノートブックの実行に限定されます。 DLT でストリーミングテーブルを宣言するときにも SQL を使用できます。DLTとはを参照してください。
ストリーミングシステムからのデータのクエリ
Databricks には、次のストリーミング システム用のストリーミング データ リーダーが用意されています。
- Kafka
- Kinesis
- パブサブ
- Pulsar
これらのシステムに対してクエリーを初期化するときには、構成済みの環境と読み取り元として選択したシステムによって異なる構成の詳細を指定する必要があります。 Databricks レイクハウスへのデータの取り込みを参照してください。
ストリーミング システムに関連する一般的なワークロードには、レイクハウスへのデータ取り込みと、データを外部システムにシンクするストリーム処理が含まれます。 ストリーミングワークロードの詳細については、「 構造化ストリーミングの概念」を参照してください。
次の例は、Kafka からの対話型ストリーミング読み取りを示しています。
- Python
- SQL
display(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'latest'
);
ストリーミング読み取りとしてテーブルをクエリする
Databricks は、 Delta Lake by デフォルトを使用してすべてのテーブルを作成します。 Delta テーブルに対してストリーミング クエリを実行すると、テーブルのバージョンがコミットされると、クエリによって新しいレコードが自動的に取得されます。デフォルトでは、ストリーミングクエリは、ソーステーブルに追加されたレコードのみが含まれていることを想定しています。更新と削除を含むストリーミング データを操作する必要がある場合、Databricks では DLT と APPLY CHANGES INTO
を使用することをお勧めします。「 APPLY CHANGES APIs: DLTによるチェンジデータキャプチャの簡素化」を参照してください。
次の例は、テーブルからの対話型ストリーミング読み取りの実行を示しています。
- Python
- SQL
display(spark.readStream.table("table_name"))
SELECT * FROM STREAM table_name
Auto Loader を使用したクラウド オブジェクト ストレージのデータのクエリ
クラウド データAuto Loader Databricksコネクタである を使用して、クラウド オブジェクト ストレージからデータをストリームできます。コネクタは、Unity Catalog ボリュームまたはその他のクラウド オブジェクト ストレージの場所に格納されているファイルで使用できます。 Databricks では、ボリュームを使用してクラウド オブジェクト ストレージ内のデータへのアクセスを管理することをお勧めします。 「データソースへの接続」を参照してください。
Databricks は、一般的な構造化形式、半構造化形式、非構造化形式で格納されているクラウドオブジェクトストレージ内のデータのストリーミング インジェスト用にこのコネクタを最適化します。 Databricks では、スループットを最大化し、レコードの破損やスキーマの変更による潜在的なデータ損失を最小限に抑えるために、取り込まれたデータをほぼ生の形式で保存することをお勧めします。
クラウド オブジェクト ストレージからのデータの取り込みに関するその他の推奨事項については、「 Databricks レイクハウスにデータを取り込む」を参照してください。
次の例は、ボリューム内の JSON ファイルのディレクトリから読み取られる対話型ストリーミングを示しています。
- Python
- SQL
display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))
SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')