Configurar RocksDB armazenamento do estado em Databricks
O senhor pode ativar o gerenciamento de estado baseado em RocksDBdefinindo a seguinte configuração em SparkSession antes de iniciar a consulta de transmissão.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
O senhor pode ativar o RocksDB no pipeline DLT. Consulte Otimizar a configuração do pipeline para processamento com estado.
Habilitar o checkpoint do registro de alterações
Em Databricks Runtime 13.3 LTS e acima, o senhor pode ativar o checkpointing do changelog para reduzir a duração do checkpoint e a latência de ponta a ponta para cargas de trabalho de transmissão estruturada. Databricks recomenda ativar o checkpointing do changelog para todas as consultas de transmissão estruturada com estado.
Tradicionalmente RocksDB armazenamento do estado Snapshot e upload de arquivos de dados durante o checkpointing. Para evitar esse custo, o changelog checkpoint grava somente os registros que foram alterados desde o último checkpoint em armazenamento durável.”
O checkpointing do changelog é desativado por default. O senhor pode ativar o checkpointing do changelog no nível do SparkSession usando a seguinte sintaxe:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
O senhor pode ativar o checkpointing do changelog em uma transmissão existente e manter as informações de estado armazenadas no checkpoint.
As consultas que habilitaram o checkpointing do changelog só podem ser executadas em Databricks Runtime 13.3 LTS e acima. O senhor pode desativar o checkpointing do changelog para reverter para o comportamento do checkpointing legado, mas deve continuar a executar essas consultas em Databricks Runtime 13.3 LTS ou acima. O senhor deve reiniciar o Job para que essas alterações ocorram.
RocksDB armazenamento do estado métricas
Cada operador de estado coleta métricas relacionadas às operações de gerenciamento de estado realizadas em sua instância RocksDB para observar o armazenamento do estado e ajudar potencialmente na eliminação da lentidão do trabalho.
As métricas para uma instância específica de armazenamento do estado são rotuladas com o ID da partição e o nome da loja, garantindo que permaneçam separadas. Todas as outras métricas são agregadas (soma) por operadora estadual no trabalho em todas as tarefas em que a operadora estadual está operando.
Essas métricas fazem parte do mapa customMetrics
dentro dos campos stateOperators
em StreamingQueryProgress
. A seguir, um exemplo de StreamingQueryProgress
em formato JSON (obtido usando StreamingQueryProgress.json()
).
{
"id": "6774075e-8869-454b-ad51-513be86cfd43",
"runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId": 7,
"stateOperators": [
{
"numRowsTotal": 20000000,
"numRowsUpdated": 20000000,
"memoryUsedBytes": 31005397,
"numRowsDroppedByWatermark": 0,
"customMetrics": {
"SnapshotLastUploaded.partition_0_default": 7,
"SnapshotLastUploaded.partition_1_default": 7,
"SnapshotLastUploaded.partition_2_default": 6,
"SnapshotLastUploaded.partition_3_default": 6,
"SnapshotLastUploaded.partition_4_default": -1,
"rocksdbBytesCopied": 141037747,
"rocksdbCommitCheckpointLatency": 2,
"rocksdbCommitCompactLatency": 22061,
"rocksdbCommitFileSyncLatencyMs": 1710,
"rocksdbCommitFlushLatency": 19032,
"rocksdbCommitPauseLatency": 0,
"rocksdbCommitWriteBatchLatency": 56155,
"rocksdbFilesCopied": 2,
"rocksdbFilesReused": 0,
"rocksdbGetCount": 40000000,
"rocksdbGetLatency": 21834,
"rocksdbPutCount": 1,
"rocksdbPutLatency": 56155599000,
"rocksdbReadBlockCacheHitCount": 1988,
"rocksdbReadBlockCacheMissCount": 40341617,
"rocksdbSstFileSize": 141037747,
"rocksdbTotalBytesReadByCompaction": 336853375,
"rocksdbTotalBytesReadByGet": 680000000,
"rocksdbTotalBytesReadThroughIterator": 0,
"rocksdbTotalBytesWrittenByCompaction": 141037747,
"rocksdbTotalBytesWrittenByPut": 740000012,
"rocksdbTotalCompactionLatencyMs": 21949695000,
"rocksdbWriterStallLatencyMs": 0,
"rocksdbZipFileBytesUncompressed": 7038
}
}
],
"sources": [{}],
"sink": {}
}
As descrições detalhadas das métricas são as seguintes:
Nome da métrica | Descrição |
---|---|
Latência de lote de gravação do RocksDB Commits | Tempo (em milis) necessário para aplicar as gravações em etapas na estrutura na memória (WriteBatch) ao RocksDB nativo. |
Latência de descarga de compromisso do RocksDB | O tempo (em milis) necessário para liberar as alterações do RocksDB na memória para o disco local. |
RocksDB Commitem latência compacta | Tempo (em milis) gasto para compactação (opcional) durante o commit do ponto de verificação. |
Latência de pausa de compromisso do RocksDB | Tempo (em milis) necessário para interromper os threads em segundo plano worker (para compactação etc.) como parte do ponto de verificação commit. |
Latência do ponto de verificação do RocksDB Committ | Tempo (em milis) necessário para tirar um instantâneo do site nativo RocksDB e gravá-lo em um diretório local. |
Latências de sincronização de arquivos de confirmação do RocksDB | O tempo (em milis) necessário para sincronizar os arquivos nativos relacionados ao RocksDB Snapshot com um armazenamento externo (local do ponto de verificação). |
Latência RocksDB Get | Tempo médio (em nanos) gasto pela chamada |
Contagem de resultados do Rocks DB | Tempo médio (em nanos) gasto pela chamada |
RocksDB GetCount | Número de chamadas nativas para |
Contagem de resultados do Rocks DB | Número de chamadas nativas para |
Total de bytes do RockSDB lidos por GET | Número de bytes não compactados lidos por meio de chamadas |
RocksDB TotalBytes escritos por PUT | Número de bytes não compactados gravados por meio de chamadas |
Contagem de acertos do RocksDBreadBlockCache | Número de vezes que o cache de blocos nativo do RocksDB é usado para evitar a leitura de dados do disco local. |
Número de falhas no cache de blocos de pão do RocksD | Número de vezes que o cache de blocos nativo do RocksDB falhou e exigiu a leitura de dados do disco local. |
Total de bytes do Rocksdb lidos por compactação | Número de bytes lidos do disco local pelo processo de compactação nativo do RocksDB. |
Total de bytes do RocksDB escritos por compactação | Número de bytes gravados no disco local pelo processo de compactação nativo do RocksDB. |
Latências de compactação total do RocksDB | Tempo (em milis) necessário para as compactações do RocksDB (em segundo plano e a compactação opcional iniciada durante o commit). |
Escritores do RocksDB instalam nomes de latência | Tempo (em milis) de paralisação do gravador devido à compactação do fundo ou à descarga dos memtables no disco. |
Iterador de leitura total de bytes do RocksDB | Algumas das operações com estado (como o processamento de tempo limite em |