monitoramento transmitido query estruturada em Databricks

O Databricks fornece monitoramento integrado para aplicativos estruturados transmitidos por meio da Spark UI na guia transmissão .

Distinguir query estruturada transmitida na Spark UI

Forneça à sua transmissão um nome query exclusivo adicionando .queryName(<query-name>) ao seu código writeStream para distinguir facilmente quais métricas pertencem a qual transmissão na Spark UI.

Enviar métricas estruturadas de transmissão para serviços externos

As transmissões métricas podem ser enviadas para serviços externos para casos de uso de alertas ou painéis, usando a interface Query Listener de transmissão do site Apache Spark. Em Databricks Runtime 11.3 LTS e acima, o Query Listener de transmissão está disponível em Python e Scala.

Importante

Credenciais e objetos gerenciados pelo Unity Catalog não podem ser usados na lógica StreamingQueryListener .

Observação

A latência de processamento com ouvintes pode afetar significativamente as velocidades de processamento de consultas. É aconselhável limitar a lógica de processamento nesses ouvintes e optar por escrever em sistemas de resposta rápida, como o Kafka, para obter eficiência.

O código a seguir fornece exemplos básicos da sintaxe para implementar um ouvinte:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Definindo métricas observáveis em transmissão estruturada

Métricas observáveis são funções agregadas nomeadas arbitrárias que podem ser definidas em uma query (DataFrame). Assim que a execução de um DataFrame atinge um ponto de conclusão (ou seja, finaliza uma query de lotes ou atinge uma época de transmissão), é emitido um evento nomeado que contém as métricas dos dados processados desde o último ponto de conclusão.

Você pode observar essas métricas anexando um ouvinte à sessão do Spark. O ouvinte depende do modo de execução:

  • modo de lotes: Use QueryExecutionListener.

    QueryExecutionListener é chamado quando a query é concluída. Acesse as métricas usando o mapa QueryExecution.observedMetrics .

  • transmissão ou microbatch: Use StreamingQueryListener.

    StreamingQueryListener é chamado quando a query transmitida completa uma época. Acesse as métricas usando o mapa StreamingQueryProgress.observedMetrics . Databricks não suporta transmissão de execução contínua.

Por exemplo:

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Análises do objeto StreamingQueryListener

Métrica

Descrição

id

Um ID de consulta exclusivo que persiste nas reinicializações.

runId

Um ID de consulta que é exclusivo para cada início/reinício. Consulte StreamingQuery.runId().

name

O nome da consulta especificado pelo usuário. O nome é nulo se nenhum nome for especificado.

timestamp

O registro de data e hora para a execução do microbatch.

batchId

Uma ID exclusiva para os lotes atuais de dados que estão sendo processados. No caso de novas tentativas após uma falha, um determinado ID de lote pode ser executado mais de uma vez. Da mesma forma, quando não há dados a serem processados, o ID do lote não é incrementado.

numInputRows

O número agregado (em todas as fontes) de registros processados em um acionador.

inputRowsPerSecond

A taxa agregada (em todas as fontes) de chegada de dados.

processedRowsPerSecond

A taxa agregada (em todas as fontes) na qual o Spark está processando dados.

objeto duraçãoMs

informações sobre o tempo necessário para concluir vários estágios do processo de execução do microbatch.

Métrica

Descrição

durationMs.addBatch

O tempo necessário para executar o microbatch. Isso exclui o tempo que o Spark leva para planejar o microbatch.

durationMs.getBatch

O tempo necessário para recuperar os metadados sobre os offsets da fonte.

durationMs.latestOffset

A última compensação consumida para o micro-lote. Esse objeto de progresso refere-se ao tempo necessário para recuperar o offset mais recente das fontes.

durationMs.queryPlanning

O tempo necessário para gerar o plano de execução.

durationMs.triggerExecution

O tempo necessário para planejar e executar o microbatch.

durationMs.walCommit

O tempo necessário para commit os novos offsets disponíveis.

objeto eventTime

informações sobre o valor de tempo do evento visto nos dados que estão sendo processados no microbatch. Esses dados são usados pela marca d'água para descobrir como cortar o estado para processar agregações com estado definidas na transmissão estruturada Job.

