Pular para o conteúdo principal

Configurar RocksDB armazenamento do estado em Databricks

O senhor pode ativar o gerenciamento de estado baseado em RocksDBdefinindo a seguinte configuração em SparkSession antes de iniciar a consulta de transmissão.

Scala
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

O senhor pode ativar o RocksDB no pipeline DLT. Consulte Otimizar a configuração do pipeline para processamento com estado.

Habilitar o checkpoint do registro de alterações

Em Databricks Runtime 13.3 LTS e acima, o senhor pode ativar o checkpointing do changelog para reduzir a duração do checkpoint e a latência de ponta a ponta para cargas de trabalho de transmissão estruturada. Databricks recomenda ativar o checkpointing do changelog para todas as consultas de transmissão estruturada com estado.

Tradicionalmente RocksDB armazenamento do estado Snapshot e upload de arquivos de dados durante o checkpointing. Para evitar esse custo, o changelog checkpoint grava somente os registros que foram alterados desde o último checkpoint em armazenamento durável.”

O checkpointing do changelog é desativado por default. O senhor pode ativar o checkpointing do changelog no nível do SparkSession usando a seguinte sintaxe:

Scala
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

O senhor pode ativar o checkpointing do changelog em uma transmissão existente e manter as informações de estado armazenadas no checkpoint.

important

As consultas que habilitaram o checkpointing do changelog só podem ser executadas em Databricks Runtime 13.3 LTS e acima. O senhor pode desativar o checkpointing do changelog para reverter para o comportamento do checkpointing legado, mas deve continuar a executar essas consultas em Databricks Runtime 13.3 LTS ou acima. O senhor deve reiniciar o Job para que essas alterações ocorram.

RocksDB armazenamento do estado métricas

Cada operador de estado coleta métricas relacionadas às operações de gerenciamento de estado realizadas em sua instância RocksDB para observar o armazenamento do estado e ajudar potencialmente na eliminação da lentidão do trabalho.

As métricas para uma instância específica de armazenamento do estado são rotuladas com o ID da partição e o nome da loja, garantindo que permaneçam separadas. Todas as outras métricas são agregadas (soma) por operadora estadual no trabalho em todas as tarefas em que a operadora estadual está operando.

Essas métricas fazem parte do mapa customMetrics dentro dos campos stateOperators em StreamingQueryProgress. A seguir, um exemplo de StreamingQueryProgress em formato JSON (obtido usando StreamingQueryProgress.json()).

JSON
{
"id": "6774075e-8869-454b-ad51-513be86cfd43",
"runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId": 7,
"stateOperators": [
{
"numRowsTotal": 20000000,
"numRowsUpdated": 20000000,
"memoryUsedBytes": 31005397,
"numRowsDroppedByWatermark": 0,
"customMetrics": {
"SnapshotLastUploaded.partition_0_default": 7,
"SnapshotLastUploaded.partition_1_default": 7,
"SnapshotLastUploaded.partition_2_default": 6,
"SnapshotLastUploaded.partition_3_default": 6,
"SnapshotLastUploaded.partition_4_default": -1,
"rocksdbBytesCopied": 141037747,
"rocksdbCommitCheckpointLatency": 2,
"rocksdbCommitCompactLatency": 22061,
"rocksdbCommitFileSyncLatencyMs": 1710,
"rocksdbCommitFlushLatency": 19032,
"rocksdbCommitPauseLatency": 0,
"rocksdbCommitWriteBatchLatency": 56155,
"rocksdbFilesCopied": 2,
"rocksdbFilesReused": 0,
"rocksdbGetCount": 40000000,
"rocksdbGetLatency": 21834,
"rocksdbPutCount": 1,
"rocksdbPutLatency": 56155599000,
"rocksdbReadBlockCacheHitCount": 1988,
"rocksdbReadBlockCacheMissCount": 40341617,
"rocksdbSstFileSize": 141037747,
"rocksdbTotalBytesReadByCompaction": 336853375,
"rocksdbTotalBytesReadByGet": 680000000,
"rocksdbTotalBytesReadThroughIterator": 0,
"rocksdbTotalBytesWrittenByCompaction": 141037747,
"rocksdbTotalBytesWrittenByPut": 740000012,
"rocksdbTotalCompactionLatencyMs": 21949695000,
"rocksdbWriterStallLatencyMs": 0,
"rocksdbZipFileBytesUncompressed": 7038
}
}
],
"sources": [{}],
"sink": {}
}

As descrições detalhadas das métricas são as seguintes:

Nome da métrica

Descrição

Latência de lote de gravação do RocksDB Commits

Tempo (em milis) necessário para aplicar as gravações em etapas na estrutura na memória (WriteBatch) ao RocksDB nativo.

Latência de descarga de compromisso do RocksDB

O tempo (em milis) necessário para liberar as alterações do RocksDB na memória para o disco local.

RocksDB Commitem latência compacta

Tempo (em milis) gasto para compactação (opcional) durante o commit do ponto de verificação.

Latência de pausa de compromisso do RocksDB

Tempo (em milis) necessário para interromper os threads em segundo plano worker (para compactação etc.) como parte do ponto de verificação commit.

Latência do ponto de verificação do RocksDB Committ

Tempo (em milis) necessário para tirar um instantâneo do site nativo RocksDB e gravá-lo em um diretório local.

Latências de sincronização de arquivos de confirmação do RocksDB

O tempo (em milis) necessário para sincronizar os arquivos nativos relacionados ao RocksDB Snapshot com um armazenamento externo (local do ponto de verificação).

Latência RocksDB Get

Tempo médio (em nanos) gasto pela chamada RocksDB::Get nativa subjacente.

Contagem de resultados do Rocks DB

Tempo médio (em nanos) gasto pela chamada RocksDB::Put nativa subjacente.

RocksDB GetCount

Número de chamadas nativas para RocksDB::Get (não inclui Gets de WriteBatch - lotes na memória usados para gravação de preparação).

Contagem de resultados do Rocks DB

Número de chamadas nativas para RocksDB::Put (não inclui Puts para WriteBatch - lotes na memória usados para gravação de preparação).

Total de bytes do RockSDB lidos por GET

Número de bytes não compactados lidos por meio de chamadas RocksDB::Get nativas.

RocksDB TotalBytes escritos por PUT

Número de bytes não compactados gravados por meio de chamadas RocksDB::Put nativas.

Contagem de acertos do RocksDBreadBlockCache

Número de vezes que o cache de blocos nativo do RocksDB é usado para evitar a leitura de dados do disco local.

Número de falhas no cache de blocos de pão do RocksD

Número de vezes que o cache de blocos nativo do RocksDB falhou e exigiu a leitura de dados do disco local.

Total de bytes do Rocksdb lidos por compactação

Número de bytes lidos do disco local pelo processo de compactação nativo do RocksDB.

Total de bytes do RocksDB escritos por compactação

Número de bytes gravados no disco local pelo processo de compactação nativo do RocksDB.

Latências de compactação total do RocksDB

Tempo (em milis) necessário para as compactações do RocksDB (em segundo plano e a compactação opcional iniciada durante o commit).

Escritores do RocksDB instalam nomes de latência

Tempo (em milis) de paralisação do gravador devido à compactação do fundo ou à descarga dos memtables no disco.

Iterador de leitura total de bytes do RocksDB

Algumas das operações com estado (como o processamento de tempo limite em flatMapGroupsWithState ou a marca d'água em agregações com janelas) exigem a leitura de dados inteiros no banco de dados por meio do iterador. O tamanho total dos dados não compactados lidos usando o iterador.