Pular para o conteúdo principal

Crie um aplicativo personalizado com estado

Você pode usar transformWithState para criar aplicativos de transmissão com estado e para implementar soluções de baixa latência e quase em tempo real. Com operadores com estado personalizados, é possível criar lógica com estado arbitrária que possibilita o desenvolvimento de novos casos de uso operacionais não viáveis com o processamento tradicional de transmissão estructurada.

nota

Para operações com estado, como agregações, deduplicação e joins de transmissão, a Databricks recomenda usar operadores de Transmissão Estruturada integrados em vez de lógica personalizada. Consulte O que é transmissão com estado?

A Databricks recomenda o uso de transformWithState em vez de operadores legados, como flatMapGroupsWithState e mapGroupsWithState, para transformações de estado arbitrárias. Consulte Operadores arbitrários com estado (legado).

Requisitos

Os operadores transformWithState e transformWithStateInPandas têm os seguintes requisitos:

  • Disponível em Databricks Runtime 16.2 e acima.

    • Para o modo em tempo real, use Databricks Runtime 17.3 LTS ou acima. Consulte modo em tempo real em transmissão estructurada.
    • Para o modo de acesso padrão, Python está disponível no Databricks Runtime 16.3 e acima, e Scala está disponível no Databricks Runtime 17.3 e acima.
  • RocksDB é o provedor de armazenamento do estado default no Databricks Runtime 17.3 e acima.

    • Para Databricks Runtime 17.2 e abaixo, você deve configurar o provedor de armazenamento do estado RocksDB. A Databricks recomenda ativar RocksDB na configuração do Spark.

      Python
      spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

O que é transformWithState?

O operador transformWithState aplica um processador stateful personalizado a uma consulta de transmissão estruturada. Você deve implementar um processador com estado personalizado para usar transformWithState. A transmissão estruturada inclui o site APIs para criar seu processador stateful usando Python, Scala, ou Java.

Use transformWithState para aplicar lógica personalizada a uma key de agrupamento. O seguinte descreve o projeto de alto nível:

  • Defina uma ou mais variáveis de estado.
  • As informações de estado persistem para cada key de agrupamento. Você pode acessar cada variável de estado em código definido pelo usuário.
  • Para cada micro lote processado, todas as linhas para a key estão disponíveis como um iterador.
  • Use o(a) StatefulProcessorHandle com temporizadores e condições definidas pelo usuário para controlar como emitir linhas.
  • Para gerenciar a expiração do estado e o tamanho do estado, os valores de estado suportam definições individuais de time-to-live (TTL).

Como transformWithState oferece suporte à evolução do esquema no armazenamento de estado, você pode iterar e atualizar seus aplicativos de produção sem perder informações históricas de estado. Após a atualização do esquema de estado, não é necessário reprocessar as linhas, o que simplifica as implantações de código e a manutenção. Consulte evolução do esquema no armazenamento do estado.

importante

A documentação da Databricks usa transformWithState para descrever as implementações de Python e Scala:

  • O PySpark oferece suporte tanto à API transformWithState baseada em linhas quanto ao operador transformWithStateInPandas baseado em Pandas.

  • Scala suporta apenas a API transformWithState baseada em linhas.

As implementações de transformWithState em Scala e Python têm as mesmas capacidades, mas com algumas diferenças na sintaxe.

Definição de StatefulProcessor

Você define um processador com estado estendendo a classe StatefulProcessor e implementando seus métodos.

O Spark passa um StatefulProcessorHandle para o método init do seu StatefulProcessor. Utilize o identificador para criar variáveis de estado e interagir com o armazenamento do estado.

transformWithState suporta três tipos de estado: ValueState, ListStatee MapState. Cada tipo armazena o estado para cada chave de agrupamento utilizando uma estrutura de dados subjacente diferente.

