DatabricksでJOINを行う
Databricks は ANSI 標準のJOIN構文をサポートしています。 この記事では、バッチ処理とストリーム処理によるJOINの違いについて説明し、JOINのパフォーマンスを最適化するための推奨事項をいくつか示します。
注:
Databricks は、セット演算子UNION
、 INTERSECT
、およびEXCEPT
の標準構文もサポートしています。 「セット演算子」を参照してください。
ストリーミングJOINとバッチJOINの違い
Databricks でのJOINは、ステートフルまたはステートレスのいずれかです。
すべてのバッチJOINはステートレスJOINです。 結果はすぐに処理され、クエリの実行時のデータが反映されます。 クエリが実行されるたびに、指定されたソース データに基づいて新しい結果が計算されます。 バッチJOINを参照してください。
2 つのストリーミングデータソース間のJOINはステートフルです。 ステートフルJOINでは、 Databricksデータソースと結果に関する情報を追跡し、結果を繰り返し更新します。 ステートフルJOINはオンライン データ処理に強力なソリューションを提供できますが、効果的に実装するのは難しい場合があります。 これらは、出力モード、トリガー間隔、およびウォーターマークに応じて複雑な操作セマンティクスを持ちます。 ストリーム-ストリームJOINを参照してください。
ストリーム静的JOINはステートレスですが、増分データソース (ファクト テーブルなど) と静的データソース (ゆっくり変化するディメンション テーブルなど) をJOINするための適切なオプションを提供します。 クエリが実行されるたびに両側のすべてのレコードをJOINするのではなく、ストリーミング ソースから新しく受信したレコードのみが静的テーブルの現在のバージョンとJOINされます。 ストリーム静的JOINを参照してください。
バッチJOIN
Databricks は、内部JOIN、外部JOIN、セミJOIN、アンチJOIN、クロスJOINなどの標準 SQL JOIN構文をサポートしています。 「JOIN」を参照してください。
注:
Databricks では、内部JOINの結果の増分計算を最適化するために、マテリアライズド ビューを使用することを推奨しています。 「Databricks SQL でマテリアライズド ビューを使用する」を参照してください。
ストリーム-ストリームJOIN
2 つのストリーミングデータソースをJOINすると、状態情報の管理と結果の計算および出力の推論において大きな課題が生じる可能性があります。 ストリーム-ストリームJOINを実装する前に、ウォーターマークが状態管理に与える影響など、ステートフル ストリーミングの運用セマンティクスを十分に理解しておくことをDatabricksでは推奨します。 次の記事を参照してください。
Databricks では、すべてのストリームとストリームのJOINの両側にウォーターマークを指定することを推奨しています。 次のJOINタイプがサポートされています。
Inner join
Left outer join
Right outer join
Full outer join
Left semi join
Stream-steam joins について は、 構造化ストリーミングのドキュメントを参照してください。Apache Spark
stream-static JOIN
注:
説明したストリーム静的JOINの動作では、静的データが Delta Lake を使用して保存されていることを前提としています。
ストリームデータと静的データのJOINでは、ステートレス結合を使用して、Delta テーブルの最新の有効なバージョン (静的データ) をデータストリームにJOINします。
Databricks がストリームデータと静的データのJOINでデータのマイクロバッチを処理する際に、静的な Delta テーブルのデータの最新の有効なバージョンが現在のマイクロバッチに存在するレコードとJOINします。JOINはステートレスであるため、ウォーターマークを構成する必要がなく、結果を低レイテンシで処理できます。JOINで使用される静的な Delta テーブル内のデータは、ゆっくりと変化する必要があります。
次の例は、このパターンを示しています。
streamingDF = spark.readStream.table("orders")
staticDF = spark.read.table("customers")
query = (streamingDF
.join(staticDF, streamingDF.customer_id==staticDF.id, "inner")
.writeStream
.option("checkpointLocation", checkpoint_path)
.table("orders_with_customer_info")
)
JOINパフォーマンスの最適化
Photon有効になっているコンピュートでは、常に最適なJOINタイプが選択されます。 Photon とは何か? を参照してください。
Photon が有効になっている最新の Databricks Runtime バージョンを使用すると、通常は良好なJOINパフォーマンスが得られますが、次の推奨事項も考慮する必要があります。
クロスJOINは非常にコストがかかります。 低レイテンシーまたは頻繁な再計算を必要とするワークロードとクエリからクロスJOINを削除します。
JOINの順序は重要です。 複数のJOINを実行する場合は、必ず最初に最小のテーブルをJOINし、次に結果を大きなテーブルとJOINします。
オプティマイザーは、多くのJOINと集計を含むクエリで苦労する可能性があります。 中間結果を保存すると、クエリの計画と結果の計算を高速化できます。
パフォーマンスを向上させるために、最新の統計を保持します。
ANALYZE
(パブリック プレビュー) を使用した予測的最適化では、統計を自動的に更新および管理できます。また、クエリ プランナーでクエリ プランナーの統計を更新するために、クエリANALYZE TABLE table_name COMPUTE STATISTICS
を実行することもできます。
プレビュー
ANALYZE
による予測的最適化はパブリック プレビュー段階です。これには、書き込み中のインテリジェントな統計収集が含まれます。 このフォームを使用して、パブリック プレビューにサインアップします。
注:
Databricks Runtime 14.3 LTS 以降では、Delta Lake がデータ スキップの統計情報を収集する列を変更し、Delta ログ内の既存の統計を再計算できます。 Delta統計列の指定」を参照してください。
Databricks でのJOINのヒント
Apache Spark は、RANGE JOINとSKEW JOINのJOINヒントの指定をサポートしています。 Databricks はこれらのJOINを自動的に最適化するため、SKEW JOINのヒントは必要ありません。 「ヒント」を参照してください。
RANGE JOINのヒントは、JOINのパフォーマンスが悪く、不公平なJOINを実行している場合に役立ちます。 例としては、タイムスタンプの範囲やクラスタリング ID の範囲でのJOINが挙げられます。 「RANGE JOINの最適化」を参照してください。