Compare Spark Connect to Spark Classic

Spark Connect is a gRPC-based protocol within Apache Spark that specifies how a client application can communicate with a remote Spark Server. It allows remote execution of Spark workloads using the DataFrame API.

Spark Connect is used in the following:

  • Scala notebooks with Databricks Runtime version 13.3 and above, on standard and dedicated compute
  • Python notebooks with Databricks Runtime version 14.3 and above, on standard and dedicated compute
  • Serverless compute
  • Databricks Connect
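
For example, a standalone client application can create a Spark Connect session by pointing the session builder at a remote endpoint. This is a minimal sketch for illustration; the connection string below is a placeholder, and in Databricks notebooks, serverless compute, and Databricks Connect the connection is typically configured for you by the environment or tooling.

Python
from pyspark.sql import SparkSession

# Build a client-side session that talks to a remote Spark Connect server over gRPC.
# "sc://localhost:15002" is a placeholder endpoint; replace it with your server's address.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# The DataFrame API works as usual: plans are built locally and executed remotely.
spark.range(5).show()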

While both Spark Connect and Spark Classic use lazy execution for transformations, there are important differences to understand in order to avoid unexpected behavior and performance issues when migrating existing code from Spark Classic to Spark Connect, or when writing code that must work with both.

Lazy vs eager

The key difference between Spark Connect and Spark Classic is that Spark Connect defers analysis and name resolution to execution time, as summarized in the following table.

| Aspect | Spark Classic | Spark Connect |
| --- | --- | --- |
| Query execution | Lazy | Lazy |
| Schema analysis | Eager | Lazy |
| Schema access | Local | Triggers RPC |
| Temporary views | Plan embedded | Name lookup |
| UDF serialization | At creation | At execution |

Query execution

Both Spark Classic and Spark Connect follow the same lazy execution model for query execution.

In Spark Classic, DataFrame transformations (such as filter and limit) are lazy. This means they are not executed immediately, but are recorded in a logical plan. The actual computation is triggered only when an action (such as show(), collect()) is invoked.

Spark Connect follows a similar lazy evaluation model. Transformations are constructed on the client side and sent as unresolved proto plans to the server. The server then performs the necessary analysis and execution when an action is called.
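
As a minimal sketch of this shared lazy model (the column values and limits are arbitrary):

Python
df = spark.range(10)      # Lazy: nothing runs yet
df = df.filter("id > 5")  # Lazy: only the logical plan (or proto plan) grows
df = df.limit(2)          # Still lazy
df.show()                 # Action: triggers analysis and execution on the server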

| Aspect | Spark Classic | Spark Connect |
| --- | --- | --- |
| Transformations: df.filter(...), df.select(...), df.limit(...) | Lazy execution | Lazy execution |
| SQL queries: spark.sql("select …") | Lazy execution | Lazy execution |
| Actions: df.collect(), df.show() | Eager execution | Eager execution |
| SQL commands: spark.sql("insert …"), spark.sql("create …") | Eager execution | Eager execution |

Schema analysis

Spark Classic performs schema analysis eagerly during the logical plan construction phase. When you define transformations, Spark immediately analyzes the DataFrame's schema to ensure all referenced columns and data types are valid. For example, executing spark.sql("select 1 as a, 2 as b").filter("c > 1") will throw an error eagerly, indicating the column c cannot be found.

Spark Connect instead constructs unresolved proto plans during transformations. When a schema is accessed or an action is executed, the client sends the unresolved plan to the server via RPC (remote procedure call), and the server performs the analysis and execution. This design defers schema analysis. For example, spark.sql("select 1 as a, 2 as b").filter("c > 1") does not throw an error immediately, because the unresolved plan exists only on the client; the error is thrown when df.columns or df.show() sends the plan to the server for analysis.
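
The following sketch illustrates the same expression; treat it as illustrative, since the exact exception type depends on your Spark version.

Python
df = spark.sql("select 1 as a, 2 as b").filter("c > 1")  # Fails here in Spark Classic; no error yet in Spark Connect

try:
    df.columns  # In Spark Connect, the analysis RPC happens here and reports that column c cannot be found
except Exception as e:
    print(f"Error: {repr(e)}")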

Unlike query execution, Spark Classic and Spark Connect differ in when schema analysis occurs.

| Aspect | Spark Classic | Spark Connect |
| --- | --- | --- |
| Transformations: df.filter(...), df.select(...), df.limit(...) | Eager | Lazy |
| Schema access: df.columns, df.schema, df.isStreaming | Eager | Eager (triggers an analysis RPC request, unlike Spark Classic) |
| Actions: df.collect(), df.show() | Eager | Eager |
| Dependent session state: UDFs, temporary views, configs | Eager | Lazy (evaluated during execution) |

Best practices

The difference between lazy and eager analysis means there are some best practices to follow to avoid unexpected behavior and performance issues, specifically issues caused by overwriting temporary view names, capturing external variables in UDFs, delayed error detection, and excessive schema access on new DataFrames.

Create unique temporary view names

In Spark Connect, the DataFrame stores only a reference to the temporary view by name. As a result, if the temp view is later replaced, the data in the DataFrame will also change because it looks up the view by name at execution time.

This behavior differs from Spark Classic, where the logical plan of the temp view is embedded into the DataFrame's plan at the time of creation. Any subsequent replacement of the temp view does not affect the original DataFrame.
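
For illustration, here is a sketch of the surprise a reused view name can cause under Spark Connect (the view name and row counts are arbitrary):

Python
def create_temp_view_and_create_dataframe(x):
    spark.range(x).createOrReplaceTempView("temp_view")  # Same name every call
    return spark.table("temp_view")

df10 = create_temp_view_and_create_dataframe(10)
assert len(df10.collect()) == 10

df100 = create_temp_view_and_create_dataframe(100)
assert len(df10.collect()) == 100  # Spark Connect: df10 re-resolves the replaced view by name; Spark Classic would still return 10 rows
assert len(df100.collect()) == 100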

To mitigate the difference, always create unique temporary view names. For example, include a UUID in the view name. This avoids affecting any existing DataFrames that reference a previously registered temp view.

Python
import uuid
def create_temp_view_and_create_dataframe(x):
    temp_view_name = f"`temp_view_{uuid.uuid4()}`"  # Use a random name to avoid conflicts.
    spark.range(x).createOrReplaceTempView(temp_view_name)
    return spark.table(temp_view_name)

df10 = create_temp_view_and_create_dataframe(10)
assert len(df10.collect()) == 10

df100 = create_temp_view_and_create_dataframe(100)
assert len(df10.collect()) == 10 # It works as expected now.
assert len(df100.collect()) == 100

Wrap UDF definitions

In Spark Connect, Python UDFs are lazy. Their serialization and registration are deferred until execution time. In the following example, the UDF is only serialized and uploaded to the Spark cluster for execution when show() is called.

Python
from pyspark.sql.functions import udf

x = 123

@udf("INT")
def foo():
    return x


df = spark.range(1).select(foo())
x = 456
df.show() # Prints 456

This behavior differs from Spark Classic, where UDFs are eagerly created. In Spark Classic, the value of x at the time of UDF creation is captured, so subsequent changes to x do not affect the already-created UDF.

If you need to modify the value of external variables that a UDF depends on, use a function factory (closure with early binding) to correctly capture variable values. Specifically, wrap the UDF creation in a helper function to capture the value of a dependent variable.

Python
from pyspark.sql.functions import udf

def make_udf(value):
    def foo():
        return value
    return udf(foo)


x = 123
foo_udf = make_udf(x)
x = 456
df = spark.range(1).select(foo_udf())
df.show() # Prints 123 as expected

By wrapping the UDF definition inside another function (make_udf), we create a new scope where the current value of x is passed in as an argument. This ensures each generated UDF has its own copy of the value, bound at the time the UDF is created.

Trigger eager analysis for error detection

The following error handling is useful in Spark Classic because its eager analysis allows exceptions to be thrown promptly. In Spark Connect, however, this code does not raise an error, because it only constructs a local unresolved proto plan without triggering any analysis.

Python
from pyspark.sql.functions import col, when

df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])

try:
    df = df.select("name", "age")
    df = df.withColumn(
        "age_group",
        when(col("age") < 18, "minor").otherwise("adult"))
    df = df.filter(col("age_with_typo") > 6)  # Using a non-existent column name does not throw an analysis exception in Spark Connect
except Exception as e:
    print(f"Error: {repr(e)}")

If your code relies on the analysis exception and you want to catch it, you can trigger eager analysis, for example with df.columns, df.schema, or df.collect().

Python
try:
    df = ...
    df.columns  # This will trigger eager analysis
except Exception as e:
    print(f"Error: {repr(e)}")

Avoid too many eager analysis requests

Avoid excessive use of calls that trigger eager analysis (such as df.columns and df.schema). Each such call sends an analysis request to the server, and large numbers of these requests degrade performance.

If you cannot avoid this and must frequently check the columns of new DataFrames, maintain a set of column names to avoid repeated analysis requests.

Python
from pyspark.sql import functions as F

df = spark.range(10)
columns = set(df.columns)  # Maintain the set of column names
for i in range(200):
    new_column_name = str(i)
    if new_column_name not in columns:  # Check the set instead of calling df.columns repeatedly
        df = df.withColumn(new_column_name, F.col("id") + i)
        columns.add(new_column_name)
df.show()

A similar case is creating and analyzing a large number of unnecessary intermediate DataFrames. Instead, obtain StructType field information directly from the DataFrame's schema.

Python
from pyspark.sql.types import StructType

df = ...
struct_column_fields = {
    column_schema.name: [f.name for f in column_schema.dataType.fields]
    for column_schema in df.schema
    if isinstance(column_schema.dataType, StructType)
}
print(struct_column_fields)