Environment version compatibility

Beta

Environment versions for SDP are in Beta.

Pipelines with an environment version set run Python code through Spark Connect. This page covers what is incompatible, what behaves differently, how to scan a pipeline for affected patterns, and how to migrate an existing pipeline.

Limitations

Environment versions are not yet compatible with all pipeline functionality. A pipeline run with an environment version set fails if the pipeline's Python code does any of the following:

  • Mutates Spark session state inside a function decorated with a pipelines decorator. Examples include spark.conf.set(...), spark.sql("USE CATALOG ..."), and createOrReplaceTempView; see the sketch after this list.
  • Uses PySpark APIs that are unavailable in Spark Connect, including SparkContext, RDD, SQLContext, and any Py4J APIs. See What is supported in Spark Connect.
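
For example, the following minimal sketch fails when an environment version is set, because it mutates session state inside a decorated function (the config key and table name are illustrative):

Python
from pyspark import pipelines as dp

@dp.materialized_view
def my_table():
    # Mutating Spark session state inside a pipelines-decorated function is
    # not supported with an environment version and fails the run.
    spark.conf.set("spark.sql.shuffle.partitions", "8")
    return spark.read.table("events")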

If enabling an environment version on a pipeline causes it to fail, disabling the environment version returns the pipeline to its previous state.

Behavior changes

Spark Connect has a small number of behavior differences from the classic PySpark runtime. See Spark Connect vs. classic Spark for the full reference. The compatibility scan detects these patterns ahead of time and blocks enablement until they are addressed, so you can find and fix them before they affect production data.

In a pipeline, the most common situations where behavior may differ are:

Interleaved DataFrame construction and session mutation

When a pipeline constructs a DataFrame, then mutates Spark session state (for example, changes the default catalog or schema, sets a config, replaces a temp view, or re-registers a UDF), then uses the DataFrame:

  • Without an environment version, the DataFrame uses the pre-mutation session state.
  • With an environment version, the DataFrame uses the post-mutation session state.

For example:

Python
from pyspark import pipelines as dp

spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
    .createOrReplaceTempView("my_view")

df = spark.sql("SELECT * FROM my_view")

spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
    .createOrReplaceTempView("my_view")

@dp.materialized_view
def mytable():
    return df

Without an environment version, mytable contains [(1, "Original Row")]. With an environment version, mytable contains [(2, "Replaced Row")].

UDFs that reference mutable Python state

When a UDF references a Python global variable whose value changes after the UDF is defined:

  • Without an environment version, the UDF uses the latest value of the variable.
  • With an environment version, the UDF uses the value at the time the UDF was defined.

For example:

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf

suffix = "a"

@udf
def my_udf(s):
    return s + suffix

suffix = "b"

@dp.materialized_view
def my_mv():
    return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))

Without an environment version, my_mv contains [("alex_b",)]. With an environment version, my_mv contains [("alex_a",)].

If a pipeline relies on either pattern, audit it before enabling an environment version.

Compatibility scan

The compatibility scan helps you find code patterns in your pipeline that would produce different results under an environment version, before you enable one. The scan is opt-in. When the scan is enabled on a pipeline:

  • Each pipeline run emits one BehaviorChangeInSparkConnect WARN event in the pipeline event log per detected pattern.
  • You cannot enable an environment version on the pipeline until you address all compatibility warnings from the previous successful update.

If the scan is not enabled, no events are emitted and environment_version enablement is not blocked. Databricks recommends enabling the scan and resolving any detected patterns before enabling an environment version on the pipeline.

Enable the scan on a pipeline

Enable the compatibility scan by setting the pipelines.environmentVersion.enableCompatibilityScan pipeline configuration to true. You can add the configuration through the pipeline editor UI or by adding an entry to the pipeline configuration JSON.

Through the UI:

  1. From the pipeline editor, click Settings.
  2. Find the Configuration section in pipeline settings.
  3. Click Add configuration.
  4. Enter pipelines.environmentVersion.enableCompatibilityScan as the key and true as the value.
  5. Save the pipeline settings.

In the pipeline JSON:

Add the following entry to the configuration block:

JSON
"configuration": {
"pipelines.environmentVersion.enableCompatibilityScan": "true"
}

The overall workflow for using the scan is:

  1. Enable the scan on the pipeline.
  2. Trigger a pipeline run.
  3. Query the pipeline event log for BehaviorChangeInSparkConnect WARN events. See Compatibility events reference for the full list of issue codes, example patterns, and suggested fixes.
  4. Update the pipeline code to remove the detected patterns and run the pipeline again until no more events are emitted.
  5. Add environment_version to the pipeline using one of the methods in Enable an environment version on a pipeline.

If you believe a compatibility warning is a false positive and want to enable environment_version anyway, remove the pipelines.environmentVersion.enableCompatibilityScan entry from the pipeline configuration to bypass the check. (Setting the value to false is not allowed; you must remove the entry entirely.)

The preflight check does not run on pipelines that have no previous update, or on pipelines that already have an environment version set.

Migrate an existing pipeline to environment versions

To migrate an existing pipeline that does not yet use an environment version, follow this end-to-end workflow. It walks you through finding code patterns that may behave differently under Spark Connect, fixing them, and rolling out the environment version safely.

  1. Enable the compatibility scan on the pipeline. Enable the scan on the pipeline as described in Compatibility scan. This is what causes detected patterns to surface in the event log and what enables the preflight check that guards your enablement attempt.

  2. Trigger a pipeline run and review compatibility events. Trigger a normal pipeline update. After it completes successfully, query the pipeline event log for BehaviorChangeInSparkConnect WARN events. Each event reports one detected pattern. See Compatibility events reference for the full list of issue codes, example patterns, and suggested fixes.

  3. Update your pipeline code to address detected patterns. For each detected pattern, update your pipeline code following the suggested fix. After each change, trigger another pipeline update and verify the corresponding events no longer appear. Repeat until the event log no longer surfaces any compatibility events for a successful update.

  4. Enable the environment version on the pipeline. After the most recent successful update has no compatibility events, add environment_version to the pipeline using the UI, API, or bundle as described in Enable an environment version on a pipeline. The next update runs with Spark Connect and the pinned Python language version and preinstalled libraries.

    If the update fails because compatibility warnings still exist, drop the environment_version, return to step 2, and resolve the remaining warnings before trying again.

  5. Verify the migration. After the first update with the environment version completes, verify:

    • The create_update event in the event log shows environment_version set to the expected value.
    • The pipeline produces the expected data and no new error events appear.
    • Spot-check downstream tables for any subtle behavior differences described in Behavior changes.
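
For example, one way to spot-check the first item is to pull the most recent create_update event and inspect its details payload (the exact shape of details can vary, so inspect it rather than assuming a path):

SQL
SELECT timestamp, details
FROM event_log("<pipeline-id>")
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
LIMIT 1;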

Rollback

If the pipeline misbehaves after migration, remove the environment_version from the pipeline settings. The next update runs with the previous Python runtime configuration. Use the rolled-back run to debug, then repeat the migration from step 2 after you've identified and fixed the issue.

Compatibility events reference

When the compatibility scan is enabled on a pipeline, SDP emits one BehaviorChangeInSparkConnect WARN event in the pipeline event log per detected pattern. When the scan is enabled and the previous successful update detected any patterns, SDP also blocks environment_version enablement until the patterns are addressed.

Each event reports a single issue code that identifies what was detected. To look up a code, find it in the Issue codes table; each row links to the category section that contains an example pattern and the suggested fix.

Event shape

BehaviorChangeInSparkConnect events follow the standard pipeline event log schema:

  • event_type is behavior_change_in_spark_connect.
  • level is WARN.
  • details contains the behavior_change_in_spark_connect object, which has a single issue field. The issue value is one of the codes listed below.
  • message is a human-readable description of the detected pattern.
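
Rendered as JSON, a single event might look like the following sketch (the message text is illustrative):

JSON
{
  "event_type": "behavior_change_in_spark_connect",
  "level": "WARN",
  "message": "A temporary view was replaced after a DataFrame referencing it was created.",
  "details": {
    "behavior_change_in_spark_connect": {
      "issue": "REPLACE_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR"
    }
  }
}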

Issue codes

| Category | Issue code | Description |
| --- | --- | --- |
| Database and catalog mutations | USE_CATALOG_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR | The default catalog was changed after a DataFrame was created. The existing DataFrame may resolve tables using the new default catalog. |
| Database and catalog mutations | USE_CATALOG_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR | USE CATALOG was called outside a function decorated by a pipelines decorator. The default catalog may change unexpectedly for subsequent operations. |
| Database and catalog mutations | USE_DATABASE_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR | The default database was changed after a DataFrame was created. The existing DataFrame may resolve tables using the new default database. |
| Database and catalog mutations | USE_DATABASE_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR | USE DATABASE was called outside a function decorated by a pipelines decorator. The default database may change unexpectedly for subsequent operations. |
| Eager execution within flow functions | CHECKPOINT_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED | The flow function calls a checkpoint command. |
| Eager execution within flow functions | CREATE_DATAFRAME_VIEW_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED | The flow function eagerly creates a DataFrame view (createOrReplaceTempView or similar). |
| Eager execution within flow functions | CREATE_RESOURCE_PROFILE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED | The flow function creates a resource profile. |
| Eager execution within flow functions | GET_RESOURCES_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED | The flow function calls spark.resources or a related resource API. |
| Eager execution within flow functions | MERGE_INTO_TABLE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED | The flow function performs an eager MERGE INTO on a target table. |
| Eager execution within flow functions | ML_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED | The flow function performs an eager Spark ML operation. |
| Eager execution within flow functions | REGISTER_DATA_SOURCE_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED | The flow function registers a Python data source. |
| Eager execution within flow functions | STREAMING_QUERY_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED | The flow function operates on an active streaming query handle. |
| Eager execution within flow functions | STREAMING_QUERY_LISTENER_BUS_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED | The flow function registers or removes a streaming query listener. |
| Eager execution within flow functions | STREAMING_QUERY_MANAGER_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED | The flow function calls spark.streams to manage streaming queries. |
| Eager execution within flow functions | WRITE_OPERATION_V2_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED | The flow function performs an eager DataFrameWriterV2 operation. |
| Eager execution within flow functions | WRITE_OPERATION_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED | The flow function performs an eager DataFrame.write operation. |
| Eager execution within flow functions | WRITE_STREAM_OPERATION_START_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED | The flow function starts a streaming query (writeStream.start()). |
| Spark configuration mutations | CHANGE_CONF_INSIDE_QUERY_FUNCTION_NOT_SUPPORTED | spark.conf.set() or spark.conf.unset() was called inside a function decorated by a pipelines decorator. This is not supported with an environment version. |
| Spark configuration mutations | SET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR | spark.conf.set() was called outside a function decorated by a pipelines decorator after a DataFrame was created. The config change may affect the existing DataFrame at execution time. |
| Spark configuration mutations | UNSET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR | spark.conf.unset() was called outside a function decorated by a pipelines decorator after a DataFrame was created. The config change may affect the existing DataFrame at execution time. |
| Temporary view replacements | REPLACE_GLOBAL_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR | A global temporary view was replaced after a DataFrame referencing it was created. The replacement may be reflected in the existing DataFrame. |
| Temporary view replacements | REPLACE_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR | A temporary view was replaced after a DataFrame referencing it was created. The replacement may be reflected in the existing DataFrame. |
| UDF and UDTF mutations | OVERWRITE_SESSION_UDF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR | A UDF was re-registered with the same name after a DataFrame referencing it was created. The existing DataFrame may use the new UDF definition. |
| UDF and UDTF mutations | OVERWRITE_SESSION_UDTF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR | A UDTF was re-registered with the same name after a DataFrame referencing it was created. The existing DataFrame may use the new UDTF definition. |
| UDF and UDTF mutations | UDF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR | A UDF references a global mutable Python variable. With an environment version, the UDF uses the value of the variable at the time the UDF was defined, not at invocation time. |
| UDF and UDTF mutations | UDTF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR | A UDTF references a global mutable Python variable. With an environment version, the UDTF uses the value of the variable at the time the UDTF was defined, not at invocation time. |

Database and catalog mutations

These issues are emitted when pipeline code mutates the default database or catalog. With an environment version, DataFrames constructed before the mutation may resolve tables using the new database or catalog.

Example pattern that triggers an event:

Python
from pyspark import pipelines as dp

spark.sql("USE CATALOG marketing")
df = spark.read.table("events")

spark.sql("USE CATALOG sales") # changes the default catalog after df was created

@dp.materialized_view
def events_summary():
    return df.groupBy("region").count()

Without an environment version, df resolves events from the marketing catalog. With an environment version, df resolves events from the sales catalog.

Suggested fix: Fully qualify table names so resolution does not depend on the default catalog or database, and avoid changing the default catalog or database between DataFrame creation and use.

Python
from pyspark import pipelines as dp

df = spark.read.table("marketing.default.events")

@dp.materialized_view
def events_summary():
    return df.groupBy("region").count()

Spark configuration mutations

These issues are emitted when pipeline code mutates Spark configuration in ways that can change DataFrame behavior under an environment version.

Example pattern that triggers an event:

Python
from pyspark import pipelines as dp

df = spark.read.table("events")

spark.conf.set("spark.sql.ansi.enabled", "true") # changes session conf after df was created

@dp.materialized_view
def events_strict():
    return df.selectExpr("CAST(price AS INT) AS price")

Without an environment version, the cast uses the conf value at DataFrame creation time. With an environment version, the cast uses spark.sql.ansi.enabled=true and may fail on invalid input.

Suggested fix: Set all required Spark configurations at the top of the pipeline file, before any DataFrame is created. For per-query configuration, use the pipeline's configuration setting in the pipeline spec.
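
A minimal sketch of the fixed pattern, reusing the names from the example above:

Python
from pyspark import pipelines as dp

# Set all required session configuration before any DataFrame is created.
spark.conf.set("spark.sql.ansi.enabled", "true")

df = spark.read.table("events")

@dp.materialized_view
def events_strict():
    return df.selectExpr("CAST(price AS INT) AS price")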

Temporary view replacements

These issues are emitted when pipeline code replaces a temporary view after a DataFrame referencing it was created. With an environment version, the existing DataFrame may reflect the new view contents.

Example pattern that triggers an event:

Python
from pyspark import pipelines as dp

spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
    .createOrReplaceTempView("my_view")

df = spark.sql("SELECT * FROM my_view")

spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
    .createOrReplaceTempView("my_view")

@dp.materialized_view
def mytable():
    return df

Without an environment version, mytable contains [(1, "Original Row")]. With an environment version, mytable contains [(2, "Replaced Row")].

Suggested fix: Create each temporary view a single time and do not replace it. If you need multiple views with related data, give each a distinct name.
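
A minimal sketch of the fixed pattern, giving each view a distinct name:

Python
from pyspark import pipelines as dp

# Each temporary view is created exactly once and never replaced.
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
    .createOrReplaceTempView("my_view_original")

spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
    .createOrReplaceTempView("my_view_replaced")

df = spark.sql("SELECT * FROM my_view_replaced")

@dp.materialized_view
def mytable():
    return df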

UDF and UDTF mutations

These issues are emitted when pipeline code mutates a UDF or UDTF in ways that change behavior under an environment version.

Example pattern that triggers an event:

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf

suffix = "a"

@udf
def my_udf(s):
    return s + suffix

suffix = "b"

@dp.materialized_view
def my_mv():
    return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))

Without an environment version, my_mv contains [("alex_b",)]. With an environment version, my_mv contains [("alex_a",)].

Suggested fix: Pass values into the UDF as arguments instead of capturing them from Python globals, or set the global before defining the UDF and do not mutate it afterward.

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, lit, udf

@udf
def append_suffix(s, suffix):
    return s + suffix

@dp.materialized_view
def my_mv():
    return spark.createDataFrame([("alex",)], ["name"]).select(append_suffix(col("name"), lit("b")))

Eager execution within flow functions

These issues are emitted when pipeline code performs an eager Spark command inside a function decorated by a pipelines decorator (@table, @materialized_view, etc.). Flow functions are expected to define and return a DataFrame; eager commands that write data, manage streaming queries, register resources, or run ML operations are not allowed inside a flow function with an environment version set.

Suggested fix: Move the eager operation outside the flow function and return a DataFrame from the flow function instead. Side effects such as writing to a table or starting a streaming query belong outside the pipeline definition; the pipeline engine handles materialization of the DataFrame returned by the flow function. A sketch of the fix follows.
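
For example, a minimal sketch (the table names are hypothetical):

Python
from pyspark import pipelines as dp

# Not allowed with an environment version: an eager write inside the flow
# function (triggers WRITE_OPERATION_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED).
#
# @dp.materialized_view
# def my_table():
#     df = spark.read.table("events")
#     df.write.saveAsTable("audit_copy")  # eager side effect
#     return df

# Fixed: the flow function only defines and returns the DataFrame; the
# pipeline engine materializes it.
@dp.materialized_view
def my_table():
    return spark.read.table("events")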

Find compatibility events in the event log

The following query returns all compatibility events for a pipeline, ordered most recent first:

SQL
SELECT
  timestamp,
  message,
  details:behavior_change_in_spark_connect.issue AS issue
FROM event_log("<pipeline-id>")
WHERE event_type = 'behavior_change_in_spark_connect'
  AND level = 'WARN'
ORDER BY timestamp DESC;

To count events by issue code across recent updates:

SQL
SELECT
  details:behavior_change_in_spark_connect.issue AS issue,
  COUNT(*) AS occurrences
FROM event_log("<pipeline-id>")
WHERE event_type = 'behavior_change_in_spark_connect'
  AND level = 'WARN'
GROUP BY 1
ORDER BY occurrences DESC;

For how to query the event log, see Query the event log.

See also