メインコンテンツまでスキップ

スキーマ進化 in the 状態ストア

この記事では、状態ストアでのスキーマ進化の概要と、サポートされているスキーマ変更の種類の例を示します。

状態ストアにおけるスキーマ進化とは?

スキーマの進化とは、データのスキーマに対する変更を処理するアプリケーションの機能を指します。

Databricks は、RocksDBtransformWithState.

スキーマの進化により、開発の柔軟性とメンテナンスの容易さがもたらされます。 スキーマ進化を使用すると、状態情報を失ったり、ヒストリカルデータの完全な再処理を必要とせずに、状態ストア内のデータ モデルまたはデータ型を適応させることができます。

必要条件

スキーマ進化を使用するには、状態ストアのエンコード形式を Avro に設定する必要があります。 現在のセッションでこれを設定するには、次のコマンドを実行します。

Python
spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "avro")

スキーマ進化は、 transformWithState または transformWithStateInPandasを使用するステートフル操作でのみサポートされます。 これらの演算子と関連する APIs およびクラスには、次の要件があります。

  • Databricks Runtime 16.2 以降で使用できます。
  • コンピュートは、専用アクセスモードまたは非分離アクセスモードを使用する必要があります。
  • RocksDB状態ストア プロバイダーを使用する必要があります。Databricks では、コンピュート構成の一部として RocksDB を有効にすることをお勧めします。
  • transformWithStateInPandas Databricks Runtime 16.3 以降で標準アクセス モードをサポートします。

現在のセッションで RocksDB 状態ストア プロバイダーを有効にするには、次のコマンドを実行します。

Python
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

状態ストアでサポートされているスキーマ進化パターン

Databricks では、ステートフルな構造化ストリーミング操作に対して、次のスキーマ進化パターンがサポートされています。

パターン

説明

型拡張

データ型を制限の厳しい型から制限の少ない型に変更します。

フィールドの追加

既存の状態ストア変数のスキーマに新しいフィールドを追加します。

フィールドの削除

スキーマまたは状態ストア変数から既存のフィールドを削除します。

フィールドの並べ替え

変数内のフィールドを並べ替えます。

状態変数の追加

新しい状態変数をアプリケーションに追加します。

状態変数の削除

既存の状態変数をアプリケーションから削除します。

スキーマ進化はいつ起こりますか?

状態ストアのスキーマの進化は、ステートフル アプリケーションを定義するコードを更新することによって生じます。 このため、次のステートメントが適用されます。

  • スキーマの進化は、クエリのソース データのスキーマ変更の結果として自動的には発生しません。
  • スキーマの進化は、新しいバージョンのアプリケーションがデプロイされたときにのみ発生します。 ストリーミングクエリの 1 つのバージョンしか同時に実行できないため、ストリーミングジョブを再起動して状態変数のスキーマを進化させる必要があります。
  • コードですべての状態変数を明示的に定義し、すべての状態変数のスキーマを設定します。
    • Scala では、 Encoder を使用して各変数のスキーマを指定します。
    • Python では、スキーマを StructTypeとして明示的に構築します。

サポートされていないスキーマ進化パターン

次のスキーマ進化パターンはサポートされていません。

  • フィールドの名前変更 : フィールドは名前で一致するため、フィールドの名前変更はサポートされていません。フィールドの名前を変更しようとすると、フィールドを削除して新しいフィールドを追加することで処理されます。フィールドの削除と追加は許可されますが、元のフィールドの値は新しいフィールドに引き継がれないため、この操作ではエラーは発生しません。

  • キーの名前変更またはタイプの変更: マップ状態変数のキーの名前またはタイプは変更できません。

  • 型の絞り込み 型の縮小操作 ( ダウンキャスティング とも呼ばれます) はサポートされていません。これらの操作により、データが失われる可能性があります。サポートされていない型の縮小操作の例を次に示します。

    • double floatlong、または に絞り込むことはできません int
    • float long または int
    • long に絞り込むことはできません int

状態ストアで幅広化型

プリミティブ データ型を、より適応性の高い型に拡張できます。次のタイプの拡幅変更がサポートされています。

  • int longfloat、または に昇格できます double
  • long floatに昇格させるか、 double
  • float に昇格できます double
  • string に昇格できます bytes
  • bytes に昇格できます string

既存の値は、新しいタイプとしてアップキャストされます。たとえば、 1212.00になります。

タイプの拡大処理の例 transformWithState

Scala
// Initial run with Integer field
case class StateV1(value1: Integer)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _

override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}

override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt))
value
}
}
}

// Later run with Long field (type widening)
case class StateV2(value1: Long)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _

override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}

override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV2(value.toLong))
value
}
}
}

状態ストアの値にフィールドを追加する

既存の状態ストア値のスキーマに新しいフィールドを追加できます。

古いスキーマで書き込まれたデータを読み取る場合、Avro エンコーダーは、 nullとしてネイティブにエンコードされた追加されたフィールドのデータを返します。

Python は常にこれらの値を次のように解釈 None。Scala のデフォルトの動作は、フィールドのタイプによって異なります。Databricks では、Scala が欠落しているデータの値を代入しないようにするロジックを実装することをお勧めします。「状態変数に追加されたフィールドのデフォルト値」を参照してください。

新しいフィールドを追加する例 transformWithState

Scala
// Initial run with single field
case class StateV1(value1: Integer)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _

override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}

override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt))
value
}
}
}

// Later run with additional field
case class StateV2(value1: Integer, value2: String)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _

override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}

