メインコンテンツまでスキップ

構造化ストリーミングに関する本番運用の考慮事項

本ページでは、Databricks での Lakeflow Jobs を使用した構造化ストリーミング ワークロードのスケジューリングに関する推奨事項を記載しています。Lakeflowジョブを参照してください。

Databricksは、常に以下の設定を行うことを推奨します。

  • displaycount などの結果を返す不要なコードをノートブックから削除します。
  • 構造化ストリーミングのワークロードを汎用コンピュートで実行しないでください。ストリームは、ジョブコンピュートを使用してLakeflowジョブとして必ずスケジュールしてください。
  • Continuousモードを使用して、Lakeflowジョブをスケジュールするこれは Databricks Jobs のスケジューリング機能のことであり、構造化ストリーミングの「トリガー間隔」ではありません。
  • 構造化ストリーミング ジョブではコンピュートのオートスケールを有効にしないでください。

ワークロードによっては、次のメリットがあります。

Databricks構造化ストリーミング ワークロードの本番運用インフラストラクチャの管理の複雑さを軽減するために、 Lakeflow Spark宣言型パイプラインを導入しました。 Databricks新しい構造化ストリーミング パイプラインにはLakeflow Spark宣言型パイプラインの使用を推奨しています。 Lakeflow Spark宣言型パイプラインを参照してください。

注記

コンピュートの自動スケーリングには、構造化ストリーミング ワークロードのクラスター サイズをスケールダウンする際の制限があります。 Databricksストリーミング ワークロード用に強化されたオートスケールを備えたLakeflow Spark宣言型パイプラインを使用することをお勧めします。 「オートスケールを使用したLakeflow Spark宣言型パイプラインのクラスター使用率の最適化」を参照してください。

:::note サーバレスコンピュート

サーバレスコンピュートでは、 Trigger.AvailableNow()Trigger.Once()のみがサポートされます。 DatabricksはTrigger.AvailableNow()推奨しています。

サーバーレス コンピュートでの連続ストリーミングの場合は、連続モードでトリガー モードと連続パイプライン モードを使用します。

ストリーミングの制限事項をご覧ください。

:::

障害を想定するようにストリーミング ワークロードを設計する

Databricks では、失敗時に自動的に再起動するようにストリーミング ジョブを常に構成することをお勧めします。スキーマ進化を含む一部の機能では、構造化ストリーミング ワークロードが自動的に再試行する必要があります。「失敗時にストリーミング クエリを再開するための構造化ストリーミング ジョブの構成」を参照してください。

foreachBatchのような一部の操作は、正確に 1 回ではなく、少なくとも 1 回という保証を提供します。これらの操作を行う際は、処理パイプラインが冪等性を持つようにしてください。任意のデータシンクに書き込むには、foreachBatch の使用を参照してください。

注記

クエリが再開されると、前回の実行中に計画されたマイクロバッチが処理されます。 メモリ不足エラーが原因でジョブが失敗した場合、またはマイクロバッチが大きすぎるためにジョブを手動でキャンセルした場合は、マイクロバッチを正常に処理するためにコンピュートをスケールアップする必要があります。

実行間で構成を変更した場合、これらの構成は計画された最初の新しいバッチに適用されます。 構造化ストリーミング クエリの変更後の回復を参照してください。

ジョブはいつ再試行されますか?

Databricks ジョブの一部として複数のタスクをスケジュールできます。 連続トリガーを使用してジョブを構成する場合、タスク間の依存関係を設定することはできません。

次のいずれかの方法を使用して、1 つのジョブで複数のストリームをスケジュールすることを選択できます。

  • 複数のタスク : 連続トリガーを使用してストリーミング ワークロードを実行する複数のタスクを持つジョブを定義します。
  • 複数のクエリ : 1 つのタスクのソース コードで複数のストリーミング クエリを定義します。

これらの戦略を組み合わせることもできます。 次の表では、これらのアプローチを比較しています。

戦略

複数のタスク

複数のクエリ

コンピュートはどのように共有されますか?

Databricks では、各ストリーミング タスクに適切なサイズでジョブ コンピュートをデプロイすることをお勧めします。 必要に応じて、タスク間でコンピュートを共有できます。

すべてのクエリは同じコンピュートを共有します。 クエリをスケジューラプールに任意で割り当てることができます。

再試行はどのように処理されますか?

すべてのタスクは、ジョブが再試行される前に失敗する必要があります。

クエリが失敗した場合、タスクは再試行します。

複数のタスクまたはクエリの操作の詳細については、「同じクラスターで複数の構造化ストリーミング クエリを実行」を参照してください。

構造化ストリーミング ジョブを構成して、失敗時にストリーミング クエリを再開する

Databricks では、すべてのストリーミング ワークロードを継続的トリガーを使用して構成することをお勧めします。ジョブの継続的な実行を参照してください。

継続トリガーは、デフォルトでは以下の動作をします。

  • ジョブの複数の並列実行を防止します。
  • 前の実行が失敗したときに、新しい実行を開始します。
  • 再試行には指数バックオフを使用します。

Databricks 、ワークフローをスケジュールする際には、常に 汎用 コンピュートではなく、ジョブ コンピュートを使用することをお勧めします。 ジョブの失敗と再試行時に、新しいコンピュート リソースがデプロイされます。

注記

Databricks はstreamingQuery.awaitTermination()またはspark.streams.awaitAnyTermination()を使用しないことを推奨します。awaitTermination()使用時期を参照してください。

いつ使用するか awaitTermination()

streamingQuery.awaitTermination() そしてspark.streams.awaitAnyTermination() 、ストリーミングクエリが終了するまで現在のスレッドをブロックします。これらの関数を使用するかどうかは、実行環境によって異なります。

Lakeflow ジョブでは、streamingQuery.awaitTermination()またはspark.streams.awaitAnyTermination()を使用しないでください。ストリーミングクエリがアクティブな場合、ジョブサービスが実行の完了を自動的に防ぐため、これらの機能は不要です。これらの機能はどちらも、ノートブックセルの完了をブロックし、ジョブサービスによるストリーミングクエリの追跡を妨げます。これにより、バックログメトリクスおよびジョブ通知が中断されます。

以下の場合はawaitTermination()使用してください。

ユースケース

挙動

汎用コンピュートに関するインタラクティブなノートブック

awaitTermination() セルを常に実行状態に保ち、クエリの状態を監視できるようにし、ノートブックの出力に障害が確実に表示されるようにします。

地域および開発環境

Sparkプログラムをローカルで実行する場合、メインスレッドが完了するとプロセスは終了します。ストリーミングクエリが完了または失敗するまでプログラムを継続させるには、 awaitTermination()を呼び出してください。

ドライバーへの障害伝播

awaitTermination()がない場合、ジョブ以外のコンテキストでのストリーミングクエリの失敗は、呼び出し元のスレッドに伝播しない可能性があります。クエリーはサイレントに失敗する可能性があり、その結果、失敗の検出と診断が困難になります。awaitTermination()を呼び出すと、ドライバー上でクエリ例外が再度発生します。