Crie um aplicativo personalizado com estado
Visualização
Esse recurso está em Public Preview em Databricks Runtime 16.2 e acima.
O senhor pode criar aplicativos de transmissão usando operadores de estado personalizados para implementar soluções de baixa latência e near time real que usam lógica de estado arbitrária. Os operadores personalizados com estado desbloqueiam novos casos e padrões de uso operacional indisponíveis por meio do processamento tradicional de transmissão estruturada.
Databricks recomenda o uso da funcionalidade integrada de transmissão estruturada para operações com suporte de estado, como agregações, deduplicação e junção de transmissão. Consulte O que é transmissão stateful?
A Databricks recomenda o uso de transformWithState
em vez de operadores legados para transformações arbitrárias de estado. Para obter a documentação sobre os operadores flatMapGroupsWithState
e mapGroupsWithState
legados, consulte Operadores de estado arbitrários legados.
Requisitos
O operador transformWithState
e as APIs e classes relacionadas têm os seguintes requisitos:
- Disponível em Databricks Runtime 16.2 e acima.
- deve usar o modo de acesso dedicado ou sem isolamento.
- O senhor deve usar o provedor RocksDB armazenamento do estado. Databricks recomenda ativar o RocksDB como parte da configuração do compute.
transformWithStateInPandas
suporta o modo de acesso padrão em Databricks Runtime 16.3 e acima.
Para ativar o provedor RocksDB armazenamento do estado para a sessão atual, execute o seguinte:
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
O que é transformWithState?
O operador transformWithState
aplica um processador stateful personalizado a uma consulta de transmissão estruturada. Você deve implementar um processador com estado personalizado para usar transformWithState
. A transmissão estruturada inclui o site APIs para criar seu processador stateful usando Python, Scala, ou Java.
O senhor usa transformWithState
para aplicar lógica personalizada a um agrupamento key para registros processados de forma incremental com transmissão estruturada. O seguinte descreve o design de alto nível:
- Defina uma ou mais variáveis de estado.
- As informações de estado são mantidas para cada agrupamento key e podem ser acessadas para cada variável de estado de acordo com a lógica definida pelo usuário.
- Para cada microlote processado, todos os registros do site key estão disponíveis como um iterador.
- Use alças integradas para controlar quando e como os registros são emitidos com base em temporizadores e condições definidas pelo usuário.
- Os valores de estado suportam definições individuais de tempo de vida (TTL), permitindo flexibilidade no gerenciamento da expiração e do tamanho do estado.
O PySpark usa o operador transformWithStateInPandas
em vez de transformWithState
. A documentação da Databricks usa transformWithState
para descrever a funcionalidade das implementações Python e Scala.
As implementações Scala e Python de transformWithState
e APIs relacionadas diferem devido às especificidades da linguagem, mas oferecem a mesma funcionalidade. Consulte os exemplos específicos da linguagem e a documentação da API para sua linguagem de programação preferida.
alças de processamento integrado
O senhor implementa a lógica central do seu aplicativo personalizado com estado implementando manipuladores que usam identificadores integrados.
- As alças fornecem os métodos para interagir com valores de estado e temporizadores, processar registros recebidos e emitir registros.
- Os manipuladores definem sua lógica personalizada orientada por eventos.
Os identificadores para cada tipo de estado são implementados com base na estrutura de dados subjacente, mas cada um contém a funcionalidade de obter, colocar, atualizar e excluir registros.
Os manipuladores são implementados com base em eventos observados em registros de entrada ou temporizadores, usando a seguinte semântica:
- Defina um handler usando o método
handleInputRows
para controlar como os dados são processados, o estado é atualizado e os registros são emitidos para cada microlote de registros processados para o agrupamento key. Consulte Manipular linhas de entrada. - Defina um manipulador usando o método
handleExpiredTimer
para usar o limite baseado em tempo para a lógica de execução, independentemente de registros adicionais serem ou não processados para o agrupamento key. Veja os eventos programados do programa.
A tabela a seguir tem uma comparação dos comportamentos funcionais suportados por esses manipuladores:
Comportamento |
|
|
---|---|---|
Obtenha, coloque, atualize ou limpe valores de estado | Sim | Sim |
Criar ou excluir um cronômetro | Sim | Sim |
Emita registros | Sim | Sim |
Iterar sobre os registros nos micro lotes atuais | Sim | Não |
Lógica de gatilho com base no tempo decorrido | Não | Sim |
Você pode combinar handleInputRows
e handleExpiredTimer
para implementar uma lógica complexa conforme necessário.
Por exemplo, o senhor poderia implementar um aplicativo que usa handleInputRows
para atualizar os valores de estado de cada microlote e definir um cronômetro de 10 segundos no futuro. Se nenhum registro adicional for processado, o senhor pode usar handleExpiredTimer
para emitir os valores atuais no armazenamento do estado. Se novos registros forem processados para o agrupamento key, é possível limpar o timer existente e definir um novo timer.
Tipos de estado personalizados
Você pode implementar vários objetos de estado em um único operador com estado. Os nomes que o senhor dá a cada objeto de estado persistem no armazenamento do estado, que pode ser acessado com o leitor de armazenamento do estado. Se seu objeto de estado usa um StructType
, você fornece nomes para cada campo na estrutura ao passar o esquema. Esses nomes também são visíveis ao ler o armazenamento do estado. Consulte Ler informações sobre o estado da transmissão estruturada.
A funcionalidade fornecida pelas classes e operadores integrados tem o objetivo de oferecer flexibilidade e extensibilidade, e as escolhas de implementação devem ser informadas pela lógica completa que o aplicativo precisa executar. Por exemplo, o senhor pode implementar uma lógica quase idêntica usando um ValueState
agrupado pelos campos user_id
e session_id
ou um MapState
agrupado por user_id
em que session_id
é o key para o MapState
. Nesse caso, um MapState
pode ser a implementação preferida se a lógica precisar avaliar as condições em vários session_id
s.
As seções a seguir descrevem os tipos de estado suportados pelo transformWithState
.
Estado do valor
Para cada agrupamento key, há um valor associado.
Um estado de valor pode incluir tipos complexos, como uma estrutura ou tupla. Ao atualizar um ValueState
, você implementa a lógica para substituir todo o valor. O TTL para um estado de valor é redefinido quando o valor é atualizado, mas não é redefinido se uma fonte key correspondente a um ValueState
for processada sem atualizar o ValueState
armazenado.
Estado da lista
Para cada agrupamento key, há uma lista associada.
Um estado de lista é uma coleção de valores, cada um dos quais pode incluir tipos complexos. Cada valor em uma lista tem seu próprio TTL. Você pode adicionar itens a uma lista anexando itens individuais, anexando uma lista de itens ou sobrescrevendo a lista inteira por um put
. Somente as operações put são consideradas uma atualização para o Reset TTL.
Estado do mapa
Para cada agrupamento key, há um mapa associado. Os mapas são o equivalente funcional do Apache Spark a um dict do Python.
A chave de agrupamento descreve os campos especificados na cláusula GROUP BY
da consulta de transmissão estruturada. Os estados do mapa contêm um número arbitrário de par key-valor para um agrupamento key.
Por exemplo, se o senhor agrupar por user_id
e quiser definir um mapa para cada session_id
, seu agrupamento key é user_id
e o key em seu mapa é session_id
.
Um estado de mapa é uma coleção de chaves distintas, cada uma delas mapeada para um valor que pode incluir tipos complexos. Cada par key-valor em um mapa tem seu próprio TTL. O senhor pode atualizar o valor de um key específico ou remover um key e seu valor. O senhor pode retornar um valor individual usando seu key, listar todas as chaves, listar todos os valores ou retornar um iterador para trabalhar com o conjunto completo de par key-value no mapa.
Inicializar uma variável de estado personalizada
Ao inicializar seu StatefulProcessor
, você cria uma variável local para cada objeto de estado que permite interagir com objetos de estado em sua lógica personalizada. As variáveis de estado são definidas e inicializadas por meio da substituição do método integrada init
na classe StatefulProcessor
.
Você define uma quantidade arbitrária de objetos de estado usando os métodos getValueState
, getListState
e getMapState
ao inicializar seu StatefulProcessor
.
Cada objeto de estado deve ter o seguinte:
- Um nome exclusivo
- Um esquema especificado
- No Python, o esquema é especificado explicitamente.
- Em Scala, passe um
Encoder
para especificar o esquema de estado.
O senhor também pode fornecer uma duração opcional de tempo de vida (TTL) em milissegundos. Se estiver implementando um estado de mapa, o senhor deverá fornecer uma definição de esquema separada para a chave de mapa e os valores.
A lógica de como as informações de estado são consultadas, atualizadas e emitidas é tratada separadamente. Consulte Usar suas variáveis de estado.
Exemplo de aplicativo com estado
A seguir, demonstramos a sintaxe básica para definir e usar um processador de estado personalizado com transformWithState
, incluindo exemplos de variáveis de estado para cada tipo compatível. Para obter mais exemplos, consulte Exemplos de aplicativos com estado.
O Python usa tuplas para todas as interações com valores de estado. Isso significa que o código Python deve passar valores usando tuplas ao usar operações como put
e update
e esperar lidar com tuplas ao usar get
.
Por exemplo, se o esquema para seu estado de valor for apenas um único inteiro, você implementaria um código como o seguinte:
current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0] # Extracts the first item in the tuple
new_value = current_value + 1 # Calculate a new value
value_state.update((new_value,)) # Pass the new value formatted as a tuple
Isso também vale para itens em ListState
ou valores em MapState
.
- Python
- Scala
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
output_schema = StructType(
[
StructField("id", StringType(), True),
StructField("countAsString", StringType(), True),
]
)
class SimpleCounterProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
value_state_schema = StructType([StructField("count", IntegerType(), True)])
list_state_schema = StructType([StructField("count", IntegerType(), True)])
self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
# Schema can also be defined using strings and SQL DDL syntax
self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
count = 0
for pdf in rows:
list_state_rows = [(120,), (20,)] # A list of tuples
self.list_state.put(list_state_rows)
self.list_state.appendValue((111,))
self.list_state.appendList(list_state_rows)
pdf_count = pdf.count()
count += pdf_count.get("value")
self.value_state.update((count,)) # Count is passed as a tuple
iter = self.list_state.get()
list_state_value = next(iter1)[0]
value = count
user_key = ("user_key",)
if self.map_state.exists():
if self.map_state.containsKey(user_key):
value += self.map_state.getValue(user_key)[0]
self.map_state.updateValue(user_key, (value,)) # Value is a tuple
yield pd.DataFrame({"id": key, "countAsString": str(count)})
q = (df.groupBy("key")
.transformWithStateInPandas(
statefulProcessor=SimpleCounterProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream...
)
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
class SimpleCounterProcessor extends StatefulProcessor[String, (String, String), (String, String)] {
@transient private var countState: ValueState[Int] = _
@transient private var listState: ListState[Int] = _
@transient private var mapState: MapState[String, Int] = _
override def init(
outputMode: OutputMode,
timeMode: TimeMode): Unit = {
countState = getHandle.getValueState[Int]("countState",
Encoders.scalaLong, TTLConfig.NONE)
listState = getHandle.getListState[Int]("listState",
Encoders.scalaInt, TTLConfig.NONE)
mapState = getHandle.getMapState[String, Int]("mapState",
Encoders.STRING, Encoders.scalaInt, TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, String)],
timerValues: TimerValues): Iterator[(String, String)] = {
var count = countState.getOption().getOrElse(0)
for (row <- inputRows) {
val listData = Array(120, 20)
listState.put(listData)
listState.appendValue(count)
listState.appendList(listData)
count += 1
}
val iter = listState.get()
var listStateValue = 0
if (iter.hasNext) {
listStateValue = iter.next()
}
countState.update(count)
var value = count
val userKey = "userKey"
if (mapState.exists()) {
if (mapState.containsKey(userKey)) {
value += mapState.getValue(userKey)
}
}
mapState.updateValue(userKey, value)
Iterator((key, count.toString))
}
}
val q = spark
.readStream
.format("delta")
.load("$srcDeltaTableDir")
.as[(String, String)]
.groupByKey(x => x._1)
.transformWithState(
new SimpleCounterProcessor(),
TimeMode.None(),
OutputMode.Update(),
)
.writeStream...
Alça de processador Stateful
PySpark inclui a classe StatefulProcessorHandle
para fornecer acesso a funções que controlam como o código Python definido pelo usuário interage com as informações de estado. Você deve sempre importar e passar o StatefulProcessorHandle
para a variável handle
ao inicializar um StatefulProcessor
.
A variável handle
vincula a variável local em sua classe Python à variável de estado.
O Scala usa o método getHandle
.
Especifique o estado inicial
Opcionalmente, o senhor pode fornecer um estado inicial para ser usado com os primeiros microlotes. Isso pode ser útil ao migrar um fluxo de trabalho existente para um novo aplicativo personalizado, atualizar um operador com estado para alterar o esquema ou a lógica ou reparar uma falha que não pode ser reparada automaticamente e requer intervenção manual.
Use o leitor de armazenamento do estado para consultar as informações de estado de um ponto de controle existente. Consulte Ler informações sobre o estado da transmissão estruturada.
Se o senhor estiver convertendo uma tabela Delta existente em um aplicativo com estado, leia a tabela usando spark.read.table("table_name")
e passe o DataFrame resultante. Opcionalmente, você pode selecionar ou modificar os campos para que estejam em conformidade com seu novo aplicativo com estado.
O senhor fornece um estado inicial usando um DataFrame com o mesmo esquema de agrupamento key que as linhas de entrada.
O Python usa handleInitialState
para especificar o estado inicial enquanto define um StatefulProcessor
. O Scala usa a classe distinta StatefulProcessorWithInitialState
.
Use suas variáveis de estado
Os objetos de estado compatíveis fornecem métodos para obter o estado, atualizar as informações de estado existentes ou limpar o estado atual. Cada tipo de estado suportado tem uma implementação exclusiva de métodos que correspondem à estrutura de dados implementada.
Cada agrupamento key observado tem informações de estado dedicadas.
- Os registros são emitidos com base na lógica que você implementa e usando o esquema de saída que você especifica. Consulte Emitir registros.
- O senhor pode acessar os valores no armazenamento do estado usando o leitor
statestore
. Esse leitor tem muitas funcionalidades e não se destina a cargas de trabalho de baixa latência. Consulte Ler informações sobre o estado da transmissão estruturada. - A lógica especificada usando
handleInputRows
só é acionada se os registros para o key estiverem presentes em um micro-lote. Consulte Manipular linhas de entrada. - Use
handleExpiredTimer
para implementar uma lógica baseada em tempo que não dependa da observação de registros para disparar. Veja os eventos programados do programa.
Os objetos de estado são isolados por uma chave de agrupamento com as seguintes implicações:
- Os valores de estado não podem ser afetados por registros associados a um agrupamento diferente key.
- O senhor não pode implementar uma lógica que dependa da comparação de valores ou da atualização do estado entre chaves de agrupamento.
O senhor pode comparar valores em um agrupamento key. Use um MapState
para implementar a lógica com um segundo key que sua lógica personalizada pode usar. Por exemplo, agrupar por user_id
e digitar seu MapState
usando o endereço IP permitiria que o senhor implementasse uma lógica que rastreasse sessões simultâneas de usuários.
Considerações avançadas para trabalhar com o estado
A gravação em uma variável de estado aciona uma gravação no RocksDB. Para otimizar o desempenho, o site Databricks recomenda processar todos os valores no iterador para um determinado key e confirmar as atualizações em uma única gravação sempre que possível.
As atualizações de estado são tolerantes a falhas. Se uma tarefa falhar antes que uma microlota termine de ser processada, o valor da última microlota bem-sucedida será usado na nova tentativa.
Os valores de estado não têm nenhum padrão integrado. Se a sua lógica exigir a leitura de informações de estado existentes, use o método exists
ao implementar a lógica.
MapState
têm funcionalidade adicional para verificar a chave individual ou listar todas as chaves para implementar a lógica do estado nulo.
Emita registros
A lógica definida pelo usuário controla como transformWithState
emite registros. Os registros são emitidos por agrupamento key.
Os aplicativos personalizados com estado não fazem suposições sobre como as informações de estado são usadas ao determinar como emitir registros, e o número de registros retornado para uma determinada condição pode ser nenhum, um ou muitos.
Você implementa a lógica para emitir registros usando handleInputRows
ou handleExpiredTimer
. Consulte Manipular linhas de entrada e Eventos cronometrados do programa.
Você pode implementar vários valores de estado e definir várias condições para emitir registros, mas todos os registros emitidos devem usar o mesmo esquema.
- Python
- Scala
In Python, you define your output schema using the outputStructType
keyword while calling transformWithStateInPandas
.
You emit records using a pandas DataFrame object and yield
.
You can optionally yield
an empty DataFrame. When combined with update
output mode, emitting an empty DataFrame updates the values for the grouping key to be null.
In Scala, you emit records using an Iterator
object. The schema of the output is derived from emitted records.
You can optionally emit an empty Iterator
. When combined with update
output mode, emitting an empty Iterator
updates the values for the grouping key to be null.
Manipular linhas de entrada
Use o método handleInputRows
para definir a lógica de como os registros observados na consulta de transmissão interagem com os valores de estado e os atualizam. O manipulador que o senhor define com o método handleInputRows
é executado sempre que algum registro é processado pela consulta de transmissão estruturada.
Para a maioria dos aplicativos com estado implementados com transformWithState
, a lógica central é definida usando handleInputRows
.
Para cada atualização de microlotes processada, todos os registros nas microlotes de um determinado agrupamento key estão disponíveis usando um iterador. A lógica definida pelo usuário pode interagir com todos os registros do microlote atual e valores no armazenamento de estado.
Eventos cronometrados do programa
Você pode usar temporizadores para implementar uma lógica personalizada com base no tempo decorrido de uma condição especificada.
Você trabalha com temporizadores implementando um método handleExpiredTimer
.
Em um agrupamento key, os temporizadores são identificados exclusivamente por seu carimbo de data/hora.
Quando um cronômetro expira, o resultado é determinado pela lógica implementada em seu aplicativo. Os padrões comuns incluem:
- Emitir informações armazenadas em uma variável de estado.
- Despejo de informações de estado armazenadas.
- Criando um novo cronômetro.
Os timers expirados são acionados mesmo que nenhum registro para o key associado seja processado em um micro-lote.
Especifique o modelo de tempo
Ao passar seu StatefulProcessor
para transformWithState
, você deve especificar o modelo de tempo. As seguintes opções são suportadas:
ProcessingTime
EventTime
NoTime
ouTimeMode.None()
Especificar NoTime
significa que os temporizadores não são suportados pelo seu processador.
valores do temporizador integrado
A Databricks não recomenda invocar o relógio do sistema em seu aplicativo personalizado com estado, pois isso pode levar a novas tentativas não confiáveis em caso de falha de tarefa. Use os métodos na classe TimerValues
quando precisar acessar o tempo de processamento ou a marca d'água:
| Descrição |
---|---|
| Retorna o registro de data e hora do tempo de processamento do lote atual em milissegundos desde a época. |
| Retorna o registro de data e hora da marca d'água para o lote atual em milissegundos desde a época. |
O tempo de processamento descreve o tempo em que os micro-lotes são processados pelo site Apache Spark. Muitas fontes de transmissão, como Kafka, também incluem o tempo de processamento do sistema.
As marcas d'água nas consultas de transmissão geralmente são definidas em relação ao tempo do evento ou ao tempo de processamento da fonte de transmissão. Consulte Aplicar marcas d'água para controlar o limite de processamento de dados.
Tanto as marcas d'água quanto as janelas podem ser usadas em combinação com transformWithState
. Você pode implementar uma funcionalidade semelhante em seu aplicativo com estado personalizado aproveitando TTL, temporizadores e a funcionalidade MapState
ou ListState
.
O que é o horário de vida do estado (TTL)?
Cada um dos valores de estado usados por transformWithState
suporta uma especificação opcional de tempo de vida (TTL). Quando o TTL expira, o valor é retirado do armazenamento do estado. O TTL interage apenas com valores no armazenamento do estado, o que significa que o senhor pode implementar uma lógica para despejar informações do estado, mas não pode acionar diretamente a lógica, pois o TTL despeja valores do estado.
Se você não implementar o TTL, deverá lidar com o despejo de estado usando outra lógica para evitar o crescimento infinito do estado.
O TTL é aplicado para cada valor de estado, com regras diferentes para cada tipo de estado.
- As variáveis de estado têm escopo para a chave de agrupamento.
- Para objetos
ValueState
, apenas um único valor é armazenado por agrupamento key. O TTL se aplica a esse valor. - Para objetos
ListState
, a lista pode conter muitos valores. O TTL se aplica a cada valor em uma lista de forma independente. - Para objetos
MapState
, cada mapa key tem um valor de estado associado. O TTL se aplica independentemente a cada par key-valor em um mapa.
Para todos os tipos de estado, TTL Reset se a informação do estado for atualizada.
Embora o TTL tenha como escopo valores individuais em um ListState
, a única maneira de atualizar um valor em uma lista é usar o método put
para substituir todo o conteúdo da variável ListState
.
Qual é a diferença entre temporizadores e TTL?
Há alguma sobreposição entre os temporizadores e o tempo de vida (TTL) para variáveis de estado, mas os temporizadores oferecem um conjunto mais amplo de recursos do que o TTL.
O TTL evita informações de estado que não tenham sido atualizadas durante o período especificado pelo usuário. Isso permite que os usuários evitem o crescimento não verificado do estado e removam entradas de estado obsoletas. Como mapas e listas implementam TTL para cada valor, você pode implementar funções que considerem somente valores de estado que foram atualizados recentemente definindo TTL.
Os temporizadores permitem que você defina uma lógica personalizada além da expulsão estadual, incluindo a emissão de registros. Opcionalmente, o senhor pode usar temporizadores para limpar as informações de estado para um determinado valor de estado, com a flexibilidade adicional de emitir valores ou acionar outra lógica condicional com base no temporizador.