環境バージョンの互換性
ベータ版
SDPの環境バージョンはベータ版です。
環境バージョンが設定されたパイプラインは、 Spark Connectを介して Python コードを実行します。このページでは、互換性のない部分、動作が異なる部分、影響を受けるパターンをパイプラインでスキャンする方法、および既存のパイプラインを移行する方法について説明します。
制限事項
環境バージョンは、パイプラインのすべての機能とまだ互換性があるわけではありません。環境バージョンを設定したパイプラインの実行は、パイプラインのPythonコードが以下のいずれかに該当する場合に失敗します。
- パイプラインデコレータで装飾された関数内で、Sparkセッションの状態を変更します。例としては、
spark.conf.set(...)、spark.sql("USE CATALOG ...")、createOrReplaceTempViewなどがあります。 - Spark Connect では利用できないPySpark APIs使用します。これには
SparkContext、RDD、SQLContext、および Py4J APIs含まれます。 Spark Connectでサポートされている機能については、こちらをご覧ください。
パイプライン上で環境バージョンを有効にした結果、パイプラインが失敗した場合、その環境バージョンを無効にすると、パイプラインは以前の状態に戻ります。
行動の変化
Spark Connect には、従来のPySparkランタイムとの動作の違いがいくつかあります。 詳細については、 「Spark Connectと従来のSparkの比較」を参照してください。互換性スキャンはこれらのパターンを事前に検出し、対処されるまで有効化をブロックするため、本番運用データに影響が出る前にそれらを見つけて修正できます。
パイプラインにおいて、動作が異なる可能性のある最も一般的な状況は次のとおりです。
インターリーブされたDataFrame構築とセッションの変更
パイプラインがDataFrame構築すると、 Sparkセッション状態が変更され (たとえば、暫定カタログまたはスキーマの変更、構成の設定、一時ビューの置き換え、またはUDF再登録)、 DataFrameが使用されます。
- 環境バージョンが指定されていない場合、DataFrameは 変更前の セッション状態を使用します。
- 環境バージョンを使用すると、DataFrame は 変更後の セッション状態を使用します。
例えば:
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
環境バージョンがない場合、 mytableには[(1, "Original Row")]が含まれます。環境バージョンでは、 mytableには[(2, "Replaced Row")]が含まれます。
可変なPython状態を参照するUDF
UDFが、UDFの定義後に値が変化するPythonグローバル変数を参照する場合:
- 環境バージョンが指定されていない場合、UDFは変数の 最新の 値を使用します。
- 環境バージョンを使用すると、UDF は UDF が定義された時点の値 を使用します。
例えば:
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
環境バージョンがない場合、 my_mvには[("alex_b",)]が含まれます。環境バージョンでは、 my_mvには[("alex_a",)]が含まれます。
パイプラインがどちらかのパターンに依存している場合は、環境バージョンを有効にする前に監査を実施してください。
互換性スキャン
互換性スキャンは、環境バージョンを有効にする前に、パイプライン内のコードパターンの中から、環境バージョンによって異なる結果が生じる可能性のあるパターンを見つけるのに役立ちます。スキャンはオプトイン方式です。パイプラインでスキャンが有効になっている場合:
- 各パイプライン実行では、検出されたパターンごとにパイプラインイベントログに1つの
BehaviorChangeInSparkConnectWARNイベントが出力されます。 - 前回の正常なアップデートで発生した互換性に関する警告をすべて解決するまで、パイプライン上で環境バージョンを有効にすることはできません。
スキャンが有効になっていない場合、イベントは発生せず、 environment_version有効化もブロックされません。Databricksは、パイプライン上で環境バージョンを有効にする前に、スキャンを有効にして検出されたパターンを解決することを推奨します。
パイプラインでスキャンを有効にする
互換性スキャンを有効にするには、 pipelines.environmentVersion.enableCompatibilityScanパイプライン構成を追加します。構成は、パイプラインエディタの UI を介して、またはパイプライン構成 JSON にエントリを追加することによって追加できます。
UIを通して :
- パイプラインエディタから、 [設定] をクリックします。
- パイプライン設定の 「構成」 セクションを探してください。
- クリック
設定を追加します 。
- キーとして
pipelines.environmentVersion.enableCompatibilityScan、値としてtrueを入力してください。 - パイプライン設定を保存します。
パイプライン内のJSON :
configurationブロックに以下のエントリを追加してください。
"configuration": {
"pipelines.environmentVersion.enableCompatibilityScan": "true"
}
推奨されるワークフロー
- パイプラインのスキャンを有効にします。
- パイプラインの実行をトリガーします。
- パイプラインイベントログから
BehaviorChangeInSparkConnectWARNイベントを照会します。互換性イベントのリファレンスを参照して、問題コード、例となるパターン、および推奨される修正方法の完全なリストを確認してください。 - 検出されたパターンを削除するようにパイプラインコードを更新し、イベントが発生しなくなるまでパイプラインを再度実行してください。
- パイプラインで環境バージョンを有効にする方法のいずれかを使用して、パイプラインに
environment_versionを追加します。
互換性警告が誤検出であると判断し、それでもenvironment_versionを有効にしたい場合は、パイプライン構成からpipelines.environmentVersion.enableCompatibilityScanエントリを削除してチェックをバイパスしてください。(値をfalseに設定することはできません。エントリを完全に削除する必要があります。)
事前チェックは、過去に更新されていないパイプライン、または既に環境バージョンが設定されているパイプラインでは実行されません。
既存のパイプラインを環境バージョンに移行する
環境バージョンをまだ使用していない既存のパイプラインを移行するには、このエンドツーエンドのワークフローに従ってください。この手順では、Spark Connect環境で異なる動作をする可能性のあるコードパターンを見つけ出し、それらを修正し、環境バージョンを安全に展開する方法について説明します。
-
パイプラインで互換性スキャンを有効にします。 互換性スキャンの説明に従って、パイプラインでスキャンを有効にします。これにより、検出されたパターンがイベントログに表示され、有効化の試みを保護する事前チェックが実行されます。
-
パイプラインの実行をトリガーし、互換性イベントを確認します。 通常のパイプライン更新をトリガーします。処理が正常に完了したら、パイプラインイベントログから
BehaviorChangeInSparkConnectWARNイベントを照会します。各イベントは検出されたパターンを1つ報告する。互換性イベントのリファレンスを参照して、問題コード、例となるパターン、および推奨される修正方法の完全なリストを確認してください。 -
検出されたパターンに対応するため、パイプラインコードを更新してください。 検出されたパターンごとに、推奨される修正方法に従ってパイプラインコードを更新してください。変更を加えるたびに、パイプラインの更新を再度実行し、対応するイベントが表示されなくなったことを確認してください。イベントログにアップデート成功を示す互換性イベントが表示されなくなるまで、この手順を繰り返します。
-
パイプライン上で環境バージョンを有効にしてください。 最新の更新が成功し、互換性イベントが発生しなかった場合は、 「パイプラインで環境バージョンを有効にする」で説明されているように、UI、API、または
environment_versionを使用してパイプラインにを追加します。次回の更新は、 Spark Connect とピン留めPython言語バージョンおよびプリインストールされたライブラリを使用して実行されます。互換性に関する警告が残っているためにアップデートが失敗した場合は、
environment_versionを削除し、 2 に戻り、残りの警告を解決してから再度試してください。 -
移行を検証する。 環境バージョンによる最初のアップデートが完了したら、以下を確認してください。
- イベントログの
create_updateイベントは、environment_versionが期待値に設定されていることを示しています。 - パイプラインは期待通りのデータを生成し、新たなエラーイベントは発生しない。
- 下流のテーブルを抜き打ちでチェックして、 「動作の変化」で説明されているような微妙な動作の違いがないか確認してください。
- イベントログの
ロールバック
移行後にパイプラインが正常に動作しない場合は、パイプライン設定からenvironment_versionを削除してください。次回の更新は、以前のPythonランタイム構成で実行されます。 ロールバックした実行を使用してデバッグし、問題を特定して修正した後、ステップ 2 から移行を繰り返します。
互換性イベントのリファレンス
パイプラインで互換性スキャンが有効になっている場合、SDP は検出されたパターンごとにパイプラインイベントログに 1 つのBehaviorChangeInSparkConnect WARNイベントを出力します。スキャンが有効になっており、前回の正常な更新でパターンが検出された場合、SDP はパターンが対処されるまでenvironment_version有効化もブロックします。
各イベントは、検出された内容を識別する単一の問題コードを報告します。コードを調べるには、 「問題コード」表で該当のコードを探してください。各行は、例となるパターンと推奨される修正方法が記載されているカテゴリセクションへのリンクになっています。
イベントの形状
BehaviorChangeInSparkConnect イベントは標準のパイプラインイベントログスキーマに従います。
event_typebehavior_change_in_spark_connectです。levelWARNです。detailsbehavior_change_in_spark_connectオブジェクトが含まれており、そのオブジェクトには単一のissueフィールドがあります。問題値は、以下のコードのいずれかです。message検出されたパターンを人間が読みやすい形で記述したものです。
発行コード
カテゴリー | 問題コード | 説明 |
|---|---|---|
| DataFrameの作成後にカタログが変更されました。 既存のDataFrameは、新しいデフォルトカタログを使用してテーブルを解決する可能性があります。 | |
|
| |
| DataFrameが作成された後、デフォルトのデータベースが変更されました。既存のDataFrameは、新しいデフォルトデータベースを使用してテーブルを解決する可能性があります。 | |
|
| |
| フロー関数はチェックポイントコマンドを呼び出します。 | |
| フロー関数は、DataFrameビュー( | |
| フロー機能はリソースプロファイルを作成します。 | |
| フロー関数は、 | |
| フロー関数は、対象テーブルに対してEager | |
| フロー関数は、Spark MLの即時実行操作を実行します。 | |
| フロー関数はPythonデータソースを登録します。 | |
| フロー機能は、アクティブなストリーミングクエリハンドル上で動作します。 | |
| フロー関数は、ストリーミング クエリ リスナーを登録または削除します。 | |
| フロー関数は、ストリーミングクエリを管理するために | |
| フロー関数は、即時 | |
| フロー関数は、即時 | |
| フロー関数はストリーミングクエリ( | |
|
| |
|
| |
|
| |
| グローバルな一時ビューは、それを参照するDataFrameが作成された後、置き換えられました。 置換は既存のDataFrameに反映される可能性があります。 | |
| 一時ビューは、それを参照するDataFrameが作成された後、置き換えられました。 置換は既存のDataFrameに反映される可能性があります。 | |
| 参照するDataFrameが作成された後、 UDF同じ名前で再登録されました。 既存のDataFrameは、新しいUDF定義を使用できる可能性があります。 | |
| UDTFを参照するDataFrameが作成された後、同じ名前でUDTFが再登録されました。既存のDataFrameは、新しいUDTF定義を使用できる。 | |
| UDFは、グローバルな可変Python変数を参照します。環境バージョンを使用する場合、UDFは呼び出し時ではなく、UDFが定義された時点の変数の値を使用します。 | |
| UDTFは、グローバルな可変Python変数を参照します。環境バージョンを使用する場合、UDTFは呼び出し時ではなく、UDTFが定義された時点での変数の値を使用します。 |
データベースとカタログの変更
これらの問題は、パイプラインコードがデフォルトのデータベースまたはカタログを変更した場合に発生します。環境バージョンを使用すると、変更前に構築されたDataFrames 、新しいデータベースまたはカタログを使用してテーブルを解決する可能性があります。
イベントをトリガーするパターンの例:
from pyspark import pipelines as dp
spark.sql("USE CATALOG marketing")
df = spark.read.table("events")
spark.sql("USE CATALOG sales") # changes the default catalog after df was created
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
環境バージョンがない場合、 df marketingカタログからeventsを解決します。環境バージョンを使用すると、 df salesカタログからeventsを解決します。
推奨される修正方法: テーブル名を完全に修飾して、解決がデフォルトのカタログやデータベースに依存しないようにし、DataFrame の作成と使用の間でデフォルトのカタログやデータベースを変更しないようにしてください。
from pyspark import pipelines as dp
df = spark.read.table("marketing.default.events")
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
Spark構成の変更
これらの問題は、パイプラインコードがSpark構成を変更し、環境バージョンによってDataFrameの動作が変わる可能性がある場合に発生します。
イベントをトリガーするパターンの例:
from pyspark import pipelines as dp
df = spark.read.table("events")
spark.conf.set("spark.sql.ansi.enabled", "true") # changes session conf after df was created
@dp.materialized_view
def events_strict():
return df.selectExpr("CAST(price AS INT) AS price")
環境バージョンが指定されていない場合、キャストはDataFrame作成時の設定値を使用します。 環境バージョンでは、キャストはspark.sql.ansi.enabled=trueを使用し、無効な入力に対して失敗する可能性があります。
推奨される修正方法: パイプラインファイルの先頭、つまりDataFrameが作成される前に、必要なSpark設定をすべて設定してください。クエリごとの設定には、パイプライン仕様のパイプラインのconfiguration設定を使用します。
一時的な表示代替
これらの問題は、一時ビューを参照するDataFrameが作成された後に、パイプライン コードが一時ビューを置き換えるときに発生します。 環境バージョンによっては、既存のDataFrameに新しいビューの内容が反映される場合があります。
イベントをトリガーするパターンの例:
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
環境バージョンがない場合、 mytableには[(1, "Original Row")]が含まれます。環境バージョンでは、 mytableには[(2, "Replaced Row")]が含まれます。
推奨される解決策: 各一時ビューを一度だけ作成し、置き換えないようにしてください。関連データを含む複数のビューが必要な場合は、それぞれに異なる名前を付けてください。
UDFおよびUDTF変異
これらの問題は、パイプラインコードがUDFまたはUDTFを変更し、環境バージョンによって動作が変わる場合に発生します。
イベントをトリガーするパターンの例:
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
環境バージョンがない場合、 my_mvには[("alex_b",)]が含まれます。環境バージョンでは、 my_mvには[("alex_a",)]が含まれます。
修正案: Pythonのグローバル変数から値を取得するのではなく、UDFに引数として値を渡すか、UDFを定義する前にグローバル変数を設定し、その後変更しないようにしてください。
from pyspark import pipelines as dp
from pyspark.sql.functions import col, lit, udf
@udf
def append_suffix(s, suffix):
return s + suffix
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(append_suffix(col("name"), lit("b")))
フロー関数内での積極的な実行
これらの問題は、パイプライン コードがパイプライン デコレーター ( @table 、 @materialized_viewなど) によって装飾された関数内で Eager Sparkコマンドを実行するときに発生します。 フロー関数はDataFrameを定義して返すことが期待されています。環境バージョンが設定されているフロー関数内では、データの書き込み、ストリーミング クエリの管理、登録実行するリソース、またはML操作を行う Eerger コマンドは許可されません。
修正案: Eager操作をフロー関数の外に移動し、フロー関数からは代わりにDataFrameを返すようにします。テーブルへの書き込みやストリーミング クエリの開始などの副作用は、パイプライン定義の範囲外に属します。パイプライン エンジンは、フロー関数によって返されたDataFrameの具体化を処理します。
イベントログで互換性イベントを検索します
次のクエリは、パイプラインのすべての互換性イベントを、最新のものから順に返します。
SELECT
timestamp,
message,
details:behavior_change_in_spark_connect:issue AS issue
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
ORDER BY timestamp DESC;
最近のアップデートにおける問題コード別のイベント数をカウントするには:
SELECT
details:behavior_change_in_spark_connect:issue AS issue,
COUNT(*) AS occurrences
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
GROUP BY 1
ORDER BY occurrences DESC;
イベントログのクエリ方法については、 「イベントログのクエリ」を参照してください。
関連項目
- パイプラインの環境バージョンを設定する— 機能の概要、環境バージョンの有効化方法。
- パイプライン イベント ログ スキーマ— 完全なパイプライン イベント ログ スキーマ。
- パイプライン イベント ログ — パイプラインイベント ログをクエリする方法。