スキーマ進化 in the 状態ストア
この記事では、状態ストアでのスキーマ進化の概要と、サポートされているスキーマ変更の種類の例を示します。
状態ストアにおけるスキーマ進化とは?
スキーマの進化とは、データのスキーマに対する変更を処理するアプリケーションの機能を指します。
Databricks は、RocksDBtransformWithState
.
スキーマの進化により、開発の柔軟性とメンテナンスの容易さがもたらされます。 スキーマ進化を使用すると、状態情報を失ったり、ヒストリカルデータの完全な再処理を必要とせずに、状態ストア内のデータ モデルまたはデータ型を適応させることができます。
必要条件
スキーマ進化を使用するには、状態ストアのエンコード形式を Avro に設定する必要があります。 現在のセッションでこれを設定するには、次のコマンドを実行します。
spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "avro")
スキーマ進化は、 transformWithState
または transformWithStateInPandas
を使用するステートフル操作でのみサポートされます。 これらの演算子と関連する APIs およびクラスには、次の要件があります。
- Databricks Runtime 16.2 以降で使用できます。
- コンピュートは、専用アクセスモードまたは非分離アクセスモードを使用する必要があります。
- RocksDB状態ストア プロバイダーを使用する必要があります。Databricks では、コンピュート構成の一部として RocksDB を有効にすることをお勧めします。
transformWithStateInPandas
Databricks Runtime 16.3 以降で標準アクセス モードをサポートします。
現在のセッションで RocksDB 状態ストア プロバイダーを有効にするには、次のコマンドを実行します。
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
状態ストアでサポートされているスキーマ進化パターン
Databricks では、ステートフルな構造化ストリーミング操作に対して、次のスキーマ進化パターンがサポートされています。
パターン | 説明 |
---|---|
データ型を制限の厳しい型から制限の少ない型に変更します。 | |
既存の状態ストア変数のスキーマに新しいフィールドを追加します。 | |
スキーマまたは状態ストア変数から既存のフィールドを削除します。 | |
変数内のフィールドを並べ替えます。 | |
新しい状態変数をアプリケーションに追加します。 | |
既存の状態変数をアプリケーションから削除します。 |
スキーマ進化はいつ起こりますか?
状態ストアのスキーマの進化は、ステートフル アプリケーションを定義するコードを更新することによって生じます。 このため、次のステートメントが適用されます。
- スキーマの進化は、クエリのソース データのスキーマ変更の結果として自動的には発生しません。
- スキーマの進化は、新しいバージョンのアプリケーションがデプロイされたときにのみ発生します。 ストリーミングクエリの 1 つのバージョンしか同時に実行できないため、ストリーミングジョブを再起動して状態変数のスキーマを進化させる必要があります。
- コードですべての状態変数を明示的に定義し、すべての状態変数のスキーマを設定します。
- Scala では、
Encoder
を使用して各変数のスキーマを指定します。 - Python では、スキーマを
StructType
として明示的に構築します。
- Scala では、
サポートされていないスキーマ進化パターン
次のスキーマ進化パターンはサポートされていません。
-
フィールドの名前変更 : フィールドは名前で一致するため、フィールドの名前変更はサポートされていません。フィールドの名前を変更しようとすると、フィールドを削除して新しいフィールドを追加することで処理されます。フィールドの削除と追加は許可されますが、元のフィールドの値は新しいフィールドに引き継がれないため、この操作ではエラーは発生しません。
-
キーの名前変更またはタイプの変更: マップ状態変数のキーの名前またはタイプは変更できません。
-
型の絞り込み 型の縮小操作 ( ダウンキャスティング とも呼ばれます) はサポートされていません。これらの操作により、データが失われる可能性があります。サポートされていない型の縮小操作の例を次に示します。
double
float
、long
、または に絞り込むことはできませんint
float
long
またはint
long
に絞り込むことはできませんint
状態ストアで幅広化型
プリミティブ データ型を、より適応性の高い型に拡張できます。次のタイプの拡幅変更がサポートされています。
int
long
、float
、または に昇格できますdouble
long
float
に昇格させるか、double
float
に昇格できますdouble
string
に昇格できますbytes
bytes
に昇格できますstring
既存の値は、新しいタイプとしてアップキャストされます。たとえば、 12
は 12.00
になります。
タイプの拡大処理の例 transformWithState
- Scala
- Python
// Initial run with Integer field
case class StateV1(value1: Integer)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt))
value
}
}
}
// Later run with Long field (type widening)
case class StateV2(value1: Long)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV2(value.toLong))
value
}
}
}
class IntStateProcessor(StatefulProcessor):
def init(self, handle):
# Initial schema with Integer field
state_schema = StructType([
StructField("value1", IntegerType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
# Convert input value to integer and update state
value = pdf["value"].iloc[0]
self.state.update((int(value),))
# Read current state
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"stateValue": [current_state[0]]
})
class LongStateProcessor(StatefulProcessor):
def init(self, handle):
# Later schema with Long field (type widening)
state_schema = StructType([
StructField("value1", LongType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
# Convert input value to long and update state
value = pdf["value"].iloc[0]
# When reading state written with IntStateProcessor,
# it will be automatically converted to Long
self.state.update((int(value),))
# Read current state
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"stateValue": [current_state[0]]
})
状態ストアの値にフィールドを追加する
既存の状態ストア値のスキーマに新しいフィールドを追加できます。
古いスキーマで書き込まれたデータを読み取る場合、Avro エンコーダーは、 null
としてネイティブにエンコードされた追加されたフィールドのデータを返します。
Python は常にこれらの値を次のように解釈 None
。Scala のデフォルトの動作は、フィールドのタイプによって異なります。Databricks では、Scala が欠落しているデータの値を代入しないようにするロジックを実装することをお勧めします。「状態変数に追加されたフィールドのデフォルト値」を参照してください。
新しいフィールドを追加する例 transformWithState
- Scala
- Python
// Initial run with single field
case class StateV1(value1: Integer)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt))
value
}
}
}
// Later run with additional field
case class StateV2(value1: Integer, value2: String)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// When reading state written with StateV1(1),
// it will be automatically converted to StateV2(1, null)
val currentState = state.get()
// Now update with both fields populated
state.update(StateV2(value.toInt, s"metadata-${value}"))
value
}
}
}
class StateV1Processor(StatefulProcessor):
def init(self, handle):
# Initial schema with a single field
state_schema = StructType([
StructField("value1", IntegerType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state.update((int(value),))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"stateValue": [current_state[0]]
})
class StateV2Processor(StatefulProcessor):
def init(self, handle):
# Later schema with additional fields
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
# Read current state
current_state = self.state.get()
# When reading state written with StateV1(1),
# it will be automatically converted to StateV2(1, None)
value1 = current_state[0]
value2 = current_state[1]
# Now update with both fields populated
self.state.update((int(value), f"metadata-{value}"))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
状態ストアの値へのフィールドの削除
既存の変数のスキーマからフィールドを削除できます。古いスキーマでデータを読み取る場合、古いデータには存在し、新しいスキーマには存在しないフィールドは無視されます。
状態変数からフィールドを削除する例
- Scala
- Python
// Initial run with multiple fields
case class StateV1(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
// Later run with field removed
case class StateV2(value1: Integer)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// When reading state written with StateV1(1, "metadata-1"),
// it will be automatically converted to StateV2(1)
val currentState = state.get()
state.update(StateV2(value.toInt))
value
}
}
}
class RemoveFieldsOriginalProcessor(StatefulProcessor):
def init(self, handle):
# Initial schema with multiple fields
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state.update((int(value), f"metadata-{value}"))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
class RemoveFieldsReducedProcessor(StatefulProcessor):
def init(self, handle):
# Later schema with field removed
state_schema = StructType([
StructField("value1", IntegerType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
# When reading state written with RemoveFieldsOriginalProcessor(1, "metadata-1"),
# it will be automatically converted to just (1,)
current_state = self.state.get()
value1 = current_state[0]
self.state.update((int(value),))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]]
})
状態変数のフィールドを並べ替える
状態変数内のフィールドを並べ替えることができます (既存のフィールドを追加または削除するときなど)。状態変数のフィールドは、位置ではなく名前で照合されます。
状態変数のフィールドを並べ替える例
- Scala
- Python
// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
// Later run with reordered fields
case class StateV2(value2: String, value1: Integer)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// When reading state written with StateV1(1, "metadata-1"),
// it will be automatically converted to StateV2("metadata-1", 1)
val currentState = state.get()
state.update(StateV2(s"new-metadata-${value}", value.toInt))
value
}
}
}
class OrderedFieldsProcessor(StatefulProcessor):
def init(self, handle):
# Initial schema with fields in original order
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state.update((int(value), f"metadata-{value}"))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
class ReorderedFieldsProcessor(StatefulProcessor):
def init(self, handle):
# Later schema with reordered fields
state_schema = StructType([
StructField("value2", StringType(), True),
StructField("value1", IntegerType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
# When reading state written with OrderedFieldsProcessor(1, "metadata-1"),
# it will be automatically converted to ("metadata-1", 1)
current_state = self.state.get()
value2 = current_state[0]
value1 = current_state[1]
self.state.update((f"new-metadata-{value}", int(value)))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value2": [current_state[0]],
"value1": [current_state[1]]
})
ステートフルなアプリケーションに状態変数を追加する
クエリの実行間に状態変数を追加することもできます。
注 : このパターンは Avro エンコーダーを必要とせず、すべての transformWithState
アプリケーションでサポートされています。
ステートフルなアプリケーションに状態変数を追加する例
- Scala
- Python
// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
case class StateV2(value1: String, value2: Integer)
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
@transient var state2: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
state2 = getHandle.getValueState[StateV2](
"testState2",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
val currentState2 = state2.get()
state2.update(StateV2(s"new-metadata-${value}", value.toInt))
value
}
}
}
class MultiStateV1Processor(StatefulProcessor):
def init(self, handle):
# Initial schema with a single state variable
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state1 = handle.getValueState("testState1", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state1.update((int(value), f"metadata-{value}"))
current_state = self.state1.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
class MultiStateV2Processor(StatefulProcessor):
def init(self, handle):
# Add a second state variable
state1_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
state2_schema = StructType([
StructField("value1", StringType(), True),
StructField("value2", IntegerType(), True)
])
self.state1 = handle.getValueState("testState1", state1_schema)
self.state2 = handle.getValueState("testState2", state2_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state1.update((int(value), f"metadata-{value}"))
# Access and update the new state variable
current_state2 = self.state2.get() # Will be None on first run
self.state2.update((f"new-metadata-{value}", int(value)))
current_state1 = self.state1.get()
current_state2 = self.state2.get()
yield pd.DataFrame({
"id": [key[0]],
"state1_value1": [current_state1[0]],
"state1_value2": [current_state1[1]],
"state2_value1": [current_state2[0]],
"state2_value2": [current_state2[1]]
})
ステートフルなアプリケーションから状態変数を削除する
フィールドを削除するだけでなく、クエリ実行の合間に状態変数を削除することもできます。
注 : このパターンは Avro エンコーダーを必要とせず、すべての transformWithState
アプリケーションでサポートされています。
ステートフルなアプリケーションへの状態変数の削除の例
- Scala
- Python
case class StateV1(value1: Integer, value2: String)
case class StateV2(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
@transient var state2: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
state2 = getHandle.getValueState[StateV2](
"testState2",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
val currentState2 = state2.get()
state2.update(StateV2(value.toInt, s"new-metadata-${value}"))
value
}
}
}
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state1: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state1 = getHandle.getValueState[StateV1](
"testState1",
Encoders.product[StateV1],
TTLConfig.NONE)
// delete old state variable that we no longer need
getHandle.deleteIfExists("testState2")
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state1.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
class MultiStateV2Processor(StatefulProcessor):
def init(self, handle):
# Add a second state variable
state1_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
state2_schema = StructType([
StructField("value1", StringType(), True),
StructField("value2", IntegerType(), True)
])
self.state1 = handle.getValueState("testState1", state1_schema)
self.state2 = handle.getValueState("testState2", state2_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state1.update((int(value), f"metadata-{value}"))
# Access and update the new state variable
current_state2 = self.state2.get() # Will be None on first run
self.state2.update((f"new-metadata-{value}", int(value)))
current_state1 = self.state1.get()
current_state2 = self.state2.get()
yield pd.DataFrame({
"id": [key[0]],
"state1_value1": [current_state1[0]],
"state1_value2": [current_state1[1]],
"state2_value1": [current_state2[0]],
"state2_value2": [current_state2[1]]
})
class RemoveStateVarProcessor(StatefulProcessor):
def init(self, handle):
# Only use one state variable and delete the other
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state1 = handle.getValueState("testState1", state_schema)
# Delete old state variable that we no longer need
handle.deleteIfExists("testState2")
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state1.update((int(value), f"metadata-{value}"))
current_state = self.state1.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
状態変数に追加されたフィールドのデフォルト値
既存の状態変数に新しいフィールドを追加すると、古いスキーマを使用して記述された状態変数は次のように動作します。
- Avro エンコーダーは、追加されたフィールドに対して
null
値を返します。 - Python は、これらの値をすべてのデータ型の
None
に変換します。 - Scala のデフォルトの動作は、データ型によって異なります。
- 参照型は
null
を返します。 - プリミティブ型はデフォルト値を返しますが、これはプリミティブ型によって異なります。例としては、
int
タイプの0
やbool
タイプのfalse
などがあります。
- 参照型は
スキーマ進化によってフィールドに追加されたフラグを設定する組み込み機能やメタデータはありません。 以前のスキーマに存在しなかったフィールドに対して返される null 値を処理するロジックを実装する必要があります。
Scalaでは、Option[<Type>]
を使用して、タイプ デフォルト を使用する代わりに、欠落している値を None
として返すことで、デフォルト値の代入を回避できます。
スキーマの進化により None
型の値が返される状況を正しく処理するロジックを実装する必要があります。
状態変数に追加されたフィールドのデフォルト値の例
- Scala
- Python
// Example demonstrating how null defaults work in schema evolution
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.Encoders
// Initial schema that will be evolved
case class StateV1(value1: Integer, value2: String)
class ProcessorV1 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV1] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV1](
"testState",
Encoders.product[StateV1],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
state.update(StateV1(value.toInt, s"metadata-${value}"))
value
}
}
}
// Evolution: Adding a new field with null/default values
case class StateV2(value1: Integer, value2: String, value3: Long, value4: Option[Long])
class ProcessorV2 extends StatefulProcessor[String, String, String] {
@transient var state: ValueState[StateV2] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
state = getHandle.getValueState[StateV2](
"testState",
Encoders.product[StateV2],
TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[String],
timerValues: TimerValues): Iterator[String] = {
rows.map { value =>
// Reading from state
val currentState = state.get()
// Showing how null defaults work for different types
// When reading state written with StateV1(1, "metadata-1"),
// it will be automatically converted to StateV2(1, "metadata-1", 0L, None)
println(s"Current state: $currentState")
// For primitive types like Long, the UnsafeRow default for null is 0
val longValue = if (currentState.value3 == 0L) {
println("The value3 field is the default value (0)")
100L // Set a real value now
} else {
currentState.value3
}
// Now update with all fields populated
state.update(StateV2(value.toInt, s"metadata-${value}", longValue))
value
}
}
}
class NullDefaultsProcessor(StatefulProcessor):
def init(self, handle):
# Initial schema
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
self.state.update((int(value), f"metadata-{value}"))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]]
})
class ExpandedNullDefaultsProcessor(StatefulProcessor):
def init(self, handle):
# Evolution: Adding new fields with null/default values
state_schema = StructType([
StructField("value1", IntegerType(), True),
StructField("value2", StringType(), True),
StructField("value3", LongType(), True),
StructField("value4", IntegerType(), True),
StructField("value5", BooleanType(), True)
])
self.state = handle.getValueState("testState", state_schema)
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
for pdf in rows:
value = pdf["value"].iloc[0]
# Reading from state
current_state = self.state.get()
# Showing how null defaults work in Python
# When reading state written with NullDefaultsProcessor state = (1, "metadata-1"),
# it will be automatically converted to (1, "metadata-1", None, None, None)
# In Python, both primitive and reference types will be None
value1 = current_state[0]
value2 = current_state[1]
value3 = current_state[2] # Will be None when evolved from older schema
value4 = current_state[3] # Will be None when evolved from older schema
value5 = current_state[4] # Will be None when evolved from older schema
# Check if value3 is None
if value3 is None:
print("The value3 field is None (default value for evolution)")
value3 = 100 # Set a real value now
# Now update with all fields populated
self.state.update((
value1,
value2,
value3,
value4 if value4 is not None else 42,
value5 if value5 is not None else True
))
current_state = self.state.get()
yield pd.DataFrame({
"id": [key[0]],
"value1": [current_state[0]],
"value2": [current_state[1]],
"value3": [current_state[2]],
"value4": [current_state[3]],
"value5": [current_state[4]]
})
制限
次の表では、スキーマ進化の変更に対するデフォルトの制限について説明します。
説明 | デフォルトの制限 | オーバーライドする Spark 構成 |
---|---|---|
状態変数のスキーマ進化。 クエリの再起動で複数のスキーマ変更を適用すると、1 つのスキーマの進化としてカウントされます。 | 16 時間 |
|
ストリーミング クエリのスキーマ進化。 クエリの再起動で複数のスキーマ変更を適用すると、1 つのスキーマの進化としてカウントされます。 | 128 |
|
状態変数のスキーマ進化をトラブルシューティングする場合は、次の詳細を慎重に検討してください。
- 一部のパターンは、スキーマの進化ではサポートされていません。 サポートされていないスキーマ進化パターンを参照してください。
- スキーマの進化には、
transformWithState
のすべての要件があり、 Avro エンコード形式が必要です。 「要件」を参照してください。 - スキーマの進化につながるコード変更をデプロイするには、ストリーミング クエリを再起動する必要があります。 「スキーマ進化はいつ発生しますか?」を参照してください。