Implemente os seguintes métodos para definir sua lógica personalizada:

  • Implemente handleInputRows para controlar como seu aplicativo processa dados, atualiza o estado e emite linhas para cada micro-batch. Consulte Lidar com as linhas de entrada.
  • Implementar handleExpiredTimer para execução de lógica baseada em tempo, independentemente de a key de agrupamento receber novas linhas em um microlotes. Consulte Lidar com timers expirados.
  • Opcionalmente, implemente handleInitialState para pré-preencher o estado antes que o aplicativo processe quaisquer linhas de entrada. Consulte Lidar com o estado inicial.

A tabela a seguir compara os comportamentos funcionais desses métodos.

Comportamento

handleInputRows

handleExpiredTimer

Obtenha, coloque, atualize ou limpe valores de estado

Sim

Sim

Criar ou excluir um cronômetro

Sim

Sim

Emitir linhas

Sim

Sim

Percorrer as linhas no micro-batch atual

Sim

Não

Lógica de gatilho com base no tempo decorrido

Não

Sim

Você pode combinar handleInputRows e handleExpiredTimer para implementar lógica complexa conforme necessário.

Por exemplo, você poderia implementar um aplicativo que usa handleInputRows para atualizar valores de estado para cada microlote e definir um temporizador de 10 segundos para o futuro. Se nenhuma linha adicional for processada, você pode usar handleExpiredTimer para emitir os valores atuais no armazenamento do estado. Se novas linhas forem processadas para a chave de agrupamento, você pode limpar o temporizador existente e definir um novo temporizador.

StatefulProcessorHandle

No PySpark, a classe StatefulProcessorHandle permite acessar funções que controlam como seu código usa informações de estado.

Ao inicializar um StatefulProcessor, deve-se sempre importar e passar o StatefulProcessorHandle para a variável handle. A variável handle vincula a variável local em sua classe Python à variável de estado.

nota

O Scala usa o método getHandle.

Tipos de estado personalizados

Pode-se implementar múltiplos objetos de estado em um único operador com estado.

Escolha um tipo de estado com base na lógica completa do seu aplicativo. Por exemplo, poderia rastrear sessões com um ValueState agrupadas por user_id e session_id. Ou, para avaliar condições em várias sessões, use um MapState agrupado por user_id com session_id como a chave do mapa.

Se o seu objeto de estado usa um StructType, deve-se definir nomes únicos para cada campo na estrutura para o esquema. Estes nomes são visíveis ao ler o armazenamento do estado. Consulte Ler informações de estado da transmissão estruturada.

As seções a seguir descrevem os tipos de estado compatíveis com transformWithState:

ValueState

ValueState armazena um valor para cada chave de agrupamento.

Um estado de valor pode incluir tipos complexos, como uma struct ou tupla. Para ValueState, você deve implementar lógica para substituir o valor inteiro.

O tempo de vida para um estado de valor é redefinido quando o valor é atualizado. Se uma chave de origem for processada para ValueState sem atualizar o ValueState armazenado, o tempo de vida não é redefinido.

ListState

ListState Armazena uma lista para cada chave de agrupamento.

Um estado de lista é uma coleção de valores, que pode incluir tipos complexos. Cada valor em uma lista tem seu próprio tempo de vida.

Você pode adicionar itens a uma lista anexando itens individuais, anexando uma lista de itens ou substituindo a lista inteira por um put. Para Reset o tempo de ativação, você deve usar uma operação de put.

MapState

MapState armazena um mapa para cada key de agrupamento. Mapas são o equivalente do Apache Spark a um dicionário Python (dict).

Um estado de mapa é uma coleção de chaves distintas, cada uma mapeada para um valor, e cada uma pode incluir tipos complexos. Cada par key-value em um mapa tem seu próprio tempo de vida.

É possível atualizar o valor de uma key específica ou remover uma key e seu valor. É possível retornar um valor individual usando sua key, listar todas as keys, listar todos os valores, ou retornar um iterador para trabalhar com o conjunto completo de pares key-value no mapa.

importante

Chaves de agrupamento descrevem os campos especificados na cláusula GROUP BY da consulta de transmissão estructurada. Estados de mapa podem conter um número arbitrário de pares key-value para uma key de agrupamento.

