Compare Spark Connect to Spark Classic
Spark Connect is a gRPC-based protocol within Apache Spark that specifies how a client application can communicate with a remote Spark Server. It allows remote execution of Spark workloads using the DataFrame API.
Spark Connect is used in the following:
- Scala notebooks with Databricks Runtime version 13.3 and above, on standard and dedicated compute
- Python notebooks with Databricks Runtime version 14.3 and above, on standard and dedicated compute
- Serverless compute
- Databricks Connect
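Outside environments where a spark session is already provided (such as Databricks notebooks), a client connects by pointing a standard PySpark session at a remote Spark Connect endpoint. The following is a minimal sketch; the sc://localhost:15002 address is a placeholder for your server:

from pyspark.sql import SparkSession

# Point a regular PySpark session at a remote Spark Connect server (placeholder endpoint).
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
spark.range(5).show()  # The DataFrame API is unchanged; execution happens on the server.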
While both Spark Connect and Spark Classic use lazy execution for transformations, there are important differences to understand in order to avoid unexpected behavior and performance issues when you migrate existing code from Spark Classic to Spark Connect or write code that must work with both.
Lazy vs eager
The key difference between Spark Connect and Spark Classic is that Spark Connect defers analysis and name resolution to execution time, as summarized in the following table.
| Aspect | Spark Classic | Spark Connect |
|---|---|---|
| Query execution | Lazy | Lazy |
| Schema analysis | Eager | Lazy |
| Schema access | Local | Triggers RPC |
| Temporary views | Plan embedded | Name lookup |
| UDF serialization | At creation | At execution |
Query execution
Both Spark Classic and Spark Connect follow the same lazy execution model for query execution.
In Spark Classic, DataFrame transformations (such as filter and limit) are lazy. This means they are not executed immediately, but are recorded in a logical plan. The actual computation is triggered only when an action (such as show(), collect()) is invoked.
Spark Connect follows a similar lazy evaluation model. Transformations are constructed on the client side and sent as unresolved proto plans to the server. The server then performs the necessary analysis and execution when an action is called.
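As a brief illustration (a sketch assuming an existing spark session), nothing is computed in either mode until the action runs:

# Transformations only record a plan; no job runs yet, in Spark Classic or Spark Connect.
df = spark.range(1000).filter("id % 2 = 0").limit(10)

df.show()  # The action triggers execution (and, in Spark Connect, sends the plan to the server).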
| Aspect | Spark Classic | Spark Connect |
|---|---|---|
| Transformations | Lazy execution | Lazy execution |
| SQL queries | Lazy execution | Lazy execution |
| Actions | Eager execution | Eager execution |
| SQL commands | Eager execution | Eager execution |
Schema analysis
Spark Classic performs schema analysis eagerly during the logical plan construction phase. When you define transformations, Spark immediately analyzes the DataFrame's schema to ensure all referenced columns and data types are valid. For example, executing spark.sql("select 1 as a, 2 as b").filter("c > 1") will throw an error eagerly, indicating the column c cannot be found.
Spark Connect instead constructs unresolved proto plans during transformations. When you access a schema or execute an action, the client sends the unresolved plan to the server via RPC (remote procedure call), and the server performs the analysis and execution. This design defers schema analysis. For example, spark.sql("select 1 as a, 2 as b").filter("c > 1") does not throw an error immediately, because the unresolved plan exists only on the client. The error is thrown when df.columns or df.show() sends the plan to the server for analysis.
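The example can be made concrete as follows; the same two lines behave differently in the two modes:

df = spark.sql("select 1 as a, 2 as b").filter("c > 1")
# Spark Classic: the line above already failed with an analysis error for the unknown column c.
# Spark Connect: the plan is still an unresolved proto on the client, so no error has been raised.

df.columns  # Or df.show(): the plan is sent to the server, which reports that c cannot be resolved.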
Unlike query execution, Spark Classic and Spark Connect differ in when schema analysis occurs.
| Aspect | Spark Classic | Spark Connect |
|---|---|---|
| Transformations | Eager | Lazy |
| Schema access | Eager | Eager (triggers an analysis RPC request, unlike Spark Classic) |
| Actions | Eager | Eager |
| Dependent session state (UDFs, temporary views, configs) | Eager | Lazy (evaluated during execution) |
Best practices
Because of this difference between lazy and eager analysis, follow these best practices to avoid unexpected behavior and performance issues, specifically issues caused by overwriting temporary view names, capturing external variables in UDFs, delayed error detection, and excessive schema access on new DataFrames.
Create unique temporary view names
In Spark Connect, the DataFrame stores only a reference to the temporary view by name. As a result, if the temp view is later replaced, the data in the DataFrame will also change because it looks up the view by name at execution time.
This behavior differs from Spark Classic, where the logical plan of the temp view is embedded into the DataFrame's plan at the time of creation. Any subsequent replacement of the temp view does not affect the original DataFrame.
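For illustration, the following sketch shows the difference (the view name is arbitrary):

spark.range(10).createOrReplaceTempView("shared_view")
df = spark.table("shared_view")  # Spark Connect records only the name "shared_view".

spark.range(100).createOrReplaceTempView("shared_view")  # Replace the view.

# Spark Connect: prints 100, because the name is resolved at execution time.
# Spark Classic: prints 10, because the original view's plan was embedded at creation.
print(df.count())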
To mitigate the difference, always create unique temporary view names. For example, include a UUID in the view name. This avoids affecting any existing DataFrames that reference a previously registered temp view.
- Python
- Scala
import uuid

def create_temp_view_and_create_dataframe(x):
    temp_view_name = f"`temp_view_{uuid.uuid4()}`"  # Use a random name to avoid conflicts.
    spark.range(x).createOrReplaceTempView(temp_view_name)
    return spark.table(temp_view_name)

df10 = create_temp_view_and_create_dataframe(10)
assert len(df10.collect()) == 10

df100 = create_temp_view_and_create_dataframe(100)
assert len(df10.collect()) == 10  # It works as expected now.
assert len(df100.collect()) == 100
import java.util.UUID

def createTempViewAndDataFrame(x: Int) = {
  val tempViewName = s"`temp_view_${UUID.randomUUID()}`" // Use a random name to avoid conflicts.
  spark.range(x).createOrReplaceTempView(tempViewName)
  spark.table(tempViewName)
}

val df10 = createTempViewAndDataFrame(10)
assert(df10.collect().length == 10)

val df100 = createTempViewAndDataFrame(100)
assert(df10.collect().length == 10) // Works as expected
assert(df100.collect().length == 100)
Wrap UDF definitions
In Spark Connect, Python UDFs are lazy. Their serialization and registration are deferred until execution time. In the following example, the UDF is only serialized and uploaded to the Spark cluster for execution when show() is called.
from pyspark.sql.functions import udf

x = 123

@udf("INT")
def foo():
    return x

df = spark.range(1).select(foo())
x = 456
df.show()  # Prints 456
This behavior differs from Spark Classic, where UDFs are eagerly created: the value of x at the time of UDF creation is captured, so subsequent changes to x do not affect the already created UDF.
If you need to modify the value of external variables that a UDF depends on, use a function factory (closure with early binding) to correctly capture variable values. Specifically, wrap the UDF creation in a helper function to capture the value of a dependent variable.
- Python
- Scala
from pyspark.sql.functions import udf

def make_udf(value):
    # The current value is captured as the argument `value`.
    def foo():
        return value
    return udf(foo, "INT")

x = 123
foo_udf = make_udf(x)
x = 456

df = spark.range(1).select(foo_udf())
df.show()  # Prints 123 as expected
import org.apache.spark.sql.functions.udf

def makeUDF(value: Int) = udf(() => value)

var x = 123
val fooUDF = makeUDF(x) // Captures the current value
x = 456

val df = spark.range(1).select(fooUDF())
df.show() // Prints 123 as expected
By wrapping the UDF definition inside another function (make_udf), we create a new scope in which the current value of x is passed as an argument. This ensures that each generated UDF has its own copy of the value, bound at the time the UDF is created.
Trigger eager analysis for error detection
The following error handling is useful in Spark Classic because it performs eager analysis, which allows exceptions to be thrown promptly. In Spark Connect, however, the same code catches nothing, because it only constructs a local unresolved proto plan without triggering any analysis.
from pyspark.sql.functions import col, when

df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])

try:
    df = df.select("name", "age")
    df = df.withColumn(
        "age_group",
        when(col("age") < 18, "minor").otherwise("adult"))
    df = df.filter(col("age_with_typo") > 6)  # The non-existent column does not throw an analysis exception in Spark Connect.
except Exception as e:
    print(f"Error: {repr(e)}")
If your code relies on the analysis exception and you want to catch it, you can trigger eager analysis, for example with df.columns, df.schema, or df.collect().
- Python
- Scala
try:
    df = ...
    df.columns  # This will trigger eager analysis
except Exception as e:
    print(f"Error: {repr(e)}")
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.functions._

val df = spark.createDataFrame(Seq(("Alice", 25), ("Bob", 30))).toDF("name", "age")

try {
  val df2 = df.select("name", "age")
    .withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))
    .filter(col("age_with_typo") > 6)
  df2.columns // Trigger eager analysis to catch the error
} catch {
  case e: SparkThrowable => println(s"Error: ${e.getMessage}")
}
Avoid too many eager analysis requests
You can improve performance by avoiding excessive use of calls that trigger eager analysis (such as df.columns and df.schema), because each such call sends an analysis request to the server.
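For example, the following sketch checks df.columns inside a loop; in Spark Connect, every check sends an analysis request to the server:

from pyspark.sql import functions as F

df = spark.range(10)
for i in range(200):
    new_column_name = str(i)
    if new_column_name not in df.columns:  # Each call triggers an analysis RPC in Spark Connect.
        df = df.withColumn(new_column_name, F.col("id") + i)
df.show()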
If you must frequently check the columns of new DataFrames, maintain a local set of column names instead, which avoids repeated analysis requests, as in the following example.
- Python
- Scala
from pyspark.sql import functions as F

df = spark.range(10)
columns = set(df.columns)  # Maintain the set of column names.

for i in range(200):
    new_column_name = str(i)
    if new_column_name not in columns:  # Check the local set instead of calling df.columns.
        df = df.withColumn(new_column_name, F.col("id") + i)
        columns.add(new_column_name)
df.show()
import org.apache.spark.sql.functions._

var df = spark.range(10).toDF
val columns = scala.collection.mutable.Set(df.columns: _*) // Maintain the set of column names.

for (i <- 0 until 200) {
  val newColumnName = i.toString
  if (!columns.contains(newColumnName)) { // Check the local set instead of calling df.columns.
    df = df.withColumn(newColumnName, col("id") + i)
    columns.add(newColumnName)
  }
}
df.show()
Another similar case is creating a large number of unnecessary intermediate DataFrames and analyzing them. Instead, obtain StructType field information directly from the DataFrame's schema.
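For contrast, the following sketch (assuming df is an existing DataFrame) shows the pattern to avoid: each sub-select creates an intermediate DataFrame, and each .columns call on it triggers a separate analysis request:

from pyspark.sql.types import StructType

# Anti-pattern: one intermediate DataFrame and one analysis request per struct column.
struct_column_fields = {}
for column_schema in df.schema:
    if isinstance(column_schema.dataType, StructType):
        sub_df = df.select(f"`{column_schema.name}`.*")  # Intermediate DataFrame.
        struct_column_fields[column_schema.name] = sub_df.columns  # Analysis RPC.
print(struct_column_fields)

The following examples collect the same information from the schema that df.schema already returned, with no additional analysis requests.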
- Python
- Scala
from pyspark.sql.types import StructType

df = ...

struct_column_fields = {
    column_schema.name: [f.name for f in column_schema.dataType.fields]
    for column_schema in df.schema
    if isinstance(column_schema.dataType, StructType)
}
print(struct_column_fields)
import org.apache.spark.sql.types.StructType

val df = ...

val structColumnFields = df.schema.fields
  .filter(_.dataType.isInstanceOf[StructType])
  .map { field =>
    field.name -> field.dataType.asInstanceOf[StructType].fields.map(_.name)
  }
  .toMap
println(structColumnFields)