メインコンテンツまでスキップ

構造化ストリーミング チェックポイント

チェックポイントと先行書き込みログが連携して、構造化ストリーミングワークロードの処理を保証します。 チェックポイントは、状態情報や処理されたレコードなど、クエリを識別する情報を追跡します。 チェックポイント・ディレクトリ内のファイルを削除するか、新しいチェックポイント・ロケーションに変更すると、クエリの次の実行が新たに開始されます。

チェックポイントディレクトリには以下のものが含まれています。

  • オフセット :各マイクロバッチで処理されるソースオフセット。これにより、クエリはデータを再処理することなく、中断した箇所から正確に再開できます。
  • コミット :どのマイクロバッチがシンクにコミットされたかの記録であり、厳密に1回限りのセマンティクスを可能にします。
  • State : ステートフル クエリ (集計、ストリーム ストリーム結合、重複排除、およびtransformWithStateなどのカスタム ステートフル オペレーター) の場合、チェックポイントには、ステートフル オペレーター、状態スキーマ、および状態ストア プロバイダーによって管理されるチェックポイントされた状態ストアのコンテンツに関するメタデータが保存されます。
  • メタデータ :クエリを識別するために使用される一意のクエリID。設定情報はオフセットログの一部として保存されます。

各クエリには、異なるチェックポイントの場所が必要です。 複数のクエリで同じ場所を共有しないでください。

注記

この記事では、ストリーミング クエリの構造化ストリーミング チェックポイントについて説明します。Unity CatalogボリュームでDataFrame.checkpoint()を使用して非ストリーミングDataFramesの実行プランを切り詰める方法については、 「ボリュームのDataFrameチェックポイント」を参照してください。

構造化ストリーミング クエリのチェックポイント処理を有効にする

次の例のように、ストリーミングクエリを実行する前に checkpointLocation オプションを指定する必要があります。

Python
(df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
)
注記

ノートブックの display() の出力や memory シンクなど、一部のシンクでは、このオプションを省略すると、一時的なチェックポイントの場所が自動的に生成されます。 これらの一時的なチェックポイントの場所では、フォールト トレランスやデータ整合性の保証は保証されず、適切にクリーンアップされない可能性があります。 Databricks では、これらのシンクのチェックポイントの場所を常に指定することをお勧めします。

構造化ストリーミング クエリの変更後の回復

同じチェックポイント位置からの再起動間で、ストリーミングクエリに対して許可される変更には制限があります。

一般的に新しいチェックポイントが必要となる変更には、入力の数またはタイプ、サブスクライブされたKafkaトピックまたはAuto Loaderパス、ステートフル操作タイプ、ステートスキーマ、および出力シンクタイプが含まれます。

一般的に安全な変更には、フィルターの追加または削除、レート制限の変更、トリガー間隔の変更、およびmapGroupsWithState内のユーザー定義関数ロジックの更新が含まれます(ただし、意味が変わる可能性があります)。

次のセクションでは、許可されていない変更、または変更の影響が明確に定義されていない変更について説明します。

  • allowed という用語は、指定された変更を実行できることを意味しますが、その効果のセマンティクスが明確に定義されているかどうかは、クエリと変更によって異なります。
  • 許可されていないという用語は 、再開されたクエリが予期しないエラーで失敗する可能性があるため、指定された変更を行ってはいけないことを意味します。
  • sdf は、 sparkSession.readStreamで生成されたストリーミング データフレーム/データセットを表します。

