Databricks での結合の操作
Databricks は ANSI 標準のJOIN構文をサポートしています。 この記事では、バッチ処理とストリーム処理によるJOINの違いについて説明し、JOINのパフォーマンスを最適化するための推奨事項をいくつか示します。
Databricks は、集合演算子 UNION
、 INTERSECT
、および EXCEPT
の標準構文もサポートしています。 「Set 演算子」を参照してください。
ストリーミング結合とバッチ結合の違い
Databricks でのJOINは、ステートフルまたはステートレスのいずれかです。
すべてのバッチ結合はステートレス結合です。 結果はすぐに処理され、クエリの実行時にデータが反映されます。 クエリが実行されるたびに、指定したソースデータに基づいて新しい結果が計算されます。 バッチ結合を参照してください。
2 つのストリーミング データソース間の結合はステートフルです。 ステートフル結合では、 Databricks はデータソースと結果に関する情報を追跡し、結果を繰り返し更新します。 ステートフル結合は、オンラインデータ処理のための強力なソリューションを提供できますが、効果的に実装するのが難しい場合があります。 これらは、出力モード、トリガー間隔、およびウォーターマークに応じて複雑な操作セマンティクスを持っています。 ストリーム-ストリーム結合を参照してください。
ストリーム静的結合はステートレスですが、増分データソース (ファクトテーブルなど) を静的データソース (緩やかに変化するディメンションテーブルなど) を結合するための適切なオプションを提供します。 クエリが実行されるたびに両側のすべてのレコードを結合するのではなく、ストリーミング ソースから新しく受信したレコードのみが現在のバージョンの静的テーブルと結合されます。 ストリーム静的結合を参照してください。
バッチ結合
Databricks では、内部結合、外部結合、半結合、アンチ結合、クロス結合などの標準 SQL 結合構文がサポートされています。 JOIN を参照してください。
Databricks では、マテリアライズドビューを使用して、内部結合の結果の増分計算を最適化することをお勧めします。 Databricks SQL でマテリアライズドビューを使用するを参照してください。
ストリーム-ストリーム join
2 つのストリーミングデータソースをJOINすると、状態情報の管理と結果の計算および出力の推論において大きな課題が生じる可能性があります。 ストリーム-ストリームJOINを実装する前に、ウォーターマークが状態管理に与える影響など、ステートフル ストリーミングの運用セマンティクスを十分に理解しておくことをDatabricksでは推奨します。 次の記事を参照してください。
Databricks では、すべてのストリームとストリームのJOINの両側にウォーターマークを指定することを推奨しています。 次のJOINタイプがサポートされています。
- Inner join
- Left outer join
- Right outer join
- Full outer join
- Left semi join
Apache Spark構造化ストリーミングのドキュメントのストリーム-ストリームJOINを参照してください。
ストリーム静的結合
説明したストリーム静的JOINの動作では、静的データが Delta Lake を使用して保存されていることを前提としています。
ストリームデータと静的データのJOINでは、ステートレス結合を使用して、Delta テーブルの最新の有効なバージョン (静的データ) をデータストリームにJOINします。
Databricks がストリームデータと静的データの結合でデータのマイクロバッチを処理する際に、静的な Delta テーブルのデータの最新の有効なバージョンが現在のマイクロバッチに存在するレコードと結合します。結合はステートレスであるため、電子透かしを構成する必要がなく、結果を低レイテンシで処理できます。結合で使用される静的な Delta テーブル内のデータは、ゆっくりと変化する必要があります。
次の例は、このパターンを示しています。
streamingDF = spark.readStream.table("orders")
staticDF = spark.read.table("customers")
query = (streamingDF
.join(staticDF, streamingDF.customer_id==staticDF.id, "inner")
.writeStream
.option("checkpointLocation", checkpoint_path)
.table("orders_with_customer_info")
)
ジョインのパフォーマンスを最適化する
Photonを有効にしたコンピュートでは、常に最適な参加タイプが選択されます。 Photonとはを参照してください。
Photon が有効になっている最新の Databricks Runtime バージョンを使用すると、通常は良好なJOINパフォーマンスが得られますが、次の推奨事項も考慮する必要があります。
-
クロスJOINは非常にコストがかかります。 低レイテンシーまたは頻繁な再計算を必要とするワークロードとクエリからクロスJOINを削除します。
-
JOINの順序は重要です。 複数のJOINを実行する場合は、必ず最初に最小のテーブルをJOINし、次に結果を大きなテーブルとJOINします。
-
オプティマイザーは、多くのJOINと集計を含むクエリで苦労する可能性があります。 中間結果を保存すると、クエリの計画と結果の計算を高速化できます。
-
パフォーマンスを向上させるために、最新の統計を保持します。 予測的最適化 統計を自動的に更新および維持します。 Unity Catalog マネージドテーブルの予測的最適化を参照してください。
また、クエリ プランナーでクエリ プランナーの統計を更新するために、クエリ
ANALYZE TABLE table_name COMPUTE STATISTICS
を実行することもできます。
Databricks Runtime 14.3 LTS 以降では、Delta Lake がデータをスキップするために統計を収集する列を変更し、Delta ログ内の既存の統計を再計算できます。 Delta 統計カラムの指定を参照してください。
Databricks のヒントを結合する
Apache Spark では、範囲結合とスキュー結合の結合ヒントの指定がサポートされています。 Databricks によって自動的に最適化されるため、スキュー結合のヒントは必要ありません。 ヒントをご覧ください。
範囲結合のヒントは、結合のパフォーマンスが悪く、等価結合を実行している場合に役立ちます。 例としては、タイムスタンプ範囲やクラスタリング ID の範囲での結合などがあります。 範囲結合の最適化を参照してください。