Databricks で結合を操作する

Databricks は ANSI 標準の結合構文をサポートしています。 この記事では、バッチ処理とストリーム処理による結合の違いについて説明し、結合パフォーマンスを最適化するための推奨事項をいくつか示します。

注:

Databricks は、セット演算子UNIONINTERSECT 、およびEXCEPTの標準構文もサポートしています。 「セット演算子」を参照してください。

ストリーミング結合とバッチ結合の違い

Databricks での結合は、ステートフルまたはステートレスのいずれかです。

すべてのバッチ結合はステートレス結合です。 結果はすぐに処理され、クエリの実行時のデータが反映されます。 クエリが実行されるたびに、指定されたソース データに基づいて新しい結果が計算されます。 バッチ結合を参照してください。

2 つのストリーミングデータソース間の結合はステートフルです。 ステートフル結合では、 Databricksデータソースと結果に関する情報を追跡し、結果を繰り返し更新します。 ステートフル結合はオンライン データ処理に強力なソリューションを提供できますが、効果的に実装するのは難しい場合があります。 これらは、出力モード、トリガー間隔、およびウォーターマークに応じて複雑な操作セマンティクスを持ちます。 ストリーム-ストリーム結合を参照してください。

ストリーム静的結合はステートレスですが、増分データソース (ファクト テーブルなど) と静的データソース (ゆっくり変化するディメンション テーブルなど) を結合するための適切なオプションを提供します。 クエリが実行されるたびに両側のすべてのレコードを結合するのではなく、ストリーミング ソースから新しく受信したレコードのみが静的テーブルの現在のバージョンと結合されます。 ストリーム静的結合を参照してください。

バッチ結合

Databricks は、内部結合、外部結合、セミ結合、アンチ結合、クロス結合などの標準 SQL 結合構文をサポートしています。 「JOIN」を参照してください。

注:

Databricks では、内部結合の結果の増分計算を最適化するために、マテリアライズド ビューを使用することを推奨しています。 「Databricks SQL でマテリアライズド ビューを使用する」を参照してください。

ストリーム-ストリームが参加

2 つのストリーミングデータソースを結合すると、状態情報の管理と結果の計算および出力の推論において大きな課題が生じる可能性があります。 ストリーム-ストリーム結合を実装する前に、ウォーターマークが状態管理に与える影響など、ステートフル ストリーミングの運用セマンティクスを十分に理解しておくことをDatabricks推奨します。 次の記事を参照してください。

Databricks では、すべてのストリームとストリームの結合の両側にウォーターマークを指定することを推奨しています。 次の結合タイプがサポートされています。

  • 内部結合

  • 左外部結合

  • 右外部結合

  • 完全外部結合

  • 左半結合

Stream-steam joins について は、 構造化ストリーミングのドキュメントを参照してください。Apache Spark

stream-static 結合

注:

説明したストリーム静的結合の動作では、静的データが Delta Lake を使用して保存されていることを前提としています。

ストリームデータと静的データの結合では、ステートレス結合を使用して、Delta テーブルの最新の有効なバージョン (静的データ) をデータストリームに結合します。

Databricks がストリームデータと静的データの結合でデータのマイクロバッチを処理する際に、静的な Delta テーブルのデータの最新の有効なバージョンが現在のマイクロバッチに存在するレコードと結合します。結合はステートレスであるため、電子透かしを構成する必要がなく、結果を低レイテンシで処理できます。結合で使用される静的な Delta テーブル内のデータは、ゆっくりと変化する必要があります。

次の例は、このパターンを示しています。

streamingDF = spark.readStream.table("orders")
staticDF = spark.read.table("customers")

query = (streamingDF
  .join(staticDF, streamingDF.customer_id==staticDF.id, "inner")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .table("orders_with_customer_info")
)

結合パフォーマンスの最適化

Photon有効になっているコンピュートでは、常に最適な結合タイプが選択されます。 Photon とは何か? を参照してください。

Photon が有効になっている最新の Databricks Runtime バージョンを使用すると、通常は良好な結合パフォーマンスが得られますが、次の推奨事項も考慮する必要があります。

  • クロス結合は非常にコストがかかります。 低レイテンシーまたは頻繁な再計算を必要とするワークロードとクエリからクロス結合を削除します。

  • 参加順序は重要です。 複数の結合を実行する場合は、必ず最初に最小のテーブルを結合し、次に結果を大きなテーブルと結合します。

  • オプティマイザーは、多くの結合と集計を含むクエリで苦労する可能性があります。 中間結果を保存すると、クエリの計画と結果の計算を高速化できます。

  • パフォーマンスを向上させるために、最新の統計を保持します。 クエリANALYZE TABLE table_name COMPUTE STATISTICSを実行して、クエリ プランナーの統計を更新します。

注:

Databricks Runtime 14.3 LTS 以降では、Delta Lake がデータ スキップの統計情報を収集する列を変更し、Delta ログ内の既存の統計を再計算できます。 Delta統計列の指定」を参照してください。

Databricks でのヒントの結合

Apache Spark は、範囲結合と傾斜結合の結合ヒントの指定をサポートしています。 Databricks はこれらの結合を自動的に最適化するため、傾斜結合のヒントは必要ありません。 「ヒント」を参照してください。

範囲結合のヒントは、結合のパフォーマンスが悪く、不等式結合を実行している場合に役立ちます。 例としては、タイムスタンプの範囲やクラスタリング ID の範囲での結合が挙げられます。 「範囲結合の最適化」を参照してください。