構造化ストリーミング クエリの変更の種類

  • 入力ソースの数や種類の変更 :構造化ストリーミングはクエリプラン内の位置によってソースを識別するため、デフォルトではこれは許可されていません。ソースの名前付けをオンにすると、新しいチェックポイントから開始することなく、既存のソースを並べ替えたり、新しいソースを追加したりできます。 「ソースの進化によるストリーミング ソースの変更」を参照してください。

  • 入力ソースの変更 : これが許可されるかどうか、および変更のセマンティクスが明確に定義されているかどうかは、ソースとクエリmaxFilesPerTriggermaxOffsetsPerTriggerなどのアドミッション コントロールを含む) によって決まります。 以下にいくつかの例を示します。

    • レート制限の追加、削除、および変更は許可されます。

      Scala
      spark.readStream.format("kafka").option("subscribe", "article")

      -

      Scala
      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)

      詳細については、 「Databricks で構造化ストリーミングのバッチサイズを構成する」を参照してください。

    • 購読している記事やファイルへの変更は、結果が予測できないため、通常は許可されません spark.readStream.format("kafka").option("subscribe", "article")spark.readStream.format("kafka").option("subscribe", "newarticle")

  • トリガー間隔の変更 : 増分バッチと時間間隔の間でトリガーを変更できます。「実行間のトリガー間隔を変更する」を参照してください。

  • 出力シンクの種類の変更 : シンクのいくつかの特定の組み合わせ間の変更は許可されます。 これはケースバイケースで検証する必要があります。 次に例をいくつか示します。

    • ファイル シンクから Kafka シンクへのファイル・シンクは許可されます。 Kafka には新しいデータのみが表示されます。
    • Kafka シンクからファイル シンクへのコピーは許可されません。
    • Kafka シンクが foreach に変更された場合、またはその逆が許可されます。
  • 出力シンクのパラメーターの変更 : これが許可されているかどうか、および変更のセマンティクスが適切に定義されているかどうかは、シンクとクエリによって異なります。 次に例をいくつか示します。

    • ファイルシンクの出力ディレクトリは変更できません。sdf.writeStream.format("parquet").option("path", "/somePath") から sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • 出力トピックの変更は許可されています: sdf.writeStream.format("kafka").option("topic", "topic1") sdf.writeStream.format("kafka").option("topic", "topic2")
    • ユーザー定義の foreach シンク (つまり、 ForeachWriter コード) への変更は許可されますが、変更のセマンティクスはコードによって異なります。
  • 投影法/フィルター/マップライクな操作の変更 :一部許容されるケースがあります。 例えば:

    • フィルターの追加/削除は許可されています: sdf.selectExpr("a") から sdf.where(...).selectExpr("a").filter(...)
    • 同じ出力スキーマでのプロジェクションの変更は、 sdf.selectExpr("stringColumn AS json").writeStream から sdf.select(to_json(...).as("json")).writeStreamまで許可されます。
    • 異なる出力スキーマを持つプロジェクションの変更は条件付きで許可されます: sdf.selectExpr("a").writeStream から sdf.selectExpr("b").writeStream は、出力シンクがスキーマを "a" から "b"に変更できる場合にのみ許可されます。
  • ステートフル操作の変更 : ストリーミング クエリの一部の操作では、結果を継続的に更新するために状態データを保持する必要があります。 構造化ストリーミングは、フォールトトレラントストレージ(DBFS、AWS S3、Azure Blobストレージなど)に状態データを自動的にチェックポイントし、再起動後に復元します。 ただし、これは、状態データのスキーマが再起動後も同じままであることを前提としています。 つまり 、ストリーミング クエリのステートフル操作に対する変更 (つまり、追加、削除、スキーマの変更) は、再起動間で許可されません 。 以下は、状態の回復を確実にするために再起動間でスキーマを変更してはならないステートフルな操作のリストです。

    • ストリーミング集計 : たとえば、 sdf.groupBy("a").agg(...). グループ化キーまたは集計の数またはタイプを変更することは許可されません。
    • ストリーミング重複排除 : たとえば、 sdf.dropDuplicates("a"). グループ化キーまたは集計の数またはタイプを変更することは許可されません。
    • ストリーム-ストリーム join : たとえば、 sdf1.join(sdf2, ...) です (つまり、両方の入力が sparkSession.readStreamで生成されます)。 スキーマの変更や等結合列は許可されません。 ジョインの種類 (外部または内部) の変更は許可されません。 ジョイン条件のその他の変更は、ill-defined です。
    • 任意のステートフル操作 : sdf.groupByKey(...).mapGroupsWithState(...)sdf.groupByKey(...).flatMapGroupsWithState(...)など。 ユーザー定義の状態のスキーマとタイムアウトのタイプに対する変更は許可されません。 ユーザー定義の状態マッピング関数内の任意の変更は許可されますが、変更のセマンティック効果はユーザー定義ロジックによって異なります。 状態スキーマの変更を本当にサポートする場合は、スキーマの移行をサポートするエンコード/デコード スキームを使用して、複雑な状態データ構造を明示的にエンコード/デコードできます。 たとえば、状態を Avro エンコードされたバイトとして保存すると、バイナリ状態が復元されるため、クエリの再起動間で Avro-state-schema を変更できます。
