構造化ストリーミングに関する本番運用の考慮事項
このページには、Databricks のジョブを使用して構造化ストリーミングワークロードをスケジュールするための推奨事項が記載されています。
Databricksは、常に以下の設定を行うことを推奨します。
displayやcountなどの結果を返す不要なコードをノートブックから削除します。- 汎用コンピュートを使用して構造化ストリーミング ワークロードを実行しないでください。 常にジョブ コンピュートを使用してストリームをジョブとしてスケジュールします。
- ジョブを
Continuousモードでスケジュールします。これは、Databricksのジョブスケジューリング機能に関するものであり、構造化ストリーミングのトリガー間隔に関するものではありません。 - 構造化ストリーミング ジョブのコンピュートのオートスケールを有効にしないでください。
ワークロードによっては、次のメリットがあります。
Databricks構造化ストリーミング ワークロードの本番運用インフラストラクチャの管理の複雑さを軽減するために、 Lakeflow Spark宣言型パイプラインを導入しました。 Databricks新しい構造化ストリーミング パイプラインにはLakeflow Spark宣言型パイプラインの使用を推奨しています。 Lakeflow Spark宣言型パイプラインを参照してください。
コンピュートの自動スケーリングには、構造化ストリーミング ワークロードのクラスター サイズをスケールダウンする際の制限があります。 Databricksストリーミング ワークロード用に強化されたオートスケールを備えたLakeflow Spark宣言型パイプラインを使用することをお勧めします。 「オートスケールを使用したLakeflow Spark宣言型パイプラインのクラスター使用率の最適化」を参照してください。
:::note サーバレスコンピュート
サーバレスコンピュートでは、 Trigger.AvailableNow()とTrigger.Once()のみがサポートされます。 DatabricksはTrigger.AvailableNow()推奨しています。
サーバーレス コンピュートでの連続ストリーミングの場合は、連続モードでトリガー モードと連続パイプライン モードを使用します。
ストリーミングの制限事項をご覧ください。
:::
障害を想定するようにストリーミング ワークロードを設計する
Databricksは、ストリーミングジョブが失敗した場合に自動的に再起動するように常に設定することを推奨しています。スキーマ進化などの一部の機能では、構造化ストリーミングワークロードが自動的に再試行するように構成する必要があります。障害発生時にストリーミングクエリを再開するための構造化ストリーミングジョブの設定を参照してください。
foreachBatchのような一部の操作は、正確に 1 回ではなく、少なくとも 1 回という保証を提供します。これらの操作を行う際は、処理パイプラインが冪等性を持つようにしてください。任意のデータシンクに書き込むには、foreachBatch の使用を参照してください。
クエリが再開されると、前回の実行中に計画されたマイクロバッチが処理されます。 メモリ不足エラーが原因でジョブが失敗した場合、またはマイクロバッチが大きすぎるためにジョブを手動でキャンセルした場合は、マイクロバッチを正常に処理するためにコンピュートをスケールアップする必要があります。
実行間で構成を変更した場合、これらの構成は計画された最初の新しいバッチに適用されます。 構造化ストリーミング クエリの変更後の回復を参照してください。
ジョブはいつ再試行されますか?
Databricks ジョブの一部として複数のタスクをスケジュールできます。 連続トリガーを使用してジョブを構成する場合、タスク間の依存関係を設定することはできません。
次のいずれかの方法を使用して、1 つのジョブで複数のストリームをスケジュールすることを選択できます。
- 複数のタスク : 連続トリガーを使用してストリーミング ワークロードを実行する複数のタスクを持つジョブを定義します。
- 複数のクエリ : 1 つのタスクのソース コードで複数のストリーミング クエリを定義します。
これらの戦略を組み合わせることもできます。 次の表では、これらのアプローチを比較しています。
戦略 | 複数のタスク | 複数のクエリ |
|---|---|---|
コンピュートはどのように共有されますか? | Databricks では、各ストリーミング タスクに適切なサイズでジョブ コンピュートをデプロイすることをお勧めします。 必要に応じて、タスク間でコンピュートを共有できます。 | すべてのクエリは同じコンピュートを共有します。 クエリをスケジューラプールに任意で割り当てることができます。 |
再試行はどのように処理されますか? | すべてのタスクは、ジョブが再試行される前に失敗する必要があります。 | クエリが失敗した場合、タスクは再試行します。 |
構造化ストリーミング ジョブを構成して、失敗時にストリーミング クエリを再開する
Databricks では、すべてのストリーミング ワークロードを継続的トリガーを使用して構成することをお勧めします。 ジョブの継続的な実行を参照してください。
継続トリガーは、デフォルトでは以下の動作をします。
- ジョブの複数の並列実行を防止します。
- 前の実行が失敗したときに、新しい実行を開始します。
- 再試行には指数バックオフを使用します。
Databricks 、ワークフローをスケジュールする際には、常に 汎用 コンピュートではなく、ジョブ コンピュートを使用することをお勧めします。 ジョブの失敗と再試行時に、新しいコンピュート リソースがデプロイされます。
Databricks はstreamingQuery.awaitTermination()またはspark.streams.awaitAnyTermination()を使用しないことを推奨します。awaitTermination()使用時期を参照してください。
いつ使用するか awaitTermination()
streamingQuery.awaitTermination() そしてspark.streams.awaitAnyTermination() 、ストリーミングクエリが終了するまで現在のスレッドをブロックします。これらの関数を使用するかどうかは、実行環境によって異なります。
Databricks ジョブの場合、 streamingQuery.awaitTermination()またはspark.streams.awaitAnyTermination()は使用しないでください。ストリーミングクエリがアクティブな場合、ジョブサービスが自動的に実行の完了を阻止するため、これらの機能は必要ありません。どちらの機能もノートブックのセルが完了するのをブロックし、ジョブサービスがストリーミングクエリを追跡するのを妨げるため、バックログメトリクスとジョブ通知が混乱します。
以下の場合はawaitTermination()使用してください。
ユースケース | 挙動 |
|---|---|
汎用コンピュートに関するインタラクティブなノートブック |
|
地域および開発環境 | Sparkプログラムをローカルで実行する場合、メインスレッドが完了するとプロセスは終了します。ストリーミングクエリが完了または失敗するまでプログラムを継続させるには、 |
ドライバーへの障害伝播 |
|
複数のストリーミング クエリにスケジューラ プールを使用する
同じソース コードから複数のストリーミング クエリを実行するときに、クエリにコンピュート容量を割り当てるようにスケジューラ プールを構成できます。
デフォルトでは、ノートブックで開始されたすべてのクエリは、同じ公正なスケジューリングプールで実行されます。 ノートブック内のすべてのストリーミング クエリのトリガーによって生成された Apache Spark ジョブは、"先入れ先出し" (FIFO) の順序で 1 つずつ実行されます。 これにより、クエリがクラスター リソースを効率的に共有していないため、クエリに不要な遅延が発生する可能性があります。
スケジューラ プールを使用すると、コンピュート リソースを共有する構造化ストリーミング クエリを宣言できます。
次の例では、 query1に専用のプールを割り当て、 query2とquery3スケジューラプールを共有します。
# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")
# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")
# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")
ローカル プロパティの構成は、ストリーミング クエリを開始するのと同じノートブック セルに存在する必要があります。
Apacheフェア スケジューラ プールの詳細については、 Apacheフェア スケジューラ ドキュメント」を参照してください。