override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// When reading state written with StateV1(1),
// it will be automatically converted to StateV2(1, null)
val currentState = state.get()
// Now update with both fields populated
state.update(StateV2(value.toInt, s"metadata-${value}"))
value
}
}
}

状態ストアの値へのフィールドの削除

既存の変数のスキーマからフィールドを削除できます。古いスキーマでデータを読み取る場合、古いデータには存在し、新しいスキーマには存在しないフィールドは無視されます。

状態変数からフィールドを削除する例

Scala
// Initial run with multiple fields
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _

override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}

override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}

// Later run with field removed
case class StateV2(value1: Integer)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _

override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}

override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// When reading state written with StateV1(1, "metadata-1"),
// it will be automatically converted to StateV2(1)
val currentState = state.get()
state.update(StateV2(value.toInt))
value
}
}
}

状態変数のフィールドを並べ替える

状態変数内のフィールドを並べ替えることができます (既存のフィールドを追加または削除するときなど)。状態変数のフィールドは、位置ではなく名前で照合されます。

状態変数のフィールドを並べ替える例

Scala
// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _

override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}

override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}

// Later run with reordered fields
case class StateV2(value2: String, value1: Integer)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _

override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}

override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// When reading state written with StateV1(1, "metadata-1"),
// it will be automatically converted to StateV2("metadata-1", 1)
val currentState = state.get()
state.update(StateV2(s"new-metadata-${value}", value.toInt))
value
}
}
}

ステートフルなアプリケーションに状態変数を追加する

クエリの実行間に状態変数を追加することもできます。

: このパターンは Avro エンコーダーを必要とせず、すべての transformWithState アプリケーションでサポートされています。

ステートフルなアプリケーションに状態変数を追加する例

Scala
// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _

override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
}

override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}

case class StateV2(value1: String, value2: Integer)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
@transient var state2: ValueState[StateV2] = _

override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
state2 = getHandle.getValueState[StateV2](
"testState2",
Encoders.product[StateV2],
TTLConfig.NONE)
}

override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
val currentState2 = state2.get()
state2.update(StateV2(s"new-metadata-${value}", value.toInt))
value
}
}
}

ステートフルなアプリケーションから状態変数を削除する

フィールドを削除するだけでなく、クエリ実行の合間に状態変数を削除することもできます。

: このパターンは Avro エンコーダーを必要とせず、すべての transformWithState アプリケーションでサポートされています。

ステートフルなアプリケーションへの状態変数の削除の例

Scala
case class StateV1(value1: Integer, value2: String)
case class StateV2(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
@transient var state2: ValueState[StateV2] = _

override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
state2 = getHandle.getValueState[StateV2](
"testState2",
Encoders.product[StateV2],
TTLConfig.NONE)
}

override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
val currentState2 = state2.get()
state2.update(StateV2(value.toInt, s"new-metadata-${value}"))
value
}
}
}

class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _

override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
// delete old state variable that we no longer need
getHandle.deleteIfExists("testState2")
}

override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}

状態変数に追加されたフィールドのデフォルト値

既存の状態変数に新しいフィールドを追加すると、古いスキーマを使用して記述された状態変数は次のように動作します。

  • Avro エンコーダーは、追加されたフィールドに対して null 値を返します。
  • Python は、これらの値をすべてのデータ型の None に変換します。
  • Scala のデフォルトの動作は、データ型によって異なります。
    • 参照型は nullを返します。
    • プリミティブ型はデフォルト値を返しますが、これはプリミティブ型によって異なります。例としては、int タイプの 0bool タイプの falseなどがあります。

スキーマ進化によってフィールドに追加されたフラグを設定する組み込み機能やメタデータはありません。 以前のスキーマに存在しなかったフィールドに対して返される null 値を処理するロジックを実装する必要があります。

Scalaでは、Option[<Type>]を使用して、タイプ デフォルト を使用する代わりに、欠落している値を None として返すことで、デフォルト値の代入を回避できます。

スキーマの進化により None 型の値が返される状況を正しく処理するロジックを実装する必要があります。

状態変数に追加されたフィールドのデフォルト値の例

Scala
// Example demonstrating how null defaults work in schema evolution

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.Encoders

// Initial schema that will be evolved
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _

override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}

override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}

// Evolution: Adding a new field with null/default values
case class StateV2(value1: Integer, value2: String, value3: Long, value4: Option[Long])

class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _

override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}

override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// Reading from state
val currentState = state.get()

// Showing how null defaults work for different types
// When reading state written with StateV1(1, "metadata-1"),
// it will be automatically converted to StateV2(1, "metadata-1", 0L, None)
println(s"Current state: $currentState")

// For primitive types like Long, the UnsafeRow default for null is 0
val longValue = if (currentState.value3 == 0L) {
println("The value3 field is the default value (0)")
100L // Set a real value now
} else {
currentState.value3
}

// Now update with all fields populated
state.update(StateV2(value.toInt, s"metadata-${value}", longValue))
value
}
}
}

制限

次の表では、スキーマ進化の変更に対するデフォルトの制限について説明します。

説明

デフォルトの制限

オーバーライドする Spark 構成

状態変数のスキーマ進化。 クエリの再起動で複数のスキーマ変更を適用すると、1 つのスキーマの進化としてカウントされます。

16 時間

spark.sql.streaming.stateStore.valueStateSchemaEvolutionThreshold

ストリーミング クエリのスキーマ進化。 クエリの再起動で複数のスキーマ変更を適用すると、1 つのスキーマの進化としてカウントされます。

128

spark.sql.streaming.stateStore.maxNumStateSchemaFiles

状態変数のスキーマ進化をトラブルシューティングする場合は、次の詳細を慎重に検討してください。