構造化ストリーミングに関する本番運用の考慮事項
このページには、Databricks のジョブを使用して構造化ストリーミングワークロードをスケジュールするための推奨事項が記載されています。
Databricks では、常に次のことを行うことをお勧めします。
displayやcountなどの結果を返す不要なコードをノートブックから削除します。- 汎用コンピュートを使用して構造化ストリーミング ワークロードを実行しないでください。 常にジョブ コンピュートを使用してストリームをジョブとしてスケジュールします。
- ジョブを
Continuousモードでスケジュールします。これは、Databricksのジョブスケジューリング機能に関するものであり、構造化ストリーミングのトリガー間隔に関するものではありません。 - 構造化ストリーミング ジョブではコンピュートの自動スケーリングを有効にしないでください。
ワークロードによっては、次のメリットがあります。
Databricks構造化ストリーミング ワークロードの本番運用インフラストラクチャの管理の複雑さを軽減するために、 Lakeflow Spark宣言型パイプラインを導入しました。 Databricks新しい構造化ストリーミング パイプラインにはLakeflow Spark宣言型パイプラインの使用を推奨しています。 Lakeflow Spark宣言型パイプラインを参照してください。
コンピュートの自動スケーリングには、構造化ストリーミング ワークロードのクラスター サイズをスケールダウンする際の制限があります。 Databricksストリーミング ワークロード用に強化されたオートスケールを備えたLakeflow Spark宣言型パイプラインを使用することをお勧めします。 「オートスケールを使用したLakeflow Spark宣言型パイプラインのクラスター使用率の最適化」を参照してください。
:::note サーバレスコンピュート
サーバレスコンピュートでは、 Trigger.AvailableNow()とTrigger.Once()のみがサポートされます。 DatabricksはTrigger.AvailableNow()推奨しています。
サーバーレス コンピュートでの連続ストリーミングの場合は、連続モードでトリガー モードと連続パイプライン モードを使用します。
ストリーミングの制限事項をご覧ください。
:::
障害を想定するようにストリーミング ワークロードを設計する
Databricksは、ストリーミングジョブが失敗した場合に自動的に再起動するように常に設定することを推奨しています。スキーマ進化などの一部の機能は、構造化ストリーミングワークロードが自動的に再試行するように構成されていることを前提としています。障害発生時にストリーミングクエリを再開するための構造化ストリーミングジョブの設定を参照してください。
foreachBatchのような一部の操作は、正確に 1 回ではなく、少なくとも 1 回という保証を提供します。これらの操作を行う際には、処理パイプラインが冪等性を持つようにする必要があります。任意のデータシンクに書き込むには、foreachBatch の使用を参照してください。
クエリが再開されると、前回の実行中に計画されたマイクロバッチが処理されます。 メモリ不足エラーが原因でジョブが失敗した場合、またはマイクロバッチが大きすぎるためにジョブを手動でキャンセルした場合は、マイクロバッチを正常に処理するためにコンピュートをスケールアップする必要があります。
実行間で構成を変更した場合、これらの構成は計画された最初の新しいバッチに適用されます。 構造化ストリーミング クエリの変更後の回復を参照してください。
ジョブはいつ再試行されますか?
Databricks ジョブの一部として複数のタスクをスケジュールできます。 連続トリガーを使用してジョブを構成する場合、タスク間の依存関係を設定することはできません。
次のいずれかの方法を使用して、1 つのジョブで複数のストリームをスケジュールすることを選択できます。
- 複数のタスク : 連続トリガーを使用してストリーミング ワークロードを実行する複数のタスクを持つジョブを定義します。
- 複数のクエリ : 1 つのタスクのソース コードで複数のストリーミング クエリを定義します。
これらの戦略を組み合わせることもできます。 次の表では、これらのアプローチを比較しています。
戦略: | 複数のタスク | 複数のクエリ |
|---|---|---|
コンピュートはどのように共有されますか? | Databricks では、各ストリーミング タスクに適切なサイズでジョブ コンピュートをデプロイすることをお勧めします。 必要に応じて、タスク間でコンピュートを共有できます。 | すべてのクエリは同じコンピュートを共有します。 オプションで、 スケジューラー・プールに照会を割り当てることができます。 |
再試行はどのように処理されますか? | すべてのタスクは、ジョブが再試行される前に失敗する必要があります。 | クエリが失敗した場合、タスクは再試行します。 |
構造化ストリーミング ジョブを構成して、失敗時にストリーミング クエリを再開する
Databricks では、すべてのストリーミング ワークロードを継続的トリガーを使用して構成することをお勧めします。 ジョブの継続的な実行を参照してください。
連続トリガーは、デフォルトで次の動作を提供します。
- ジョブの複数の並列実行を防止します。
- 前の実行が失敗したときに、新しい実行を開始します。
- 再試行には指数バックオフを使用します。
Databricks 、ワークフローをスケジュールする際には、常に 汎用 コンピュートではなく、ジョブ コンピュートを使用することをお勧めします。 ジョブの失敗と再試行時に、新しいコンピュート リソースがデプロイされます。
Databricks はstreamingQuery.awaitTermination()またはspark.streams.awaitAnyTermination()を使用しないことを推奨します。ストリーミングクエリがアクティブな場合、ジョブサービスが自動的に実行の完了を阻止するため、これらの関数をコードに含める必要はありません。これらの機能はいずれもノートブックセルの完了を妨げ、ジョブサービスがストリーミングクエリを追跡するのを阻止します。また、ストリーミングバックログのメトリクスと通知がジョブサービスに送信されないため、ジョブ通知が中断されます。
複数のストリーミング クエリにスケジューラ プールを使用する
同じソース コードから複数のストリーミング クエリを実行するときに、クエリにコンピュート容量を割り当てるようにスケジュール プールを構成できます。
デフォルトでは、ノートブックで開始されたすべてのクエリは、同じ公正なスケジューリングプールで実行されます。 ノートブック内のすべてのストリーミング クエリのトリガーによって生成された Apache Spark ジョブは、"先入れ先出し" (FIFO) の順序で 1 つずつ実行されます。 これにより、クエリがクラスター リソースを効率的に共有していないため、クエリに不要な遅延が発生する可能性があります。
スケジューラ プールを使用すると、コンピュート リソースを共有する構造化ストリーミング クエリを宣言できます。
次の例では、 query1 を専用プールに割り当て、 query2 と query3 はスケジューラ プールを共有します。
# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")
# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")
# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")
ローカル プロパティの構成は、ストリーミング クエリを開始するのと同じノートブック セルに存在する必要があります。
詳細については Apache 公正なスケジューラのドキュメント を参照してください。