重要

ステートフル演算子 dropDuplicates()dropDuplicatesWithinWatermark() は、コンピュート アクセス モード間で変更するときに、状態スキーマの互換性チェックが原因で再起動に失敗することがあります。

専用アクセスモードと分離なしアクセスモード間の変更は許可されています。 標準アクセスモードとサーバレスアクセスモード間の変更は可能です。 他のアクセスモードの組み合わせ間で変更しようとしないでください。

このエラーを回避するには、これらの演算子を含むストリーミング クエリのコンピュート アクセス モードを変更しないでください。

ソースの進化でストリーミング ソースを変える

デフォルトでは、構造化ストリーミングはクエリ プラン内での位置によってソースを識別します。たとえば、 012などです。入力ソースの数または順序のいずれかを変更すると、チェックポイントの互換性が損なわれ、新しいチェックポイントが必要になります。ソースの進化により、各ストリーミング ソースに安定したユーザー定義の名前を割り当てることができるため、チェックポイントの状態を失うことなく、クエリからソースを並べ替えたり、追加したり、削除したりすることができます。

ソースコードの進化には、Databricks Runtime 18.2以降が必要です。

必要な構成

ソース進化を有効にするには、次の2つのSpark設定を行います。

  • spark.sql.streaming.queryEvolution.enableSourceEvolution: trueの場合、クエリ内のすべてのストリーミングソースは、 .name() API を使用して明示的に指定する必要があります。デフォルトはfalseです。
  • spark.sql.streaming.offsetLog.formatVersion名前ベースのオフセット追跡形式を使用するには、 2に設定する必要があります。デフォルトは1です。

ストリーミングクエリを定義する前に、両方の設定を行ってください。

Python
spark.conf.set("spark.sql.streaming.queryEvolution.enableSourceEvolution", "true")
spark.conf.set("spark.sql.streaming.offsetLog.formatVersion", "2")

命名規則

  • 名前には英数字とアンダースコア( [a-zA-Z0-9_]+ )のみを含める必要があります。
  • 各ソース名はクエリ内で一意である必要があります。
  • ソース進化が有効になっている場合、すべてのストリーミングソースには名前が必要です。名前のないソースはUNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENTエラーを引き起こします。

ソースの並べ替え、追加、削除

以下の変更は、同じチェックポイントを使用したクエリの再起動後も安全です。

  • ソースの順序を変更する :異なる順序のソースを使用してクエリを再開します。各ソースは、その名前に基づいて最後にコミットされたオフセットから処理を再開し、チェックポイントの状態は変更しません。
  • 新しいソースの追加 : 新しいソースを使用してクエリを再開します。 新しいソースは最初から処理され、既存のソースは最後のオフセットから続行されます。
  • ソースを削除 : ソースを使用せずにクエリを再起動します。 当該車両は検問所から永久に撤去される。 削除されたソースは、同じ名前で再度追加することはできません。

.load()または.table()を呼び出す前に、 DataStreamReaderに対して.name()使用してください。

Python
orders_us = (spark.readStream
.name("orders_us")
.table("catalog.schema.orders_us")
)

orders_eu = (spark.readStream
.name("orders_eu")
.table("catalog.schema.orders_eu")
)

all_orders = orders_us.union(orders_eu)

制限事項

  • ソース名の命名規則には、新たなチェックポイントが必要です。V1オフセットログ形式を使用する既存のチェックポイントでは、ソース進化を有効にすることはできません。
  • オフセットログフォーマットV2にアップグレードした後は、V1にダウングレードすることはできません。必要な設定を参照してください。
  • ソース名は永続的です。ソースの名前を変更するには、まずソースを削除し、新しい名前で追加します。名前が変更されたソースは、最初から処理を再開します。