Métrica

Descrição

eventTime.avg

O tempo médio do evento observado nesse acionador.

eventTime.max

O tempo máximo de evento observado nesse acionador.

eventTime.min

O tempo mínimo do evento visto nesse acionador.

eventTime.watermark

O valor da marca d'água usada nesse acionador.

objeto stateOperators

informações sobre as operações stateful que são definidas no Job de transmissão estruturada e as agregações que são produzidas a partir delas.

Métrica

Descrição

stateOperators.operatorName

O nome do operador com estado ao qual as métricas estão relacionadas, como symmetricHashJoin, dedupe, stateStoreSave.

stateOperators.numRowsTotal

O número total de linhas no estado como resultado de um operador ou agregação com estado.

stateOperators.numRowsUpdated

O número total de linhas atualizadas no estado como resultado de um operador ou agregação com estado.

stateOperators.allUpdatesTimeMs

Atualmente, essa métrica não é mensurável pelo site Spark e está planejada para ser removida em atualizações futuras.

stateOperators.numRowsRemoved

O número total de linhas removidas do estado como resultado de um operador ou agregação com estado.

stateOperators.allRemovalsTimeMs

Atualmente, essa métrica não é mensurável pelo site Spark e está planejada para ser removida em atualizações futuras.

stateOperators.commitTimeMs

O tempo necessário para commit todas as atualizações (colocações e remoções) e retornar uma nova versão.

stateOperators.memoryUsedBytes

Memória utilizada pelo armazenamento do estado.

stateOperators.numRowsDroppedByWatermark

O número de linhas que são consideradas muito atrasadas para serem incluídas em uma agregação com estado. Somente agregações de transmissão: O número de linhas descartadas após a agregação (não as linhas de entrada brutas). Esse número não é exato, mas fornece uma indicação de que há dados atrasados sendo descartados.

stateOperators.numShufflePartitions

O número de partições de embaralhamento para esse operador com estado.

stateOperators.numStateStoreInstances

A instância real de armazenamento do estado que o operador inicializou e manteve. Para muitos operadores com estado, isso é o mesmo que o número de partições. Entretanto, a junção transmissão-transmissão inicializa quatro instâncias de armazenamento do estado por partição.

stateOperators.customMetrics objeto

Informações coletadas de RocksDB capturando métricas sobre seu desempenho e operações com relação aos valores de estado que mantém para a transmissão estruturada Job. Para obter mais informações, consulte Configure RocksDB armazenamento do estado em Databricks.

Métrica

Descrição

customMetrics.rocksdbBytesCopied

O número de bytes copiados, conforme rastreado pelo Gerenciador de arquivos do RocksDB.

customMetrics.rocksdbCommitCheckpointLatency

O tempo, em milissegundos, para obter um Snapshot de RocksDB nativo e gravá-lo em um diretório local.

customMetrics.rocksdbCompactLatency

O tempo em milissegundos de compactação (opcional) durante o commit do ponto de verificação.

customMetrics.rocksdbCommitFileSyncLatencyMs

O tempo, em milissegundos, de sincronização do site nativo RocksDB Snapshot com o armazenamento externo (o local do ponto de verificação).

customMetrics.rocksdbCommitFlushLatency

O tempo, em milissegundos, de descarga das alterações do RocksDB na memória para o disco local.

customMetrics.rocksdbCommitPauseLatency

O tempo, em milissegundos, que interrompe os threads em segundo plano worker como parte do ponto de verificação commit, como, por exemplo, para compactação.

customMetrics.rocksdbCommitWriteBatchLatency

O tempo em milissegundos para aplicar as gravações em etapas na estrutura na memória (WriteBatch) ao RocksDB nativo.

customMetrics.rocksdbFilesCopied

O número de arquivos copiados, conforme monitorado pelo RocksDB File Manager.

customMetrics.rocksdbFilesReused

O número de arquivos reutilizados, conforme monitorado pelo Gerenciador de arquivos do RocksDB.

customMetrics.rocksdbGetCount

