monitoramento transmitido query estruturada em Databricks
O Databricks fornece monitoramento integrado para aplicativos estruturados transmitidos por meio da Spark UI na guia transmissão .
Distinguir query estruturada transmitida na Spark UI
Forneça à sua transmissão um nome query exclusivo adicionando .queryName(<query-name>)
ao seu código writeStream
para distinguir facilmente quais métricas pertencem a qual transmissão na Spark UI.
Enviar métricas estruturadas de transmissão para serviços externos
As transmissões métricas podem ser enviadas para serviços externos para casos de uso de alertas ou painéis, usando a interface Query Listener de transmissão do site Apache Spark. Em Databricks Runtime 11.3 LTS e acima, o Query Listener de transmissão está disponível em Python e Scala.
Importante
Credenciais e objetos gerenciados pelo Unity Catalog não podem ser usados na lógica StreamingQueryListener
.
Observação
A latência de processamento associada aos ouvintes pode afetar negativamente o processamento query . Databricks recomenda minimizar a lógica de processamento nesses ouvintes e gravar em coletores de baixa latência, como Kafka.
O código a seguir fornece exemplos básicos da sintaxe para implementar um ouvinte:
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Definindo métricas observáveis em transmissão estruturada
Métricas observáveis são funções agregadas nomeadas arbitrárias que podem ser definidas em uma query (DataFrame). Assim que a execução de um DataFrame atinge um ponto de conclusão (ou seja, finaliza uma query de lotes ou atinge uma época de transmissão), é emitido um evento nomeado que contém as métricas dos dados processados desde o último ponto de conclusão.
Você pode observar essas métricas anexando um ouvinte à sessão do Spark. O ouvinte depende do modo de execução:
modo de lotes: Use
QueryExecutionListener
.QueryExecutionListener
é chamado quando a query é concluída. Acesse as métricas usando o mapaQueryExecution.observedMetrics
.transmissão, ou micro-lotes: Use
StreamingQueryListener
.StreamingQueryListener
é chamado quando a query transmitida completa uma época. Acesse as métricas usando o mapaStreamingQueryProgress.observedMetrics
. Databricks não suporta transmissão de execução contínua.
Por exemplo:
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Análises do objeto StreamingQueryListener
Métrica |
Descrição |
---|---|
|
ID query exclusivo que persiste durante as reinicializações. Consulte StreamingQuery.id(). |
|
ID query exclusivo para cada início ou reinicialização. Consulte StreamingQuery.runId(). |
|
Nome da query especificado pelo usuário. Nulo se não for especificado. |
|
Carimbo temporal de execução dos microlotes. |
|
ID exclusivo para os lotes atuais de dados que estão sendo processados. Observe que no caso de novas tentativas após falha, um determinado ID de lote pode ser executado mais de uma vez. Da mesma forma, quando não há dados a serem processados, o ID dos lotes não é incrementado. |
|
Número agregado (em todas as fontes) de registros processados em um gatilho. |
|
Taxa agregada (em todas as fontes) de dados recebidos. |
|
Taxa agregada (em todas as fontes) na qual o Spark está processando dados. |
objeto duraçãoMs
informação sobre o tempo necessário para completar as diversas etapas do processo de execução dos microlotes.
Métrica |
Descrição |
---|---|
|
Tempo necessário para executar o microlote. Isso exclui o tempo que o Spark leva para planejar o microlote. |
|
Tempo necessário para recuperar os metadados sobre as compensações da origem. |
|
Último deslocamento consumido para o microlote. Este objeto de progresso refere-se ao tempo necessário para recuperar o deslocamento mais recente das fontes. |
|
Tempo necessário para gerar o plano de execução. |
|
Tempo necessário para planejar e executar o microlote. |
|
Tempo necessário para commit as novas compensações disponíveis. |
objeto eventTime
informações sobre o valor do tempo do evento visualizado nos dados em processamento nos microlotes. Esses dados são usados pela marca d'água para descobrir como cortar o estado para processar agregações com estado definidas no Job de transmissão estruturada.
Métrica |
Descrição |
---|---|
|
Tempo médio do evento visto no gatilho. |
|
Tempo máximo do evento visto no gatilho. |
|
Tempo mínimo do evento visto no gatilho. |
|
Valor da marca d’água utilizada no trigger. |
objeto stateOperators
informações sobre as operações stateful que são definidas no Job de transmissão estruturada e as agregações que são produzidas a partir delas.
Métrica |
Descrição |
---|---|
|
Nome do operador com estado ao qual as métricas estão relacionadas. Por exemplo, |
|
Número de linhas no estado como resultado do operador ou agregação com estado. |
|
Número de linhas atualizadas no estado como resultado do operador ou agregação com estado. |
|
Número de linhas removidas do estado como resultado do operador ou agregação com estado. |
|
Tempo necessário para commit todas as atualizações (colocações e remoções) e retornar uma nova versão. |
|
Memória utilizada pelo armazenamento do estado. |
|
Número de linhas que são consideradas tarde demais para serem incluídas na agregação com estado. somente agregações de transmissão: número de linhas eliminadas após a agregação, e não linhas de entrada brutas. O número não é preciso, mas pode indicar que dados atrasados estão sendo descartados. |
|
Número de partições aleatórias para este operador com estado. |
|
Instância real de armazenamento do estado que o operador inicializou e manteve. Em muitos operadores stateful, este é o mesmo que o número de partições, mas transmissão-transmissão join inicializa quatro instâncias de armazenamento do estado por partição. |
stateOperators.customMetrics objeto
informações coletadas do RocksDB que capturam métricas sobre seu desempenho e operações com relação aos valores stateful que mantém para a transmissão estruturada Job. Para obter mais informações, consulte Configurar o armazenamento do estado do RocksDB no Databricks.
Métrica |
Descrição |
---|---|
|
Número de bytes copiados conforme rastreado pelo RocksDB File Manager. |
|
Tempo em milissegundos para tirar um Snapshot do RocksDB nativo e gravá-lo em um diretório local. |
|
Tempo em milissegundos para compactação (opcional) durante o commit do ponto de verificação. |
|
Tempo em milissegundos para sincronizar os arquivos nativos relacionados Snapshot do RocksDB com um armazenamento externo (local do ponto de verificação). |
|
Tempo em milissegundos para liberar as alterações na memória do RocksDB em seu disco local. |
|
Tempo em milissegundos para interromper os threads worker em segundo plano (por exemplo, para compactação) como parte do commit do ponto de verificação. |
|
Tempo em milissegundos para aplicar as gravações preparadas na estrutura da memória ( |
|
Número de arquivos copiados rastreados pelo RocksDB File Manager. |
|
Número de arquivos reutilizados conforme rastreado pelo RocksDB File Manager. |
|
Número de chamadas |
|
Tempo médio em nanossegundos para a chamada |
|
Quanto do cache de blocos no RocksDB é útil ou não e evita leituras de disco local. |
|
Quanto do cache de blocos no RocksDB é útil ou não e evita leituras de disco local. |
|
Tamanho de todos os arquivos SST. SST significa Static Sorted Table, que é a estrutura tabular que o RocksDB usa para armazenar dados. |
|
Número de bytes descompactados lidos por operações |
|
Número de bytes que o processo de compactação lê do disco. |
|
Algumas das operações com estado (por exemplo, processamento de tempo limite em |
|
Número de bytes descompactados gravados por operações |
|
Número de bytes que o processo de compactação grava no disco. |
|
Milissegundos de tempo para compactações RocksDB, incluindo compactações em segundo plano e a compactação opcional iniciada durante o commit. |
|
Tempo de descarga, incluindo descarga de fundo. As operações de liberação são processos pelos quais o MemTable é liberado para armazenamento quando estiver cheio. MemTables são o primeiro nível onde os dados são armazenados no RocksDB. |
|
RocksDB File Manager gerencia a utilização e exclusão do espaço em disco do arquivo SST físico. Estas métricas representam os arquivos zip descompactados em bytes conforme relatado pelo Gerenciador de Arquivos. |
objeto de fontes (Kafka)
Métrica |
Descrição |
---|---|
|
Nome da fonte de onde a query de transmissão está lendo. Por exemplo, |
|
Número de deslocamento inicial dentro do tópico Kafka em que o Job de transmissão começa. |
|
Último deslocamento processado pelo microlote. Isso pode ser igual a |
|
Último deslocamento calculado pelo microlote. Quando há limitação, o processo de microlotes pode não processar todas as compensações, fazendo com que |
|
Número de linhas de entrada processadas desta origem. |
|
Taxa na qual os dados chegam para processamento para esta origem. |
|
Taxa na qual o Spark está processando dados para esta fonte. |
objeto fontes.métricas (Kafka)
Métrica |
Descrição |
---|---|
|
Número médio de deslocamentos que a query de transmissão está atrás do último deslocamento disponível entre todos os tópicos inscritos. |
|
Número estimado de bytes que o processo query não consumiu dos tópicos inscritos. |
|
Número máximo de deslocamentos que a query de transmissão está atrás do último deslocamento disponível entre todos os tópicos inscritos. |
|
Número mínimo de deslocamentos que a query de transmissão está atrás do último deslocamento disponível entre todos os tópicos inscritos. |
objeto coletor (Kafka)
Métrica |
Descrição |
---|---|
|
Nome do coletor no qual a query de transmissão está sendo gravada. Por exemplo, |
|
Número de linhas que foram gravadas na tabela de saída ou coletor como parte do microlote. Para algumas situações, este valor pode ser “-1” e geralmente pode ser interpretado como “desconhecido”. |
objeto de fontes (Delta Lake)
Métrica |
Descrição |
---|---|
|
Nome da fonte de onde a query de transmissão está lendo. Por exemplo, |
|
Versão da serialização com a qual este deslocamento é codificado. |
|
ID da tabela da qual você está lendo. Isso é usado para detectar configurações incorretas ao reiniciar uma query. |
|
Versão da tabela que você está processando atualmente. |
|
Indexe na sequência de |
|
Se este deslocamento denota uma query que está iniciando em vez de processar alterações. Ao iniciar uma nova query, todos os dados presentes na tabela no início são processados, e em seguida os novos dados que chegaram. |
|
Último deslocamento processado pela query de microlote. |
|
Número de linhas de entrada processadas desta origem. |
|
Taxa na qual os dados chegam para processamento para esta origem. |
|
Taxa na qual o Spark está processando dados para esta fonte. |
|
Tamanho dos arquivos pendentes (arquivos rastreados pelo RocksDB) combinados. Esta é a documentação do backlog para Delta e Auto Loader como fonte de transmissão. |
|
Número de arquivos pendentes a serem processados. Esta é a documentação do backlog para Delta e Auto Loader como fonte de transmissão. |
objeto coletor (Delta Lake)
Métrica |
Descrição |
---|---|
|
Nome do coletor no qual a query de transmissão grava. Por exemplo, |
|
O número de linhas nestas análises é “-1” porque o Spark não pode inferir linhas de saída para coletores DSv1, que é a classificação para o coletor Delta Lake. |
objeto de fontes (Kinesis)
Métrica |
Descrição |
---|---|
|
Nome da fonte de onde a query de transmissão lê. Por exemplo, |
Para obter mais informações, consulte Quais métricas o Kinesis reporta?.
objeto de métricas de origem (Kinesis)
Métrica |
Descrição |
---|---|
|
Número médio de milissegundos que um consumidor ficou atrasado em relação ao início de uma transmissão. |
|
Número máximo de milissegundos que um consumidor ficou atrasado em relação ao início de uma transmissão. |
|
Número mínimo de milissegundos que um consumidor ficou atrasado em relação ao início de uma transmissão. |
|
Número de bytes restantes para processamento. Esta é a lista de pendências do Kinesis como fonte. |
Exemplos
Exemplo de evento StreamingQueryListener Kafka para Kafka
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Exemplo de evento Delta Lake-to-Delta Lake StreamingQueryListener
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Exemplo de fonte de taxa para evento Delta Lake StreamingQueryListener
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}