メインコンテンツまでスキップ

Databricks での結合の操作

Databricks では、ANSI 標準の結合構文がサポートされています。この記事では、バッチ処理とストリーム処理の結合の違いについて説明します。

注記

Databricks は、集合演算子 UNIONINTERSECT、および EXCEPTの標準構文もサポートしています。 「Set 演算子」を参照してください。

ストリーミング結合とバッチ結合の違い

Databricks でのJOINは、ステートフルまたはステートレスのいずれかです。

すべてのバッチ結合はステートレス結合です。 結果はすぐに処理され、クエリの実行時にデータが反映されます。 クエリが実行されるたびに、指定したソースデータに基づいて新しい結果が計算されます。 バッチ結合を参照してください。

2 つのストリーミング データソース間の結合はステートフルです。 ステートフル結合では、 Databricks はデータソースと結果に関する情報を追跡し、結果を繰り返し更新します。 ステートフル結合は、オンラインデータ処理のための強力なソリューションを提供できますが、効果的に実装するのが難しい場合があります。 これらは、出力モード、トリガー間隔、およびウォーターマークに応じて複雑な操作セマンティクスを持っています。 ストリーム-ストリーム結合を参照してください。

ストリーム静的結合はステートレスですが、増分データソース (ファクトテーブルなど) を静的データソース (緩やかに変化するディメンションテーブルなど) を結合するための適切なオプションを提供します。 クエリが実行されるたびに両側のすべてのレコードを結合するのではなく、ストリーミング ソースから新しく受信したレコードのみが現在のバージョンの静的テーブルと結合されます。 ストリーム静的結合を参照してください。

バッチ結合

Databricks では、内部結合、外部結合、半結合、アンチ結合、クロス結合などの標準 SQL 結合構文がサポートされています。 JOIN を参照してください。

注記

Databricks では、マテリアライズドビューを使用して、内部結合の結果のインクリメンタル計算を最適化することをお勧めします。 マテリアライズドビューを参照してください。

ストリーム-ストリーム join

2 つのストリーミングデータソースをJOINすると、状態情報の管理と結果の計算および出力の推論において大きな課題が生じる可能性があります。 ストリーム-ストリームJOINを実装する前に、ウォーターマークが状態管理に与える影響など、ステートフル ストリーミングの運用セマンティクスを十分に理解しておくことをDatabricksでは推奨します。 次の記事を参照してください。

Databricks では、すべてのストリームとストリームのJOINの両側にウォーターマークを指定することを推奨しています。 次のJOINタイプがサポートされています。

  • Inner join
  • Left outer join
  • Right outer join
  • Full outer join
  • Left semi join

Apache Spark構造化ストリーミングのドキュメントのストリーム-ストリームJOINを参照してください。

ストリーム静的結合

注記

説明したストリーム静的JOINの動作では、静的データが Delta Lake を使用して保存されていることを前提としています。

ストリームデータと静的データのJOINでは、ステートレス結合を使用して、Delta テーブルの最新の有効なバージョン (静的データ) をデータストリームにJOINします。

Databricks がストリームデータと静的データの結合でデータのマイクロバッチを処理する際に、静的な Delta テーブルのデータの最新の有効なバージョンが現在のマイクロバッチに存在するレコードと結合します。結合はステートレスであるため、電子透かしを構成する必要がなく、結果を低レイテンシで処理できます。結合で使用される静的な Delta テーブル内のデータは、ゆっくりと変化する必要があります。

次の例は、このパターンを示しています。

Python
streamingDF = spark.readStream.table("orders")
staticDF = spark.read.table("customers")

query = (streamingDF
.join(staticDF, streamingDF.customer_id==staticDF.id, "inner")
.writeStream
.option("checkpointLocation", checkpoint_path)
.table("orders_with_customer_info")
)

Databricks のヒントを結合する

Apache Spark では、範囲結合とスキュー結合の結合ヒントの指定がサポートされています。 Databricks によって自動的に最適化されるため、スキュー結合のヒントは必要ありません。 ヒントをご覧ください。

範囲結合のヒントは、結合のパフォーマンスが悪く、等価結合を実行している場合に役立ちます。例としては、タイムスタンプ範囲やクラスタリング ID の範囲での結合などがあります。「範囲結合の最適化」と「Databricks での結合パフォーマンスの最適化」を参照してください。