ストリーミングチェックポイントの失敗から LakeFlow 宣言型パイプラインを回復する
このページでは、ストリーミングチェックポイントが無効または破損した場合に、宣言型パイプライン LakeFlow でパイプラインを回復する方法について説明します。
ストリーミングチェックポイントとは何ですか?
Apache Spark 構造化ストリーミング、チェックポイントは、ストリーミング クエリの状態を永続化するために使用されるメカニズムです。この状態には次のものが含まれます。
- 進行状況情報 : ソースからのどのオフセットが処理されたか。
- 中間状態 : ステートフル操作 (集計、
mapGroupsWithState
など) のためにマイクロバッチ間で維持する必要があるデータ。 - メタデータ : ストリーミング クエリの実行に関する情報。
チェックポイントは、ストリーミング アプリケーションのフォールト トレランスとデータの一貫性を確保するために不可欠です。
- フォールト トレランス : ストリーミング アプリケーションに障害が発生した場合 (たとえば、ノードの障害、アプリケーションのクラッシュが原因)、チェックポイントを使用すると、すべてのデータを最初から再処理するのではなく、最後に成功したチェックポイント状態からアプリケーションを再起動できます。これにより、データ損失が防止され、増分処理が保証されます。
- 正確に一度の処理 : 多くのストリーミング ソースでは、チェックポイントをべき等シンクと組み合わせて有効にし、正確に一度の処理を有効にすると、障害が発生した場合でも各レコードが正確に 1 回処理され、重複や脱落が防止されます。
- 状態管理 : ステートフル変換の場合、チェックポイントはこれらの操作の内部状態を保持し、ストリーミング クエリが累積された履歴状態に基づいて新しいデータの処理を正しく続行できるようにします。
LakeFlow Declarative パイプラインのチェックポイント
LakeFlow 宣言型パイプラインは構造化ストリーミングに基づいて構築されており、基礎となるチェックポイント管理の多くを抽象化し、より宣言的なアプローチを提供します。 パイプラインでストリーミングテーブルを定義すると、ストリーミングテーブルに書き込むフローごとにチェックポイント状態があります。これらのチェックポイントの場所はパイプラインの内部にあり、ユーザーはアクセスできません。
通常、次の場合を除き、ストリーミングテーブルの基になるチェックポイントを管理または理解する必要はありません。
- 巻き戻しと再生 : テーブルの現在の状態を保持しながら、特定の時点のデータを再処理する場合は、ストリーミングテーブルのチェックポイントをリセットする必要があります。
- チェックポイントの障害または破損からの回復 : ストリーミングテーブルへの書き込みクエリがチェックポイント関連のエラーが原因で失敗した場合、ハード エラーが発生し、クエリをそれ以上進行できなくなります。 このクラスの障害から回復するために使用できる方法は 3 つあります。
- テーブルの完全更新 : これにより、テーブルがリセットされ、既存のデータが消去されます。
- バックアップとバックフィルによるフルテーブル更新 : フルテーブル更新を実行する前にテーブルのバックアップを作成し、古いデータをバックフィルしますが、これは非常にコストがかかるため、最後の手段にする必要があります。
- リセット checkpoint and continue incrementally : 既存のデータを失うわけにはいかない場合は、影響を受けるストリーミングフローに対して選択的なチェックポイントリセットを実行する必要があります。
例: コード変更によるパイプラインの障害
Amazon S3などのクラウド・ストレージ・システムからのチェンジデータフィードと初期テーブルスナップショットを処理し、SCD-1 ストリーミング・テーブルに書き込むLakeFlow宣言型パイプラインがあるシナリオを考えてみましょう。
パイプラインには、次の 2 つのストリーミング フローがあります。
customers_incremental_flow
: ソーステーブル CDC フィードcustomer
を増分的に読み取り、重複するレコードを除外して、ターゲットテーブルに更新挿入します。customers_snapshot_flow
:customers
ソース表の初期スナップショットを 1 回だけ読み取り、レコードをターゲット表に更新挿入します。
@dlt.view(name="customers_incremental_view")
def query():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.includeExistingFiles", "true")
.load(customers_incremental_path)
.dropDuplicates(["customer_id"])
)
@dlt.view(name="customers_snapshot_view")
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(customers_snapshot_path)
.select("*")
)
dlt.create_streaming_table("customers")
dlt.create_auto_cdc_flow(
flow_name = "customers_incremental_flow",
target = "customers",
source = "customers_incremental_view",
keys = ["customer_id"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
dlt.create_auto_cdc_flow(
flow_name = "customers_snapshot_flow",
target = "customers",
source = "customers_snapshot_view",
keys = ["customer_id"],
sequence_by = lit(0),
stored_as_scd_type = 1,
once = True
)
このパイプラインをデプロイした後、正常に実行され、チェンジデータフィードと初期スナップショットの処理が開始されます。
後で、 customers_incremental_view
クエリの重複排除ロジックが冗長であり、パフォーマンスのボトルネックを引き起こしていることに気付きます。パフォーマンスを向上させるために dropDuplicates()
を削除します。
@dlt.view(name="customers_raw_view")
def query():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.includeExistingFiles", "true")
.load()
# .dropDuplicates()
)
dropDuplicates()
API を削除してパイプラインを再デプロイすると、更新は次のエラーで失敗します。
Streaming stateful operator name does not match with the operator in state metadata.
This is likely to happen when a user adds/removes/changes stateful operators of existing streaming query.
Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)];
Stateful operators in current batch: []. SQLSTATE: 42K03 SQLSTATE: XXKST
このエラーは、チェックポイントの状態と現在のクエリ定義が一致していないために変更が許可されず、パイプラインがそれ以上進行できないことを示します。
チェックポイント関連の障害は、 dropDuplicates
API を削除するだけでなく、さまざまな理由で発生する可能性があります。一般的なシナリオは次のとおりです。
- 既存のストリーミング クエリでステートフル演算子を追加または削除する (たとえば、
dropDuplicates()
や集計の導入または削除)。 - 以前にチェックポイントされたクエリでストリーミング ソースを追加、削除、または結合する (たとえば、既存のストリーミング クエリを新しいストリーミング クエリとユニオンしたり、既存のユニオン操作からソースを追加または削除したりします)。
- ステートフル ストリーミング操作の状態スキーマの変更 (重複排除または集計に使用される列の変更など)。
サポートされている変更とサポートされていない変更の包括的なリストについては、 Spark 構造化ストリーミング ガイド および 構造化ストリーミング クエリの変更の種類を参照してください。
回復オプション
データの持続性要件とリソースの制約に応じて、次の 3 つの回復戦略があります。
メソッド | 複雑さ | コスト | データ損失の可能性 | データ重複の可能性 | 初期スナップショットが必要 | テーブルのフルリセット |
---|---|---|---|---|---|---|
低 | M | はい (初期スナップショットが使用できない場合、またはソースで raw ファイルが削除されている場合)。 | いいえ (変更の適用ターゲット テーブルの場合。 | Yes | Yes | |
M | 高 | No | いいえ (べき等シンクの場合。たとえば、auto CDC などです。 | No | No | |
中-高 (不変のオフセットを提供する追加のみのソースの場合は中)。 | 低 | いいえ (慎重な検討が必要です。 | いいえ (べき等ライターの場合。たとえば、ターゲット テーブルのみに CDC を自動送信します。 | No | No |
中から高の複雑さは、ストリーミング ソースの種類とクエリの複雑さによって異なります。
推奨 事項
- チェックポイントリセットの複雑さに対処したくない場合は、テーブル全体の更新を使用し、テーブル全体を再計算できます。これにより、コードを変更するオプションも提供されます。
- チェックポイントリセットの複雑さに対処したくなく、バックアップとバックフィルの追加コストを許容する場合は、バックアップとバックフィルでフルテーブル更新を使用します ヒストリカルデータ。
- テーブル内の既存のデータを保持し、新しいデータの増分処理を続行する必要がある場合は、テーブルのリセットチェックポイントを使用します。ただし、このアプローチでは、チェックポイントのリセットを慎重に処理して、テーブル内の既存のデータが失われていないこと、およびパイプラインが新しいデータの処理を続行できることを確認する必要があります。
チェックポイントをリセットして増分で続行する
チェックポイントをリセットして増分処理を続行するには、次の手順に従います。
-
パイプラインを停止する: パイプラインにアクティブな更新が実行されていないことを確認します。
-
新しいチェックポイントの開始位置を決定する: 処理を続行する最後に成功したオフセットまたはタイムスタンプを特定します。これは通常、障害が発生する前に正常に処理された最新のオフセットです。
上記の例では、オートローダーを使用して JSON ファイルを読み取っているため、
modifiedAfter
オプションを使用して新しいチェックポイントの開始位置を指定できます。このオプションを使用すると、オートローダが新しいファイルの処理を開始するタイミングのタイムスタンプを設定できます。Kafka ソースの場合、
startingOffsets
オプションを使用して、ストリーミングクエリーが新しいデータの処理を開始するオフセットを指定できます。Delta Lake ソースの場合、
startingVersion
オプションを使用して、ストリーミング クエリが新しいデータの処理を開始するバージョンを指定できます。 -
コードの変更を行う: ストリーミング クエリを変更して、
dropDuplicates()
API を削除したり、その他の変更を加えたりできます。また、オートローダの読み取りパスにmodifiedAfter
オプションを追加したことを確認します。Python@dlt.view(name="customers_incremental_view")
def query():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.includeExistingFiles", "true")
.option("modifiedAfter", "2025-04-09T06:15:00")
.load(customers_incremental_path)
# .dropDuplicates(["customer_id"])
)
modifiedAfter
タイムスタンプが正しくないと、データの損失や重複が発生する可能性があります。古いデータを再度処理したり、新しいデータが欠落したりしないように、タイムスタンプが正しく設定されていることを確認してください。
クエリにストリーム ストリーム結合またはストリーム ストリーム共用体がある場合は、参加しているすべてのストリーミング ソースに上記の戦略を適用する必要があります。例えば:
cdc_1 = spark.readStream.format("cloudFiles")...
cdc_2 = spark.readStream.format("cloudFiles")...
cdc_source = cdc_1..union(cdc_2)
-
チェックポイントをリセットするストリーミングテーブルに関連付けられているフロー名を特定します。 この例では、
customers_incremental_flow
です。フロー名は、パイプライン コードで確認するか、パイプライン UI またはパイプライン イベント ログを確認することで確認できます。 -
チェックポイントをリセットする: Python ノートブックを作成し、Databricks クラスターにアタッチします。
チェックポイントをリセットするには、次の情報が必要です。
- Databricks ワークスペースの URL
- パイプラインID
- チェックポイントをリセットするフロー名
Pythonimport requests
import json
# Define your Databricks instance and pipeline ID
databricks_instance = "<DATABRICKS_URL>"
token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
pipeline_id = "<YOUR_PIPELINE_ID>"
flows_to_reset = ["<YOUR_FLOW_NAME>"]
# Set up the API endpoint
endpoint = f"{databricks_instance}/api/2.0/pipelines/{pipeline_id}/updates"
# Set up the request headers
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
# Define the payload
payload = {
"reset_checkpoint_selection": flows_to_reset
}
# Make the POST request
response = requests.post(endpoint, headers=headers, data=json.dumps(payload))
# Check the response
if response.status_code == 200:
print("Pipeline update started successfully.")
else:
print(f"Error: {response.status_code}, {response.text}") -
パイプラインの実行: パイプラインは、指定された開始位置から新しいチェックポイントを使用して新しいデータの処理を開始し、増分処理を続行しながら既存のテーブル データを保持します。
ベストプラクティス
- 本番運用でプライベート プレビュー機能を使用しないでください。
- 本番運用環境で変更を加える前に、変更をテストしてください。
- テスト パイプラインを作成します (理想的には、下位の環境で)。これが不可能な場合は、テストに別のカタログとスキーマを使用してみてください。
- エラーを再現します。
- 変更を適用します。
- 結果を検証し、go/no-go の決定を下します。
- 本番運用 パイプラインへの変更をロールアウトします。