O número de chamadas get para o banco de dados (não inclui gets de WriteBatch - lotes na memória usados para preparar gravações).

customMetrics.rocksdbGetLatency

O tempo médio em nanossegundos para a chamada RocksDB::Get nativa subjacente.

customMetrics.rocksdbReadBlockCacheHitCount

A contagem de acessos ao cache do cache de blocos no RocksDB que são úteis para evitar leituras de disco local.

customMetrics.rocksdbReadBlockCacheMissCount

A contagem do cache de blocos no RocksDB não é útil para evitar leituras de disco local.

customMetrics.rocksdbSstFileSize

O tamanho de todo o arquivo Static Sorted Table (SST) - a estrutura tabular que o RocksDB usa para armazenar dados.

customMetrics.rocksdbTotalBytesRead

O número de bytes não compactados lidos pelas operações get.

customMetrics.rocksdbTotalBytesReadByCompaction

O número de bytes que o processo de compactação lê do disco.

customMetrics.rocksdbTotalBytesReadThroughIterator

O número total de bytes de dados não compactados lidos usando um iterador. Algumas operações com estado (por exemplo, processamento de tempo limite em FlatMapGroupsWithState e marca d'água) exigem a leitura de dados no BD por meio de um iterador.

customMetrics.rocksdbTotalBytesWritten

O número total de bytes não compactados gravados por operações put.

customMetrics.rocksdbTotalBytesWrittenByCompaction

O número total de bytes que o processo de compactação grava no disco.

customMetrics.rocksdbTotalCompactionLatencyMs

O tempo em milissegundos para as compactações do RocksDB, incluindo as compactações em segundo plano e a compactação opcional iniciada durante o commit.

customMetrics.rocksdbTotalFlushLatencyMs

O tempo total de descarga, incluindo a descarga em segundo plano. As operações de descarga são processos pelos quais o MemTable é descarregado no armazenamento quando está cheio. MemTables são o primeiro nível em que os dados são armazenados no RocksDB.

customMetrics.rocksdbZipFileBytesUncompressed

O tamanho em bytes dos arquivos zip não compactados, conforme relatado pelo Gerenciador de arquivos. O File Manager gerencia a utilização e a exclusão do espaço em disco do arquivo SST físico.

objeto de fontes (Kafka)

Métrica

Descrição

sources.description

Uma descrição detalhada da fonte do Kafka, especificando o tópico exato do Kafka que está sendo lido. Por exemplo: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.

sources.startOffset objeto

O número de deslocamento inicial no tópico Kafka no qual a transmissão Job começará.

sources.endOffset objeto

O último offset processado pelo microbatch. Isso pode ser igual a latestOffset para uma execução de microbatch em andamento.

sources.latestOffset objeto

A última compensação calculada pelo microbatch. O processo de microbatching pode não processar todos os offsets quando há limitação, o que resulta em diferenças entre endOffset e latestOffset.

sources.numInputRows

O número de linhas de entrada processadas a partir dessa fonte.

sources.inputRowsPerSecond

A taxa na qual os dados estão chegando para processamento a partir dessa fonte.

sources.processedRowsPerSecond

A taxa na qual o Spark está processando dados dessa fonte.

objeto fontes.métricas (Kafka)

Métrica

Descrição

sources.metrics.avgOffsetsBehindLatest

O número médio de deslocamentos em que a consulta de transmissão está atrás do último deslocamento disponível entre todos os tópicos inscritos.

sources.metrics.estimatedTotalBytesBehindLatest

O número estimado de bytes que o processo de consulta não consumiu dos tópicos inscritos.

sources.metrics.maxOffsetsBehindLatest

O número máximo de deslocamentos que a consulta de transmissão está atrás do último deslocamento disponível entre todos os tópicos inscritos.

sources.metrics.minOffsetsBehindLatest

O número mínimo de deslocamentos em que a consulta de transmissão está atrás do último deslocamento disponível entre todos os tópicos inscritos.

objeto coletor (Kafka)

Métrica

Descrição

sink.description

A descrição do sink Kafka para o qual a consulta de transmissão está sendo gravada, detalhando a implementação específica do sink Kafka que está sendo usada. Por exemplo: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.

sink.numOutputRows

O número de linhas que foram gravadas na tabela de saída ou no sink como parte do microbatch. Em algumas situações, esse valor pode ser "-1" e, em geral, pode ser interpretado como "desconhecido".

objeto de fontes (Delta Lake)

Métrica

Descrição

sources.description

A descrição da fonte da qual a consulta de transmissão está sendo lida. Por exemplo: “DeltaSource[table]”.

sources.[startOffset/endOffset].sourceVersion

A versão de serialização com a qual esse deslocamento é codificado.

sources.[startOffset/endOffset].reservoirId

O ID da tabela que está sendo lida. Isso é usado para detectar erros de configuração ao reiniciar uma consulta.

sources.[startOffset/endOffset].reservoirVersion

A versão da tabela que está sendo processada no momento.

sources.[startOffset/endOffset].index

O índice na sequência de AddFiles nessa versão. Isso é usado para dividir um commit grande em vários lotes. Esse índice é criado pela classificação em modificationTimestamp e path.

sources.[startOffset/endOffset].isStartingVersion

Identifica se o deslocamento atual marca o início de uma nova consulta de transmissão em vez do processamento de alterações que ocorreram depois que os dados iniciais foram processados. Ao iniciar uma nova consulta, todos os dados presentes na tabela no início são processados primeiro e, em seguida, todos os novos dados que chegarem.

sources.latestOffset

O último offset processado pela consulta de microbatch.

sources.numInputRows

O número de linhas de entrada processadas a partir dessa fonte.

sources.inputRowsPerSecond

A taxa na qual os dados estão chegando para processamento a partir dessa fonte.

sources.processedRowsPerSecond

A taxa na qual o Spark está processando dados dessa fonte.

sources.metrics.numBytesOutstanding

O tamanho combinado dos arquivos pendentes (arquivos monitorados pelo RocksDB). Esse é o backlog métricas para Delta e Auto Loader como fonte de transmissão.

sources.metrics.numFilesOutstanding

O número de arquivos pendentes a serem processados. Esse é o backlog métricas para Delta e Auto Loader como fonte de transmissão.

objeto coletor (Delta Lake)

Métrica

Descrição

sink.description

A descrição do Delta sink, detalhando a implementação específica do Delta sink que está sendo usada. Por exemplo: “DeltaSink[table]”.

sink.numOutputRows

O número de linhas é sempre "-1" porque o Spark não pode inferir linhas de saída para os coletores DSv1, que é a classificação do coletor Delta Lake.

objeto de fontes (Kinesis)

Métrica

Descrição

description

A descrição da fonte Kinesis, especificando a transmissão Kinesis exata da qual a consulta de transmissão está lendo. Por exemplo: “KinesisV2[stream]”.

objeto de métricas de origem (Kinesis)

Métrica

Descrição

avgMsBehindLatest

O número médio de milissegundos em que um consumidor está atrasado em relação ao início de uma transmissão.

maxMsBehindLatest

O número máximo de milissegundos que um consumidor ficou atrasado em relação ao início de uma transmissão.

minMsBehindLatest

O número mínimo de milissegundos em que um consumidor está atrasado em relação ao início de uma transmissão.

totalPrefetchedBytes

O número de bytes restantes para processar. Esse é o backlog métricas para Kinesis como uma fonte.

Para obter mais informações, consulte Quais métricas o Kinesis reporta?.

pia (Kinesis)

Métrica

Descrição

sink.description

A descrição da implementação do sink do Kinesis que está sendo usada. Por exemplo: “KinesisV2[stream]”.

sink.numOutputRows

O número de linhas que foram gravadas na tabela de saída ou no sink como parte do microbatch.

Exemplos

Exemplo de evento StreamingQueryListener Kafka para Kafka

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Exemplo de evento Delta Lake-to-Delta Lake StreamingQueryListener

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Exemplo de evento StreamingQueryListener do Kinesis para o Delta Lake

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Exemplo de evento StreamingQueryListener do Kafka+Delta Lake para o Delta Lake

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Exemplo de fonte de taxa para evento Delta Lake StreamingQueryListener

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}