構造化ストリーミング チェックポイント
チェックポイントと先行書き込みログが連携して、構造化ストリーミングワークロードの処理を保証します。 チェックポイントは、状態情報や処理されたレコードなど、クエリを識別する情報を追跡します。 チェックポイント・ディレクトリ内のファイルを削除するか、新しいチェックポイント・ロケーションに変更すると、クエリの次の実行が新たに開始されます。
各クエリには、異なるチェックポイントの場所が必要です。 複数のクエリで同じ場所を共有しないでください。
構造化ストリーミング クエリのチェックポイント処理を有効にする
次の例のように、ストリーミングクエリを実行する前に checkpointLocation
オプションを指定する必要があります。
(df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
)
df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
注:
ノートブックの display()
の出力や memory
シンクなど、一部のシンクでは、このオプションを省略すると、一時的なチェックポイントの場所が自動的に生成されます。 これらの一時的なチェックポイントの場所では、フォールト トレランスやデータ整合性の保証は保証されず、適切にクリーンアップされない可能性があります。 Databricks では、これらのシンクのチェックポイントの場所を常に指定することをお勧めします。
構造化ストリーミングクエリーの変更後の回復
同じチェックポイントの場所からの再起動間で許可されるストリーミング クエリの変更には制限があります。 ここでは、許可されていない変更、または変更の影響が明確に定義されていない変更をいくつか紹介します。 それらすべてについて:
allowed という用語は、指定された変更を実行できることを意味しますが、その効果のセマンティクスが明確に定義されているかどうかは、クエリと変更によって異なります。
「許可されていない」という用語は、再起動されたクエリが予期しないエラーで失敗する可能性があるため、指定された変更を行ってはならないことを意味します。
sdf
は、sparkSession.readStream
で生成されたストリーミング DataFrame/データセットを表します。
構造化ストリーミングクエリーの変更の種類
入力ソースの数または種類 (つまり、異なるソース) の変更: これは許可されません。
入力ソースのパラメーターの変更: これが許可されているかどうか、および変更のセマンティクスが適切に定義されているかどうかは、ソース とクエリによって異なります。 次に例をいくつか示します。
レート制限の追加、削除、および変更は許可されます。
spark.readStream.format("kafka").option("subscribe", "article")
-
spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
購読している記事やファイルへの変更は、結果が予測できないため、通常は許可されません
spark.readStream.format("kafka").option("subscribe", "article")
。spark.readStream.format("kafka").option("subscribe", "newarticle")
トリガー間隔の変更: 増分バッチと時間間隔の間でトリガーを変更できます。 実行間のトリガー間隔の変更を参照してください。
出力シンクの種類の変更: シンクのいくつかの特定の組み合わせ間の変更は許可されます。 これはケースバイケースで検証する必要があります。 次に例をいくつか示します。
ファイル シンクから Kafka シンクへのファイル・シンクは許可されます。 Kafka には新しいデータのみが表示されます。
Kafka シンクからファイル シンクへのコピーは許可されません。
Kafka シンクが foreach に変更された場合、またはその逆が許可されます。
出力シンクのパラメーターの変更: これが許可されているかどうか、および変更のセマンティクスが適切に定義されているかどうかは、シンクとクエリによって異なります。 次に例をいくつか示します。
ファイルシンクの出力ディレクトリへの変更は許可されていません:
sdf.writeStream.format("parquet").option("path", "/somePath")
sdf.writeStream.format("parquet").option("path", "/anotherPath")
出力トピックの変更は許可されています:
sdf.writeStream.format("kafka").option("topic", "topic1")
sdf.writeStream.format("kafka").option("topic", "topic2")
ユーザー定義の foreach シンク (つまり、
ForeachWriter
コード) への変更は許可されますが、変更のセマンティクスはコードによって異なります。
投影法/フィルター/マップライクな操作の変更:一部許容されるケースがあります。 例えば:
フィルターの追加/削除は許可されています:
sdf.selectExpr("a")
からsdf.where(...).selectExpr("a").filter(...)
。同じ出力スキーマでのプロジェクションの変更は、
sdf.selectExpr("stringColumn AS json").writeStream
からsdf.select(to_json(...).as("json")).writeStream
まで許可されます。異なる出力スキーマを持つプロジェクションの変更は条件付きで許可されます:
sdf.selectExpr("a").writeStream
からsdf.selectExpr("b").writeStream
は、出力シンクがスキーマを"a"
から"b"
に変更できる場合にのみ許可されます。
ステートフル操作の変更: ストリーミング クエリの一部の操作では、結果を継続的に更新するために状態データを保持する必要があります。 構造化ストリーミングは、フォールトトレラントストレージ(DBFS、AWS S3、Azure Blobストレージなど)に状態データを自動的にチェックポイントし、再起動後に復元します。 ただし、これは、状態データのスキーマが再起動後も同じままであることを前提としています。 つまり 、ストリーミング クエリのステートフル操作に対する変更 (つまり、追加、削除、スキーマの変更) は、再起動間で許可されません。 以下は、状態の回復を確実にするために再起動間でスキーマを変更してはならないステートフルな操作のリストです。
ストリーミング集計: たとえば、
sdf.groupBy("a").agg(...)
. グループ化キーまたは集計の数またはタイプを変更することは許可されません。ストリーミング重複排除: たとえば、
sdf.dropDuplicates("a")
. グループ化キーまたは集計の数またはタイプを変更することは許可されません。ストリーム-ストリーム join: たとえば、
sdf1.join(sdf2, ...)
です (つまり、両方の入力がsparkSession.readStream
で生成されます)。 スキーマの変更や等結合列は許可されません。 ジョインの種類 (外部または内部) の変更は許可されません。 ジョイン条件のその他の変更は、ill-defined です。任意のステートフル操作:
sdf.groupByKey(...).mapGroupsWithState(...)
やsdf.groupByKey(...).flatMapGroupsWithState(...)
など。 ユーザー定義の状態のスキーマとタイムアウトのタイプに対する変更は許可されません。 ユーザー定義の状態マッピング関数内の任意の変更は許可されますが、変更のセマンティック効果はユーザー定義ロジックによって異なります。 状態スキーマの変更を本当にサポートする場合は、スキーマの移行をサポートするエンコード/デコード スキームを使用して、複雑な状態データ構造を明示的にエンコード/デコードできます。 たとえば、状態を Avro エンコードされたバイトとして保存すると、バイナリ状態が復元されるため、クエリの再起動間で Avro-state-schema を変更できます。