Por exemplo, se sua consulta usa GROUP BY user_id e você deseja definir um mapa para cada session_id, sua chave de agrupamento é user_id e a chave MapState é session_id:

Python
class SessionTracker(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
self.sessions = handle.getMapState("sessions", StringType(), LongType())

def handleInputRows(self, key, rows: Iterator[Row], timerValues) -> Iterator[Row]:
for row in rows:
session_id = row["session_id"] # session_id is the MapState key
count = self.sessions.getValue(session_id)[0] if self.sessions.containsKey(session_id) else 0
new_count = count + 1
self.sessions.updateValue(session_id, (new_count,))
yield from []

def close(self) -> None:
pass

df.groupBy("user_id").transformWithState(SessionTracker(), ...) # user_id is the grouping key

Criar uma variável de estado personalizada na StatefulProcessor

Ao inicializar seu StatefulProcessor, é criada uma variável local para cada objeto de estado, o que permite interagir com os objetos de estado em sua lógica personalizada. Definir e inicializar variáveis de estado substituindo o método integrado init na classe StatefulProcessor.

É possível definir qualquer número de objetos de estado usando os métodos getValueState, getListState e getMapState em seu StatefulProcessor.

Cada objeto de estado deve ter o seguinte:

  • Um nome exclusivo
  • Um esquema
    • Em Python, você deve especificar o esquema.
    • Em Scala, você pode passar um Encoder para especificar o esquema de estado.

Opcionalmente, você também pode fornecer uma duração de tempo de vida útil (TTL) em milissegundos. Se for implementar um estado de mapa, é necessário fornecer uma definição de esquema separada para as chaves do mapa e para os valores.

nota

StatefulProcessor gerencia a lógica separadamente para consulta, atualização e emissão de informação de estado. Veja Use suas variáveis de estado em métodos com lógica personalizada.

Use suas variáveis de estado em métodos com lógica personalizada

Objetos de estado possuem métodos para obter o estado, atualizar as informações de estado existentes e limpar o estado atual.

Cada key de agrupamento tem informações de estado dedicadas.

  • O StatefulProcessor emite linhas com base na sua lógica personalizada e no esquema de saída especificado. Consulte Emitir linhas.
  • Use o leitor statestore para acessar valores no armazenamento do estado. Este leitor é destinado a cargas de trabalho em lotes e não é destinado a cargas de trabalho de baixa latência. Consulte Ler informações de estado da transmissão estruturada.
  • A lógica especificada usando handleInputRows é executada somente se linhas para a key estiverem presentes em um microlote. Consulte Lidar com as linhas de entrada.
  • Use handleExpiredTimer para implementar uma lógica baseada em tempo que não dependa da observação de linhas para ser acionada. Consulte Lidar com timers expirados.
nota

Os objetos de estado são isolados por uma chave de agrupamento com as seguintes implicações:

  • Valores de estado não podem ser afetados por linhas associadas a uma chave de agrupamento diferente.
  • O senhor não pode implementar uma lógica que dependa da comparação de valores ou da atualização do estado entre chaves de agrupamento.

Você pode comparar valores dentro de uma chave de agrupamento. Use uma MapState para implementar a lógica com uma segunda key que sua lógica personalizada possa usar. Por exemplo, agrupar por user_id e usar ip_address para a sua chave MapState permite que você rastreie sessões de usuário simultâneas.

Considerações avançadas para trabalhar com o estado

As atualizações de estado são tolerantes a falhas. Se uma tarefa falhar antes que um microlote tenha terminado de ser processado, a nova tentativa utiliza o valor do último microlote bem-sucedido.

Para desempenho otimizado, o Databricks recomenda que você processe todos os valores no iterador para uma dada key e faça commit de atualizações em uma única gravação. Quando uma variável de estado é gravada, uma gravação no RocksDB é acionada.

Valores de estado não têm default. Se sua lógica exige leitura de informações de estado existentes, use o método exists.

Para implementar a lógica de estado nulo, as MapState variáveis permitem verificar keys individuais ou listar todas as keys.

Manipular linhas de entrada

Utilize o método handleInputRows para definir como seu aplicativo processa linhas e atualiza os valores de estado. Este método executa cada vez que sua consulta de transmissão estruturada processa linhas para uma key de agrupamento.

Para a maioria dos aplicativos com estado implementados com transformWithState, a lógica central é definida usando handleInputRows.

Para cada atualização de microlote processada, todas as linhas no microlote para uma determinada key de agrupamento estão disponíveis usando um iterador. A lógica definida pelo usuário pode interagir com todas as linhas do microbatch atual e os valores no statestore.

Lidar com cronômetros expirados

Use o método handleExpiredTimer para implementar lógica personalizada com base no tempo decorrido.

Em um agrupamento key, os temporizadores são identificados exclusivamente por seu carimbo de data/hora.

Quando um cronômetro expira, o resultado é determinado pela lógica implementada em seu aplicativo. Os padrões comuns incluem:

  • Emitir informações armazenadas em uma variável de estado.
  • Despejo de informações de estado armazenadas.
  • Criando um novo cronômetro.

Temporizadores expirados disparam mesmo que nenhuma linha para sua key associada seja processada em um micro-lote.

Especifique o modo de hora

Ao passar seu StatefulProcessor para transformWithState, você deve especificar o modo de tempo usando o parâmetro timeMode.

As seguintes opções são compatíveis:

Modo Tempo

Descrição

ProcessingTime

Há suporte para temporizadores e TTL e são avaliados com base no tempo de relógio real quando o Apache Spark processa cada micro-lote. Use ProcessingTime para que os temporizadores disparem em um intervalo fixo em relação ao processamento das linhas, independentemente dos carimbos de data/hora nos dados.

EventTime

Os temporizadores são compatíveis e são avaliados com base na marca d'água do tempo do evento. A marca d'água avança à medida que o Apache Spark observa os carimbos de data/hora nos dados de entrada. TTL não é compatível com EventTime. Use EventTime quando os dados contêm carimbos de data/hora e for necessário acionar os temporizadores com base no progresso desses carimbos de data/hora. Ao usar EventTime, você também deve especificar o parâmetro eventTimeColumnName. Consulte eventTimeColumnName.

NoTime ou TimeMode.None()

Temporizadores e TTL não são suportados. Use NoTime quando seu aplicativo com estado não requer lógica baseada em tempo.

eventTimeColumnName

Ao usar o modo de tempo EventTime, o parâmetro eventTimeColumnName especifica o nome da coluna em seu esquema de saída que contém o carimbo de data/hora do evento. O Apache Spark usa esta coluna para propagar a marca d'água para a transmissão de saída, possibilitando operações baseadas em tempo corretas a jusante.

eventTimeColumnName é um argumento adicional para transformWithState ou transformWithStateInPandas:

Python
q = (
df.groupBy("key")
.transformWithState(
statefulProcessor=MyProcessor(),
outputStructType=output_schema,
outputMode="Append",
timeMode="EventTime",
eventTimeColumnName="outputTimestamp",
)
.writeStream...
)

valores do temporizador integrado

A Databricks não recomenda invocar o relógio do sistema em seu aplicativo personalizado com estado, pois isso pode levar a novas tentativas não confiáveis em caso de falha de tarefa. Use os métodos na classe TimerValues quando precisar acessar o tempo de processamento ou a marca d'água:

TimerValues

Descrição

getCurrentProcessingTimeInMs

Retorna o registro de data e hora do tempo de processamento do lote atual em milissegundos desde a época.

getCurrentWatermarkInMs

Retorna o registro de data e hora da marca d'água para o lote atual em milissegundos desde a época.

nota

O tempo de processamento descreve o tempo em que os micro-lotes são processados pelo site Apache Spark. Muitas fontes de transmissão, como Kafka, também incluem o tempo de processamento do sistema.

As marcas d'água nas consultas de transmissão geralmente são definidas em relação ao tempo do evento ou ao tempo de processamento da fonte de transmissão. Consulte Aplicar marcas d'água para controlar o limite de processamento de dados.

Tanto as marcas d'água quanto as janelas podem ser usadas em combinação com transformWithState. Você pode implementar uma funcionalidade semelhante em seu aplicativo com estado personalizado aproveitando TTL, temporizadores e a funcionalidade MapState ou ListState.

Tempo de Vida Útil (TTL) para tipos de estado

Para evitar erros de falta de memória e para remover valores de tipo de estado obsoletos, transformWithState suporta um valor opcional de tempo de vida (TTL) para cada valor de tipo de estado. Após a expiração, o TTL descarta silenciosamente os valores do tipo de estado. TTL não executa handleExpiredTimer ou qualquer lógica personalizada. Para executar código quando o estado expirar, use um temporizador em vez disso.

importante

Caso o TTL não seja implementado, é necessário lidar com a remoção de estado para evitar erros de falta de memória.

Para todos os tipos de estado, o TTL é redefinido ao atualizar as informações de estado. O TTL é aplicado para cada valor de tipo de estado, com regras diferentes para cada tipo de estado:

  • As variáveis de estado têm escopo para a chave de agrupamento.

  • Para objetos ValueState, apenas um único valor é armazenado por agrupamento key. O TTL se aplica a esse valor.

  • Para objetos ListState, a lista pode conter muitos valores. O TTL se aplica a cada valor em uma lista de forma independente.

    • Embora o TTL seja delimitado a valores individuais em um(a) ListState, a única maneira de atualizar um valor individual é com o método put, que sobrescreve todo o conteúdo da variável ListState e redefine o TTL para todos os valores na lista.
  • Para objetos MapState, cada mapa key tem um valor de estado associado. O TTL se aplica independentemente a cada par key-valor em um mapa.

nota

Os temporizadores permitem definir lógica personalizada além da expulsão de estado, incluindo emitir linhas. Opcionalmente, é possível usar temporizadores tanto para limpar as informações de estado para um determinado valor de estado, quanto para emitir valores ou acionar lógica condicional. Consulte Lidar com temporizadores expirados.

Exemplo de aplicativo com estado

O exemplo a seguir define um processador com estado personalizado, SimpleCounterProcessor, incluindo variáveis de estado de exemplo. SimpleCounterProcessor utiliza ValueState, ListState e MapState para contar linhas para cada chave de agrupamento.

Python
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator

spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

output_schema = StructType(
[
StructField("id", StringType(), True),
StructField("countAsString", StringType(), True),
]
)

class SimpleCounterProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
value_state_schema = StructType([StructField("count", IntegerType(), True)])
list_state_schema = StructType([StructField("count", IntegerType(), True)])
self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
# Schema can also be defined using strings and SQL DDL syntax
self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")

def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
count = 0
for pdf in rows:
list_state_rows = [(120,), (20,)] # A list of tuples
self.list_state.put(list_state_rows)
self.list_state.appendValue((111,))
self.list_state.appendList(list_state_rows)
pdf_count = pdf.count()
count += pdf_count.get("value")
self.value_state.update((count,)) # Count is passed as a tuple
iter = self.list_state.get()
list_state_value = next(iter)[0]
value = count
user_key = ("user_key",)
if self.map_state.exists():
if self.map_state.containsKey(user_key):
value += self.map_state.getValue(user_key)[0]
self.map_state.updateValue(user_key, (value,)) # Value is a tuple
yield pd.DataFrame({"id": key, "countAsString": str(count)})

q = (df.groupBy("key")
.transformWithStateInPandas(
statefulProcessor=SimpleCounterProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream...
)

Para mais exemplos, consulte exemplos de aplicações com estado.

nota

No Python, os valores de estado são tuplas. Passe tuplas para put e update, e espere tuplas de get.

Por exemplo, se o esquema para seu ValueState for um único número inteiro:

Python
current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0] # Extracts the first item in the tuple
new_value = current_value + 1 # Calculate a new value
value_state.update((new_value,)) # Pass the new value formatted as a tuple

Utilize esta abordagem para itens em um ListState ou valores em um MapState também.

Gerar linhas

É necessário usar handleInputRows ou handleExpiredTimer para definir como transformWithState emite linhas para cada chave de agrupamento. Consulte Trabalhar com linhas de entrada e Trabalhar com temporizadores expirados.

Aplicativos com estado personalizados não fazem suposições sobre como usar as informações de estado. Para uma determinada condição, o aplicativo poderia emitir nenhuma linha, uma linha, ou muitas linhas.

nota

É possível implementar múltiplos valores de estado e definir múltiplas condições para emitir linhas, mas todas as linhas devem usar o mesmo esquema.

Com transformWithStateInPandas, defina seu esquema de saída com a palavra-chave outputStructType.

Emitir linhas usando um objeto DataFrame do Pandas e yield.

Opcionalmente, você pode yield um DataFrame vazio. Se o modo de saída update for utilizado e um DataFrame vazio for emitido, isso atualiza os valores para a chave de agrupamento para serem null.

Lidar com o estado inicial

Opcionalmente, você pode passar um estado inicial para o primeiro micro-lote.

Por exemplo, talvez você possa usar isto para:

  • Migrar um fluxo de trabalho existente para um novo aplicativo personalizado.
  • Atualizar um operador com estado para alterar seu esquema ou lógica.
  • Reparar uma falha que não pode ser reparada automaticamente e requer intervenção manual.
nota

Use o leitor de armazenamento do estado para consultar as informações de estado de um ponto de controle existente. Consulte Ler informações sobre o estado da transmissão estruturada.

Se o senhor estiver convertendo uma tabela Delta existente em um aplicativo com estado, leia a tabela usando spark.read.table("table_name") e passe o DataFrame resultante. Opcionalmente, você pode selecionar ou modificar os campos para que estejam em conformidade com seu novo aplicativo com estado.

O senhor fornece um estado inicial usando um DataFrame com o mesmo esquema de agrupamento key que as linhas de entrada.

nota

O Python usa handleInitialState para especificar o estado inicial enquanto define um StatefulProcessor. O Scala usa a classe distinta StatefulProcessorWithInitialState.

O exemplo a seguir popula um contador por key de uma tabela Delta existente.

Python
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator

class CounterWithInitialState(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
state_schema = StructType([StructField("count", IntegerType(), True)])
self.count_state = handle.getValueState("countState", state_schema)

def handleInitialState(self, key, initialState: Row, timerValues) -> None:
self.count_state.update((initialState["count"],))

def handleInputRows(self, key, rows: Iterator[Row], timerValues) -> Iterator[Row]:
count = self.count_state.get()[0] if self.count_state.exists() else 0
for _ in rows:
count += 1
self.count_state.update((count,))
yield Row(id=key[0], count=count)

def close(self) -> None:
pass

output_schema = StructType([
StructField("id", StringType(), True),
StructField("count", IntegerType(), True),
])

# Load existing counts as initial state — must use the same grouping key as the input
initial_state = spark.read.table("existing_counts").groupBy("id")

q = (
df.groupBy("id")
.transformWithState(
statefulProcessor=CounterWithInitialState(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
initialState=initial_state,
)
.writeStream...
)

Use transformWithState no pipeline declarativo LakeFlow Spark

Use o operador transformWithState dentro do pipeline declarativo LakeFlow Spark para implementar lógica com estado arbitrária em seu pipeline de transmissão usando Python.

Para isso, siga os passos abaixo:

  1. Defina o esquema de saída e a lógica do processador com estado para suas transformações arbitrárias com estado. Para exemplos, consulte Aplicativos de exemplo com estado.
  2. Crie um fluxo do Lakeflow Spark Declarative Pipelines que invoca o operador transformWithState em um DataFrame. Consulte Tutorial: Crie seu primeiro pipeline usando o Editor de Lakeflow Pipelines.
  3. Execute seu pipeline e valide os resultados na tabela de destino ou no coletor.

Para um exemplo que usa transformWithState para monitorar batimentos cardíacos do sensor, veja Exemplo: Usar transformWithState para monitorar batimentos cardíacos do sensor.