Environment version compatibility
Environment versions for SDP are in Beta.
Pipelines with an environment version set run Python code through Spark Connect. This page covers what is incompatible, what behaves differently, how to scan a pipeline for affected patterns, and how to migrate an existing pipeline.
Limitations
Environment versions are not yet compatible with all pipeline functionality. A pipeline run with an environment version set fails if the pipeline's Python code does any of the following:
- Mutates Spark session state inside a function decorated with a pipelines decorator. Examples include `spark.conf.set(...)`, `spark.sql("USE CATALOG ...")`, and `createOrReplaceTempView`.
- Uses PySpark APIs that are unavailable in Spark Connect, including `SparkContext`, `RDD`, `SQLContext`, and any Py4J APIs. See What is supported in Spark Connect.
If enabling an environment version on a pipeline causes it to fail, disabling the environment version returns the pipeline to its previous state.
Behavior changes
Spark Connect has a small number of behavior differences from the classic PySpark runtime. See Spark Connect vs. classic Spark for the full reference. The Compatibility scan detects these patterns ahead of time and blocks enablement until they are addressed, so you can find and fix them before they affect production data.
In a pipeline, the most common situations where behavior may differ are:
Interleaved DataFrame construction and session mutation
When a pipeline constructs a DataFrame, then mutates Spark session state (for example, changes the default catalog or schema, sets a config, replaces a temp view, or re-registers a UDF), then uses the DataFrame:
- Without an environment version, the DataFrame uses the pre-mutation session state.
- With an environment version, the DataFrame uses the post-mutation session state.
For example:
```python
from pyspark import pipelines as dp

spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
    .createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")

spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
    .createOrReplaceTempView("my_view")

@dp.materialized_view
def mytable():
    return df
```
Without an environment version, mytable contains [(1, "Original Row")]. With an environment version, mytable contains [(2, "Replaced Row")].
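The difference can be illustrated without Spark. A classic-runtime plan behaves like a snapshot taken when the DataFrame is constructed, while a Spark Connect plan behaves like a deferred lookup that resolves names at execution time. A minimal pure-Python sketch of the two behaviors (the `views` registry and the function names are illustrative, not Spark APIs):

```python
# Pure-Python analogue of the two resolution behaviors; "views" stands in
# for the session's temp-view registry (illustrative, not a Spark API).
views = {}
views["my_view"] = [(1, "Original Row")]

# Without an environment version: the plan is analyzed eagerly, so it
# effectively snapshots the view contents at construction time.
snapshot = list(views["my_view"])

# With an environment version (Spark Connect): the plan keeps only the
# view name and resolves it when the query executes.
def deferred():
    return views["my_view"]

views["my_view"] = [(2, "Replaced Row")]  # mutation between construction and use

print(snapshot)    # [(1, 'Original Row')]  -> classic behavior
print(deferred())  # [(2, 'Replaced Row')]  -> environment version behavior
```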
UDFs that reference mutable Python state
When a UDF references a Python global variable whose value changes after the UDF is defined:
- Without an environment version, the UDF uses the latest value of the variable.
- With an environment version, the UDF uses the value at the time the UDF was defined.
For example:
```python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf

suffix = "_a"

@udf
def my_udf(s):
    return s + suffix

suffix = "_b"

@dp.materialized_view
def my_mv():
    return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
```
Without an environment version, my_mv contains [("alex_b",)]. With an environment version, my_mv contains [("alex_a",)].
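The same capture semantics can be seen in plain Python, independent of Spark. Reading the global at call time mirrors the classic runtime, while binding the value at definition time (here via a default argument) mirrors what happens when the UDF is serialized for Spark Connect. The function names below are illustrative, not Spark APIs:

```python
suffix = "_a"

def late_binding(s):
    # Reads the global when called: analogous to the classic runtime.
    return s + suffix

def bound_at_definition(s, _suffix=suffix):
    # Captures the value when defined: analogous to Spark Connect, which
    # serializes the UDF (and its captured state) at definition time.
    return s + _suffix

suffix = "_b"

print(late_binding("alex"))         # alex_b -> classic behavior
print(bound_at_definition("alex"))  # alex_a -> environment version behavior
```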
If a pipeline relies on either pattern, audit it before enabling an environment version.
Compatibility scan
The compatibility scan helps you find code patterns in your pipeline that would produce different results under an environment version, before you enable one. The scan is opt-in. When the scan is enabled on a pipeline:
- Each pipeline run emits one `BehaviorChangeInSparkConnect` WARN event in the pipeline event log per detected pattern.
- You cannot enable an environment version on the pipeline until you address all compatibility warnings from the previous successful update.
If the scan is not enabled, no events are emitted and environment_version enablement is not blocked. Databricks recommends enabling the scan and resolving any detected patterns before enabling an environment version on the pipeline.
Enable the scan on a pipeline
You can enable the compatibility scan by adding the `pipelines.environmentVersion.enableCompatibilityScan` pipeline configuration. You can add the configuration through the pipeline editor UI or by adding an entry to the pipeline configuration JSON.
Through the UI:
- From the pipeline editor, click Settings.
- Find the Configuration section in pipeline settings.
- Click Add configuration.
- Enter `pipelines.environmentVersion.enableCompatibilityScan` as the key and `true` as the value.
- Save the pipeline settings.
In the pipeline JSON:
Add the following entry to the configuration block:
"configuration": {
"pipelines.environmentVersion.enableCompatibilityScan": "true"
}
Recommended workflow
- Enable the scan on the pipeline.
- Trigger a pipeline run.
- Query the pipeline event log for `BehaviorChangeInSparkConnect` WARN events. See Compatibility events reference for the full list of issue codes, example patterns, and suggested fixes.
- Update the pipeline code to remove the detected patterns and run the pipeline again until no more events are emitted.
- Add `environment_version` to the pipeline using one of the methods in Enable an environment version on a pipeline.
If you believe a compatibility warning is a false positive and want to enable environment_version anyway, remove the pipelines.environmentVersion.enableCompatibilityScan entry from the pipeline configuration to bypass the check. (Setting the value to false is not allowed — you must remove the entry entirely.)
The preflight check does not run on pipelines that have no previous update, or on pipelines that already have an environment version set.
Migrate an existing pipeline to environment versions
To migrate an existing pipeline that does not yet use an environment version, follow this end-to-end workflow. It walks you through finding code patterns that may behave differently under Spark Connect, fixing them, and rolling out the environment version safely.
1. Enable the compatibility scan on the pipeline. Enable the scan as described in Compatibility scan. This surfaces detected patterns in the event log and turns on the preflight check that guards your enablement attempt.
2. Trigger a pipeline run and review compatibility events. Trigger a normal pipeline update. After it completes successfully, query the pipeline event log for `BehaviorChangeInSparkConnect` WARN events. Each event reports one detected pattern. See Compatibility events reference for the full list of issue codes, example patterns, and suggested fixes.
3. Update your pipeline code to address detected patterns. For each detected pattern, update your pipeline code following the suggested fix. After each change, trigger another pipeline update and verify that the corresponding events no longer appear. Repeat until the event log surfaces no compatibility events for a successful update.
4. Enable the environment version on the pipeline. After the most recent successful update has no compatibility events, add `environment_version` to the pipeline using the UI, API, or bundle as described in Enable an environment version on a pipeline. The next update runs with Spark Connect and the pinned Python language version and preinstalled libraries. If the update fails because compatibility warnings still exist, remove the `environment_version`, return to step 2, and resolve the remaining warnings before trying again.
5. Verify the migration. After the first update with the environment version completes, verify:
   - The `create_update` event in the event log shows `environment_version` set to the expected value.
   - The pipeline produces the expected data and no new error events appear.
   - Spot-check downstream tables for any subtle behavior differences described in Behavior changes.
Rollback
If the pipeline misbehaves after migration, remove the environment_version from the pipeline settings. The next update runs with the previous Python runtime configuration. Use the rolled-back run to debug, then repeat the migration from step 2 after you've identified and fixed the issue.
Compatibility events reference
When the compatibility scan is enabled on a pipeline, SDP emits one BehaviorChangeInSparkConnect WARN event in the pipeline event log per detected pattern. When the scan is enabled and the previous successful update detected any patterns, SDP also blocks environment_version enablement until the patterns are addressed.
Each event reports a single issue code that identifies what was detected. To look up a code, find it in the Issue codes table — each row links to the category section that contains an example pattern and the suggested fix.
Event shape
BehaviorChangeInSparkConnect events follow the standard pipeline event log schema:
- `event_type` is `behavior_change_in_spark_connect`.
- `level` is `WARN`.
- `details` contains the `behavior_change_in_spark_connect` object, which has a single `issue` field. The issue value is one of the codes listed below.
- `message` is a human-readable description of the detected pattern.
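Put together, a compatibility event has roughly the following shape. The `issue` value shown is a placeholder, and the `message` text is illustrative; actual codes and messages come from the table below:

```json
{
  "event_type": "behavior_change_in_spark_connect",
  "level": "WARN",
  "message": "A temporary view was replaced after a DataFrame referencing it was created.",
  "details": {
    "behavior_change_in_spark_connect": {
      "issue": "<issue-code>"
    }
  }
}
```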
Issue codes
| Category | Issue code | Description |
|---|---|---|
| Database and catalog mutations | | The default catalog was changed after a DataFrame was created. The existing DataFrame may resolve tables using the new default catalog. |
| Database and catalog mutations | | The default database was changed after a DataFrame was created. The existing DataFrame may resolve tables using the new default database. |
| Eager execution within flow functions | | The flow function calls a checkpoint command. |
| Eager execution within flow functions | | The flow function eagerly creates a DataFrame view. |
| Eager execution within flow functions | | The flow function creates a resource profile. |
| Eager execution within flow functions | | The flow function calls … |
| Eager execution within flow functions | | The flow function performs an eager … |
| Eager execution within flow functions | | The flow function performs an eager Spark ML operation. |
| Eager execution within flow functions | | The flow function registers a Python data source. |
| Eager execution within flow functions | | The flow function operates on an active streaming query handle. |
| Eager execution within flow functions | | The flow function registers or removes a streaming query listener. |
| Eager execution within flow functions | | The flow function calls … |
| Eager execution within flow functions | | The flow function performs an eager … |
| Eager execution within flow functions | | The flow function performs an eager … |
| Eager execution within flow functions | | The flow function starts a streaming query. |
| Temporary view replacements | | A global temporary view was replaced after a DataFrame referencing it was created. The replacement may be reflected in the existing DataFrame. |
| Temporary view replacements | | A temporary view was replaced after a DataFrame referencing it was created. The replacement may be reflected in the existing DataFrame. |
| UDF and UDTF mutations | | A UDF was re-registered with the same name after a DataFrame referencing it was created. The existing DataFrame may use the new UDF definition. |
| UDF and UDTF mutations | | A UDTF was re-registered with the same name after a DataFrame referencing it was created. The existing DataFrame may use the new UDTF definition. |
| UDF and UDTF mutations | | A UDF references a global mutable Python variable. With an environment version, the UDF uses the value of the variable at the time the UDF was defined, not at invocation time. |
| UDF and UDTF mutations | | A UDTF references a global mutable Python variable. With an environment version, the UDTF uses the value of the variable at the time the UDTF was defined, not at invocation time. |
Database and catalog mutations
These issues are emitted when pipeline code mutates the default database or catalog. With an environment version, DataFrames constructed before the mutation may resolve tables using the new database or catalog.
Example pattern that triggers an event:
```python
from pyspark import pipelines as dp

spark.sql("USE CATALOG marketing")
df = spark.read.table("events")
spark.sql("USE CATALOG sales")  # changes the default catalog after df was created

@dp.materialized_view
def events_summary():
    return df.groupBy("region").count()
```
Without an environment version, df resolves events from the marketing catalog. With an environment version, df resolves events from the sales catalog.
Suggested fix: Fully qualify table names so resolution does not depend on the default catalog or database, and avoid changing the default catalog or database between DataFrame creation and use.
```python
from pyspark import pipelines as dp

df = spark.read.table("marketing.default.events")

@dp.materialized_view
def events_summary():
    return df.groupBy("region").count()
```
Spark configuration mutations
These issues are emitted when pipeline code mutates Spark configuration in ways that can change DataFrame behavior under an environment version.
Example pattern that triggers an event:
```python
from pyspark import pipelines as dp

df = spark.read.table("events")
spark.conf.set("spark.sql.ansi.enabled", "true")  # changes session conf after df was created

@dp.materialized_view
def events_strict():
    return df.selectExpr("CAST(price AS INT) AS price")
```
Without an environment version, the cast uses the conf value at DataFrame creation time. With an environment version, the cast uses spark.sql.ansi.enabled=true and may fail on invalid input.
Suggested fix: Set all required Spark configurations at the top of the pipeline file, before any DataFrame is created. For per-query configuration, use the pipeline's configuration setting in the pipeline spec.
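For example, rather than calling spark.conf.set(...) in pipeline code, the ANSI setting from the example above can be supplied through the pipeline's configuration block, following the same pattern as the scan setting:

```json
"configuration": {
  "spark.sql.ansi.enabled": "true"
}
```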
Temporary view replacements
These issues are emitted when pipeline code replaces a temporary view after a DataFrame referencing it was created. With an environment version, the existing DataFrame may reflect the new view contents.
Example pattern that triggers an event:
```python
from pyspark import pipelines as dp

spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
    .createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")

spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
    .createOrReplaceTempView("my_view")

@dp.materialized_view
def mytable():
    return df
```
Without an environment version, mytable contains [(1, "Original Row")]. With an environment version, mytable contains [(2, "Replaced Row")].
Suggested fix: Create each temporary view a single time and do not replace it. If you need multiple views with related data, give each a distinct name.
UDF and UDTF mutations
These issues are emitted when pipeline code mutates a UDF or UDTF in ways that change behavior under an environment version.
Example pattern that triggers an event:
```python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf

suffix = "_a"

@udf
def my_udf(s):
    return s + suffix

suffix = "_b"

@dp.materialized_view
def my_mv():
    return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
```
Without an environment version, my_mv contains [("alex_b",)]. With an environment version, my_mv contains [("alex_a",)].
Suggested fix: Pass values into the UDF as arguments instead of capturing them from Python globals, or set the global before defining the UDF and do not mutate it afterward.
```python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, lit, udf

@udf
def append_suffix(s, suffix):
    return s + suffix

@dp.materialized_view
def my_mv():
    return spark.createDataFrame([("alex",)], ["name"]).select(append_suffix(col("name"), lit("b")))
```
Eager execution within flow functions
These issues are emitted when pipeline code performs an eager Spark command inside a function decorated by a pipelines decorator (@table, @materialized_view, etc.). Flow functions are expected to define and return a DataFrame; eager commands that write data, manage streaming queries, register resources, or run ML operations are not allowed inside a flow function with an environment version set.
Suggested fix: Move the eager operation outside the flow function and return a DataFrame from the flow function instead. Side-effects such as writing to a table or starting a streaming query belong outside the pipeline definition; the pipeline engine handles materialization of the DataFrame returned by the flow function.
Find compatibility events in the event log
The following query returns all compatibility events for a pipeline, ordered most recent first:
```sql
SELECT
  timestamp,
  message,
  details:behavior_change_in_spark_connect:issue AS issue
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
  AND level = 'WARN'
ORDER BY timestamp DESC;
```
To count events by issue code across recent updates:
```sql
SELECT
  details:behavior_change_in_spark_connect:issue AS issue,
  COUNT(*) AS occurrences
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
  AND level = 'WARN'
GROUP BY 1
ORDER BY occurrences DESC;
```
For how to query the event log, see Query the event log.
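If you process event rows programmatically rather than in SQL, the same tally can be computed client-side. A minimal sketch, assuming rows have already been fetched into Python dictionaries matching the event shape described above (the fetch itself, and the issue codes shown, are placeholders):

```python
from collections import Counter

def issue_counts(events):
    """Count compatibility warnings by issue code.

    `events` is assumed to be a list of event-log rows already fetched into
    dicts with the shape described in "Event shape" above.
    """
    return Counter(
        e["details"]["behavior_change_in_spark_connect"]["issue"]
        for e in events
        if e.get("event_type") == "behavior_change_in_spark_connect"
        and e.get("level") == "WARN"
    )

# Illustrative rows with a placeholder issue code:
sample = [
    {"event_type": "behavior_change_in_spark_connect", "level": "WARN",
     "details": {"behavior_change_in_spark_connect": {"issue": "TEMP_VIEW_REPLACED"}}},
    {"event_type": "behavior_change_in_spark_connect", "level": "WARN",
     "details": {"behavior_change_in_spark_connect": {"issue": "TEMP_VIEW_REPLACED"}}},
    {"event_type": "create_update", "level": "INFO", "details": {}},
]

print(issue_counts(sample))  # Counter({'TEMP_VIEW_REPLACED': 2})
```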
See also
- Configure environment versions for pipelines — feature overview, how to enable an environment version.
- Pipeline event log schema — full pipeline event log schema.
- Pipeline event log — how to query the pipeline event log.