同じクラスターで複数の構造化ストリーミングクエリを実行します
多くの顧客が同じDatabricksクラスターで複数の構造化ストリーミングクエリを実行します。このパターンはサポートされていますが、Databricks はスケーリングの問題やパフォーマンスのボトルネックを回避するために、クラスターあたりのクエリ数を制限することを推奨しています。サーバレスコンピュートでは、Databricks がスケーリングを自動的に管理するため、これらの考慮事項は対応されます。ドライバーとエグゼキューターのサイジングを制御するクラシックコンピュートを使用している場合、このページでは、念頭に置くべき主要なボトルネックと、それらに対処する方法について説明します。
Databricks は、インフラストラクチャの複雑さを自動的に管理する新しいストリーミングワークロード向けに、Lakeflow Spark 宣言型パイプラインの使用を推奨しています。Lakeflow Spark宣言型パイプラインを参照してください。
同じクラスターで複数のクエリを使用する場合
同じクラスターで複数のストリーミングクエリを実行すると、インフラコストを削減できます。特に、それぞれが専用のコンピュートを必要としない多数の小規模ストリームがある場合に有効です。主なトレードオフは、障害が共有されることです。クラスターが失敗すると、その上のすべてのストリームも失敗します。ミッションクリティカルなパイプラインの場合、その共通の障害モードは多くの場合、容認できません。
クリティカルなストリームと非クリティカルなストリームが混在するワークロードの場合、Databricks は以下を推奨します:
- 各ストリームにビジネスインパクトに基づいて優先度を割り当ててください。
- コストが高くなっても、ミッションクリティカルなストリームは専用クラスターに配置してください。
- 優先度の低いストリームを共存させることで、コンピュートを共有し、コストを削減できます。
ドライバーのサイズ
ドライバーは共有リソースです。複数のクエリは、同じ CPU、メモリ、DAG スケジューラ、タスク スケジューラ、およびドライバー側の UDF 実行を共有しています(例: foreachBatch)。多数の並列ストリームを実行する際は、標準的な CPU およびメモリのプロビジョニングを超えて、これらの特定のボトルネックに注意してください。
- Auto Loader のオーバーヘッド :Auto Loader ストリームを使用している場合、ファイルの検出とディレクトリ一覧によりドライバの負荷が増大します。
- OSレベルのリソース制限(オープンファイル) :単一のドライバーで、大量のファイルベースのストリーム(
FileStreamSourceや Auto Loader など)を同時に実行すると、ユーザーレベルのオープンファイル記述子の制限を使い果たし、ランダムなストリームの失敗を引き起こす可能性があります。 - リスナーバスのバックプレッシャー :多数の並列ストリーミングクエリが、単一のSparkセッションの
StreamingQueryListenerバスにバックプレッシャーを引き起こす可能性があります。すべてのイベント(onQueryIdleを含む)がこの単一のバスに送信され、大規模なイベントバックログは非同期のonQueryProgressハンドラーを著しく遅延させ、クラスターの安定性に影響を与える可能性があります。 - コストのかかるドライバー操作 :メモリ不足 (OOM) エラーの原因となる大きな結果セットの実体化を避けるため、絶対に必要でない限り、ドライバー上で
collect()またはその他のコストのかかるDataFrame操作を呼び出さないでください。
ドライバー競合のトラブルシューティング
OOMまたは競合の問題が原因でドライバのクラッシュが発生している場合は:
- Spark UIでドライバメトリクスを監視できます。CPU、メモリ、ディスク使用率が高い場合は、クラスターコンピュート設定でドライバーのサイズを調整してください。
- 問題が解決しない場合は、コードがドライバー上でメモリ集約型操作やUDFを実行していないことを確認してください。
- ドライバーをこれ以上垂直にスケールアップできない場合は、Databricksでは、これらの共有ノードのスケーリングのボトルネックを回避するため、ジョブを複数のクラスターに分割することを強くお勧めします。
エグゼキューターのサイズ設定
複数のクエリーが同じクラスターで実行されている場合、すべてのクエリーはエグゼキューター上のタスクスロットを共有します。あるクエリのステージが利用可能なスロットを占有する可能性があり、その結果、他のクエリの遅延やスターベーションにつながる可能性があります。Spark は、タスクスロットと利用可能なコアを1対1で対応付けています。クエリを同時に実行する必要がある場合は、十分なコアが利用可能であることを確認してください。
一般的に、エグゼキューターはドライバーノードよりもメモリを多く使用する操作を実行する可能性があります。アプリケーションの負荷を処理するために、必要に応じて、エグゼキューターのJVMおよびオフヒープメモリ割り当てパラメーターを調整します。エグゼキューターノードが、CPU、メモリ、ディスク領域の観点から適切にサイズ設定されていることを確認し、必要に応じて垂直にスケールしてください。垂直スケーリングが不可能な場合は、クラスターにワーカーノードを追加することを検討してください。
これらの変更の一部を有効にするには、クラスターを再起動する必要がある場合があります。
スケジューラ プールを使用する
同じソースコードから複数のストリーミング クエリを実行する場合、クエリにコンピュート容量を割り当てるようにスケジューラ プールを構成できます。
デフォルトでは、ノートブックで開始されたすべてのクエリは、同じ公正なスケジューリングプールで実行されます。ノートブック内のすべてのストリーミング クエリのトリガーによって生成された Apache Spark ジョブは、"先入れ先出し" (FIFO) の順序で 1 つずつ実行されます。これにより、クエリがクラスターリソースを効率的に共有していないため、クエリに不要な遅延が発生する可能性があります。
スケジューラプールを使用すると、どの構造化ストリーミングクエリがコンピュートリソースを共有するかを宣言できます。
次の例では、query1を専用のプールに割り当て、query2とquery3はスケジューラプールを共有します。
# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")
# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")
# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")
ローカルプロパティの設定は、ストリーミングクエリを開始するのと同じノートブックセルにある必要があります。
公正なスケジューラ プールに関する詳細については、「Apache Spark 公正スケジューラのドキュメント」を参照してください。
「ステートフル クエリの考慮事項」
同じクラスターで実行しているステートフルクエリーの場合:次の点に注意してください。
- RocksDBを状態ストアプロバイダーとして使用することで、OOM問題やGCの停止を回避できます。RocksDBは、Databricks Runtime 17.3以降のデフォルトの状態ストアプロバイダーです。DatabricksでRocksDB状態ストアを構成するをご覧ください。
- アプリケーションの要件に合わせて、**シャッフルパーティションを調整**してください。ステートフルなステージでは、Spark はシャッフルパーティションの数に比例してタスクをスケジュールします。
- ノードごとにRocksDBのメモリ使用量に上限を設定することで、オフヒープメモリ使用によるOOMエラーを回避できます。Databricks Runtime 17.3 以降では自動的に処理されますが、以前のリリースでは手動での構成が必要です。「RocksDB メモリ使用量の上限」をご覧ください。
- 同じエグゼキューターノードにパーティションを詰め込みすぎないようにしてください。 状態ストアのメンテナンス操作は、スナップショットのアップロードとクリーンアップを含め、ノードごとに実行されます。1つのエグゼキューターノードに多数のパーティションを割り当てすぎると、利用可能な完全なスナップショットの数が少なくなるため、メンテナンスの枯渇や回復時間の長期化につながる可能性があります。