メインコンテンツまでスキップ

非同期進捗追跡とは?

非同期進行状況追跡を使用すると、構造化ストリーミング パイプラインは、マイクロバッチ内の実際のデータ処理と並行して非同期に進行状況をチェックポイント化できるため、 offsetLogcommitLogの維持に関連する待機時間が短縮されます。

非同期進行状況の追跡

注記

非同期進行状況追跡は、 Trigger.once トリガーや Trigger.availableNow トリガーでは機能しません。 これらのトリガーでこの機能を有効にしようとすると、クエリが失敗します。

非同期進行状況追跡はどのようにして待ち時間を短縮しますか?

構造化ストリーミングは、クエリ処理の進行状況インジケーターとしてオフセットを永続化および管理することに依存しています。 オフセット管理操作は、これらの操作が完了するまでデータ処理が行われないため、処理の待機時間に直接影響します。 非同期進行状況追跡を使用すると、構造化ストリーミング パイプラインは、これらのオフセット管理操作の影響を受けずに進行状況をチェックポイントできます。

チェックポイントの頻度はいつ設定すべきですか?

ユーザーは、進行状況がチェックポイントされる頻度を構成できます。 チェックポイント頻度のデフォルト設定では、ほとんどのクエリで良好なスループットが得られます。 頻度の構成は、オフセット管理操作が処理可能な速度よりも高い割合で発生し、オフセット管理操作のバックログが増加するシナリオに役立ちます。 この増大するバックログを食い止めるために、データ処理はブロックまたは速度を落とし、基本的に処理動作を元に戻して、非同期の進行状況追跡の利点を排除します。

注記

障害リカバリー時間は、チェックポイント間隔時間の増加に伴って増加します。 失敗した場合、パイプラインは、前回の成功したチェックポイントの前にすべてのデータを再処理する必要があります。 ユーザーは、通常の処理中の低遅延と障害発生時の復旧時間との間のこのトレードオフを検討できます。

非同期進捗追跡にはどのような構成が関連付けられていますか?

オプション

デフォルト

説明

非同期進行状況追跡有効

真/偽

False

非同期進行状況追跡を有効または無効にする

asyncProgressTrackingCheckpointIntervalMs

ミリ秒

1000

オフセットをコミットし、完了をコミットする間隔

ユーザーはどのようにして非同期の進行状況追跡を有効にできますか?

ユーザーは、以下のコードのようなコードを使用してこの機能を有効にすることができます。

Scala
val stream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "in")
.load()

val query = stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.start()

非同期進捗追跡をオフにする

非同期進行状況追跡が有効になっている場合、フレームワークはバッチごとに進行状況をチェックポイントしません。 これに対処するには、非同期進行状況追跡を無効にする前に、次の設定で少なくとも 2 つのマイクロバッチを処理します。

  • .option("asyncProgressTrackingEnabled", "true")
  • .option("asyncProgressTrackingCheckpointIntervalMs", 0)

少なくとも 2 つのマイクロバッチの処理が完了したら、クエリを停止します。 これで、非同期進行状況の追跡を安全に無効にして、クエリを再起動できます。

この手順を完了せずに非同期進行状況追跡を無効にした場合、次のエラーが発生することがあります。

java.lang.IllegalStateException: batch x doesn't exist

ドライバー ログに、次のエラーが表示される場合があります。

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

このセクションの手順に従って非同期進行状況追跡を無効にすると、これらのエラーに対処し、ストリーミングワークロードを修復できます。

非同期進行状況追跡の制限事項

この機能には、次の制限があります。

  • 非同期進行状況の追跡は、Kafka をシンクとして使用するステートレス パイプラインでのみサポートされます。
  • 非同期進捗追跡では、障害が発生した場合にバッチのオフセット範囲を変更できるため、一度だけエンドツーエンドの処理が保証されません。 Kafka などの一部のシンクでは、exactly-once の保証は提供されません。