ストリーミングテーブル
ストリーミングテーブルは、ストリーミングまたは増分データ処理の追加サポートを備えた Delta テーブルです。 ストリーミングテーブルは、ETL パイプライン内の 1 つ以上のフローでターゲットにすることができます。
ストリーミングテーブルは、次の理由からデータ取り込みに適しています。
- 各入力行は 1 回だけ処理され、インジェストワークロードの大部分をモデル化します (つまり、テーブルに行を追加または更新/挿入します)。
- 大量の追加専用データを処理できます。
ストリーミングテーブルは、次の理由から、低レイテンシーのストリーミング変換にも適しています。
- 時間の行とウィンドウに対する推論
- 大量のデータを処理する
- 低レイテンシ
次の図は、ストリーミングテーブルの仕組みを示しています。
更新のたびに、ストリーミングテーブルに関連付けられたフローは、ストリーミングソース内の変更された情報を読み取り、そのテーブルに新しい情報を追加します。
ストリーミングテーブルは、1 つの DLT パイプラインによって定義および更新されます。 DLT パイプラインを作成するときは、パイプラインのソース コードでストリーミングテーブルを明示的に定義します。パイプラインによって定義されたテーブルは、他のパイプラインで変更または更新することはできません。複数のフローを定義して、1 つのストリーミングテーブルに追加できます。
Databricks SQLでパイプラインの外部にストリーミングテーブルを作成すると、 Databricks は、このテーブルの更新に使用される非表示のDLTパイプラインを作成します。
フローの詳細については、「DLT フローを使用してデータを段階的に読み込み、処理する」を参照してください。
ストリーミングテーブル for ingestion
ストリーミングテーブルは、追加専用のデータソース用に設計されており、入力を一度だけ処理します。
次の例は、ストリーミングテーブルを使用してクラウドストレージから新しいファイルを取り込む方法を示しています。
- Python
- SQL
import dlt
# create a streaming table
@dlt.table
def customers_bronze():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.load("/Volumes/path/to/files")
)
データセット定義で spark.readStream
関数を使用すると、DLT はデータセットをストリームとして扱い、作成されるテーブルはストリーミングテーブルになります。
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files(
"/volumes/path/to/files",
format => "json"
);
ストリーミングテーブルへのデータの読み込みの詳細については、「DLTによるデータの読み込み」を参照してください。
次の図は、追加専用ストリーミングテーブルの仕組みを示しています。
ストリーミングテーブルにすでに追加されている行は、後でパイプラインを更新するときに再クエリされません。 クエリを変更した場合 (たとえば、 SELECT LOWER (name)
から SELECT UPPER (name)
)、既存のローは大文字に更新されませんが、新しいローは大文字になります。完全更新をトリガーして、ソース テーブルから以前のすべてのデータを再クエリし、ストリーミングテーブルのすべての行を更新できます。
ストリーミングテーブルと低遅延ストリーミング
ストリーミングテーブルは、境界付き状態での低遅延ストリーミング用に設計されています。ストリーミングテーブルはチェックポイント管理を使用するため、低レイテンシーのストリーミングに適しています。ただし、自然にバインドされたストリームまたはウォーターマークで囲まれたストリームが想定されます。
自然な境界のあるストリームは、開始と終了が明確に定義されたストリーミング データソースによって生成されます。 自然な境界のあるストリームの例としては、ファイルの最初のバッチが配置された後に新しいファイルが追加されないファイルのディレクトリからデータを読み取る場合があります。 ファイルの数が有限であるため、ストリームには境界があると見なされ、すべてのファイルが処理された後にストリームが終了します。
ウォーターマークを使用してストリームをバインドすることもできます。Spark構造化ストリーミングのウォーターマークは、システムが遅延イベントを待機してから時間枠が完了したと見なす時間を指定することで、遅延データの処理を支援するメカニズムです。ウォーターマークのない無制限のストリームは、メモリ不足が原因で DLT パイプラインが失敗する原因となる可能性があります。
ステートフル ストリーム処理の詳細については、「 ウォーターマークを使用して DLT でステートフル処理を最適化する」を参照してください。
ストリーム・スナップショット・ジョイン
ストリーム-スナップショット結合は、ストリームと、ストリームの開始時にスナップショットが作成されるディメンションとの間の結合です。 これらの結合は、ストリームの開始後にディメンションが変更された場合、ディメンション テーブルは時間的にスナップショットとして扱われ、ストリームの開始後にディメンション テーブルへの変更はディメンション テーブルを再読み込みまたは更新しない限り反映されないため、再計算されません。これは、結合の小さな不一致を受け入れることができる場合、妥当な動作です。たとえば、トランザクションの数が顧客の数よりも何桁も大きい場合、近似結合は許容されます。
次のコード例では、ディメンション テーブル Customer を 2 つの行で結合し、増え続けるデータセット transactions を結合します。 これら 2 つのデータセット間の結合を sales_report
というテーブルに具体化します。外部プロセスが新しい行 (customer_id=3, name=Zoya
) を追加して顧客テーブルを更新する場合、ストリームの開始時に静的ディメンション テーブルがスナップショットされたため、この新しい行は結合に存在しないことに注意してください。
import dlt
@dlt.view
# assume this table contains an append-only stream of rows about transactions
# (customer_id=1, value=100)
# (customer_id=2, value=150)
# (customer_id=3, value=299)
# ... <and so on> ...
def v_transactions():
return spark.readStream.table("transactions")
# assume this table contains only these two rows about customers
# (customer_id=1, name=Bilal)
# (customer_id=2, name=Olga)
@dlt.view
def v_customers():
return spark.read.table("customers")
@dlt.table
def sales_report():
facts = spark.readStream.table("v_transactions")
dims = spark.read.table("v_customers")
return (
facts.join(dims, on="customer_id", how="inner"
)
ストリーミングテーブルの制限事項
ストリーミングテーブルには、次の制限があります。
- 限定的な進化: データセット全体を再計算せずにクエリを変更できます。ストリーミングテーブルは行を一度しか見ないため、異なる行に対して異なるクエリを操作できます。つまり、データセットで実行されているクエリの以前のバージョンをすべて把握しておく必要があります。すでに処理されたストリーミングテーブルデータを更新するには、完全な更新が必要です。
- 国営: ストリーミングテーブルは低レイテンシーであるため、操作するストリームが自然にバインドされているか、ウォーターマークで囲まれていることを確認する必要があります。詳細については、「 ウォーターマークを使用して DLT でのステートフル処理を最適化する」を参照してください。
- 結合は再計算されません。 ストリーミングテーブル内の結合は、ディメンションが変更されても再計算されません。 この特性は、「速いが間違っている」シナリオに適しています。ビューを常に正しく表示する場合は、マテリアライズドビューを使用できます。マテリアライズドビューは、ディメンションが変更されたときに結合を自動的に再計算するため、常に正しいビューです。詳細については、 マテリアライズドビューを参照してください。