
Schema evolution in the state store

This article provides an overview of schema evolution in the state store and examples of types of supported schema changes.

What is schema evolution in the state store?

Schema evolution refers to the ability of an application to handle changes to the schema of data.

Databricks supports schema evolution in the RocksDB state store for Structured Streaming applications that use transformWithState.

Schema evolution provides flexibility for development and ease of maintenance. Use schema evolution to adapt the data model or data types in your state store without losing state information or requiring full reprocessing of historical data.

Requirements

You must set the state store encoding format to Avro to use schema evolution. To set this for the current session, run the following:

Python
spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "avro")

Schema evolution is supported only for stateful operations that use transformWithState or transformWithStateInPandas. These operators and the related APIs and classes have the following requirements:

  • Available in Databricks Runtime 16.2 and above.
  • Compute must use dedicated or no-isolation access mode.
  • You must use the RocksDB state store provider. Databricks recommends enabling RocksDB as part of the compute configuration.
  • transformWithStateInPandas supports standard access mode in Databricks Runtime 16.3 and above.

To enable the RocksDB state store provider for the current session, run the following:

Python
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

Supported schema evolution patterns in the state store

Databricks supports the following schema evolution patterns for stateful Structured Streaming operations.

  • Type widening: Change data types from more restrictive to less restrictive types.
  • Adding fields: Add new fields to the schema of existing state store variables.
  • Removing fields: Remove existing fields from the schema of a state store variable.
  • Reordering fields: Reorder fields in a variable.
  • Adding state variables: Add a new state variable to an application.
  • Removing state variables: Remove an existing state variable from an application.

When does schema evolution occur?

Schema evolution in the state store results from updating the code that defines your stateful application. Because of this, the following statements apply:

  • Schema evolution does not occur automatically as a result of schema changes in the source data for the query.
  • Schema evolution occurs only when a new version of the application is deployed. Because only one version of a streaming query can run at a time, you must restart your streaming job to evolve the schema of state variables.
  • Your code explicitly defines all state variables and sets the schema for all state variables.
    • In Scala, you use an Encoder to specify the schema for each variable.
    • In Python, you explicitly construct a schema as a StructType.
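
As an illustration, the following Scala sketch (with a hypothetical OrderState case class and state variable name) shows that the schema of a state variable is fixed by the Encoder you pass when creating it, not by the source data; evolving the case class and restarting the query is what changes the stored schema.

Scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming._

// Hypothetical state schema: defined entirely in code, not inferred from the source data.
case class OrderState(total: Long, lastStatus: String)

class OrderProcessor extends StatefulProcessor[String, String, String] {
  @transient var orderState: ValueState[OrderState] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    orderState = getHandle.getValueState[OrderState](
      "orderState",
      Encoders.product[OrderState], // the Encoder determines the state schema
      TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[String] = {
    // Changing OrderState (for example, widening total to Double) and restarting
    // the query is what triggers schema evolution for this variable.
    inputRows
  }
}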

Unsupported schema evolution patterns

The following schema evolution patterns are not supported:

  • Field renaming: Renaming fields is not supported because fields are matched by name, not position. An attempted rename is treated as removing the old field and adding a new one. This does not result in an error, because removing and adding fields are both allowed, but the values from the original field are not carried over to the new field. See the sketch after this list.

  • Map key renaming or type changes: You cannot change the name or type of keys in map state variables.

  • Type narrowing: Type narrowing operations, also known as downcasting, are not supported because they might result in data loss. The following are examples of unsupported type narrowing operations:

    • double cannot be narrowed to float, long, or int
    • float cannot be narrowed to long or int
    • long cannot be narrowed to int
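
The following sketch, using hypothetical case classes, illustrates the field renaming caveat: because a rename is treated as a removal plus an addition, state written under the old field name is not carried over to the new field.

Scala
// Hypothetical illustration of the renaming caveat. The schema change itself succeeds,
// but values stored under the old field name are not carried over.
case class StateBefore(value1: Integer)          // original field name
case class StateRenamed(renamedValue1: Integer)  // "renamed" field

// After switching the state variable's encoder from StateBefore to StateRenamed and
// restarting the query, state previously written as StateBefore(1) is read back as
// StateRenamed(null): value1 is dropped and renamedValue1 is treated as a newly added field.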

Type widening in the state store

You can widen primitive data types to more accommodating types. The following type widening changes are supported:

  • int can be promoted to long, float, or double
  • long can be promoted to float or double
  • float can be promoted to double
  • string can be promoted to bytes
  • bytes can be promoted to string

Existing values are upcast to the new type. For example, the int value 12 becomes the double value 12.0.

Example of type widening with transformWithState

Scala
// Initial run with Integer field
case class StateV1(value1: Integer)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[String] = {
    inputRows.map { value =>
      state.update(StateV1(value.toInt))
      value
    }
  }
}

// Later run with Long field (type widening)
case class StateV2(value1: Long)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[String] = {
    inputRows.map { value =>
      state.update(StateV2(value.toLong))
      value
    }
  }
}

Add fields to state store values

You can add new fields to the schema of existing state store values.

When reading data written with the old schema, the Avro encoder returns null for the added fields.

Python always interprets these values as None. Scala has different default behavior depending on the type for the field. Databricks recommends implementing logic to ensure that Scala doesn't impute values for missing data. See Default values for fields added to state variable.

Examples of adding new fields with transformWithState

Scala
// Initial run with single field
case class StateV1(value1: Integer)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[String] = {
    inputRows.map { value =>
      state.update(StateV1(value.toInt))
      value
    }
  }
}

// Later run with additional field
case class StateV2(value1: Integer, value2: String)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[String] = {
    inputRows.map { value =>
      // When reading state written with StateV1(1),
      // it will be automatically converted to StateV2(1, null)
      val currentState = state.get()
      // Now update with both fields populated
      state.update(StateV2(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

Remove fields from state store values

You can remove fields from the schema of an existing variable. When reading data with the old schema, fields present in the old data but not in the new schema are ignored.

Examples of removing fields from state variables

Scala
// Initial run with multiple fields
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[String] = {
    inputRows.map { value =>
      state.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

// Later run with field removed
case class StateV2(value1: Integer)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[String] = {
    inputRows.map { value =>
      // When reading state written with StateV1(1, "metadata-1"),
      // it will be automatically converted to StateV2(1)
      val currentState = state.get()
      state.update(StateV2(value.toInt))
      value
    }
  }
}

Reorder fields in a state variable

You can reorder fields in a state variable, including while adding or removing fields. Fields in state variables are matched by name, not position.

Examples of reordering fields in a state variable

Scala
// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[String] = {
    inputRows.map { value =>
      state.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

// Later run with reordered fields
case class StateV2(value2: String, value1: Integer)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[String] = {
    inputRows.map { value =>
      // When reading state written with StateV1(1, "metadata-1"),
      // it will be automatically converted to StateV2("metadata-1", 1)
      val currentState = state.get()
      state.update(StateV2(s"new-metadata-${value}", value.toInt))
      value
    }
  }
}

Add a state variable to a stateful application

You can also add state variables between query runs.

Note: This pattern does not require an Avro encoder and is supported by all transformWithState applications.

Example of adding a state variable to a stateful application

Scala
// Initial run with a single state variable
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[String] = {
    inputRows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

// Later run with an additional state variable
case class StateV2(value1: String, value2: Integer)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _
  @transient var state2: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
    state2 = getHandle.getValueState[StateV2](
      "testState2",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[String] = {
    inputRows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      val currentState2 = state2.get()
      state2.update(StateV2(s"new-metadata-${value}", value.toInt))
      value
    }
  }
}

Remove a state variable from a stateful application

In addition to removing fields, you can also remove state variables in between query runs.

Note: This pattern does not require an Avro encoder and is supported by all transformWithState applications.

Example of removing a state variable from a stateful application

Scala
case class StateV1(value1: Integer, value2: String)
case class StateV2(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _
  @transient var state2: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
    state2 = getHandle.getValueState[StateV2](
      "testState2",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[String] = {
    inputRows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      val currentState2 = state2.get()
      state2.update(StateV2(value.toInt, s"new-metadata-${value}"))
      value
    }
  }
}

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
    // delete old state variable that we no longer need
    getHandle.deleteIfExists("testState2")
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[String] = {
    inputRows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

Default values for fields added to state variable

When you add new fields to an existing state variable, state variables written using the old schema have the following behavior:

  • The Avro encoder returns a null value for added fields.
  • Python converts these values to None for all data types.
  • Scala default behavior differs by data type:
    • Reference types return null.
    • Primitive types return a default value, which differs based on the primitive type. Examples include 0 for int types or false for bool types.

There is no built-in functionality or metadata that flags the field as added through schema evolution. You must implement logic to handle null values returned for fields that did not exist in your previous schema.

For Scala, you can avoid imputing default values by using Option[<Type>], which returns missing values as None instead of using the type default.

You must implement logic to correctly handle situations where None type values are returned because of schema evolution.

Example of default values for fields added to a state variable

Scala
// Example demonstrating how null defaults work in schema evolution

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.Encoders

// Initial schema that will be evolved
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[String] = {
    inputRows.map { value =>
      state.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

// Evolution: Adding new fields with null/default values
case class StateV2(value1: Integer, value2: String, value3: Long, value4: Option[Long])

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[String] = {
    inputRows.map { value =>
      // Reading from state
      val currentState = state.get()

      // Showing how null defaults work for different types
      // When reading state written with StateV1(1, "metadata-1"),
      // it will be automatically converted to StateV2(1, "metadata-1", 0L, None)
      println(s"Current state: $currentState")

      // For primitive types like Long, the UnsafeRow default for null is 0
      val longValue = if (currentState.value3 == 0L) {
        println("The value3 field is the default value (0)")
        100L // Set a real value now
      } else {
        currentState.value3
      }

      // Now update with all fields populated. value4 is an Option, so missing
      // values read back as None and new values are written as Some(...)
      state.update(StateV2(value.toInt, s"metadata-${value}", longValue, Some(longValue)))
      value
    }
  }
}

Limitations

The following default limits apply to schema evolution changes:

  • Schema evolutions for a state variable: 16 by default. Applying multiple schema changes in a single query restart counts as one schema evolution. Override with spark.sql.streaming.stateStore.valueStateSchemaEvolutionThreshold.
  • Schema evolutions for the streaming query: 128 by default. Applying multiple schema changes in a single query restart counts as one schema evolution. Override with spark.sql.streaming.stateStore.maxNumStateSchemaFiles.
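
If a query reaches one of these limits, you can raise it with the corresponding Spark configuration before restarting the query. The following sketch shows the session settings; the values shown are arbitrary examples, not recommendations.

Scala
// A minimal sketch: override the default schema evolution limits for the session.
// The values below are arbitrary examples, not recommendations.
spark.conf.set("spark.sql.streaming.stateStore.valueStateSchemaEvolutionThreshold", "32")
spark.conf.set("spark.sql.streaming.stateStore.maxNumStateSchemaFiles", "256")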

Consider the following details carefully when troubleshooting schema evolution for state variables: