Pular para o conteúdo principal

Ler transmissão estruturada state information

O senhor pode usar DataFrame operações ou SQL table-value functions para consultar dados e metadados do estado da transmissão estruturada. Use essas funções para observar informações de estado para consultas de transmissão estruturada com estado, que podem ser úteis para monitoramento e depuração.

O senhor deve ter acesso de leitura ao caminho do ponto de verificação para uma consulta de transmissão a fim de consultar dados de estado ou metadados. As funções descritas neste artigo fornecem acesso somente de leitura aos dados e metadados de estado. O senhor só pode usar a semântica de leitura de lotes para consultar informações de estado.

nota

O senhor não pode consultar informações de estado para pipeline DLT, tabelas de transmissão ou visualização materializada. O senhor não pode consultar informações de estado usando serverless compute ou compute configurado com o modo de acesso padrão.

Requisitos

  • Use uma das seguintes configurações do site compute:

    • Databricks Runtime 16.3 e acima em compute configurado com o modo de acesso padrão.
    • Databricks Runtime 14.3 LTS e acima em compute configurado com modo de acesso dedicado ou sem isolamento.
  • Acesso de leitura ao caminho do ponto de verificação usado pela consulta de transmissão.

Ler transmissão estructurada armazenamento do estado

O senhor pode ler as informações de armazenamento do estado para consultas de transmissão estruturada executadas em qualquer site compatível Databricks Runtime. Use a seguinte sintaxe:

Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))

Parâmetros da API do leitor de estado

A API do leitor de estado é compatível com as seguintes configurações opcionais:

Opção

Tipo

Valor padrão

Descrição

batchId

Long

última identificação de lotes

Representa os lotes de destino para leitura. Especifique essa opção para consultar as informações de estado de um estado anterior da consulta. Os lotes devem estar comprometidos, mas ainda não foram limpos.

operatorId

Long

0

Representa o operador de destino a partir do qual ler. Essa opção é usada quando a consulta está usando vários operadores com estado.

storeName

String

"default" (padrão)

Representa o nome do armazenamento do estado de destino para leitura. Essa opção é usada quando o operador stateful usa várias instâncias de armazenamento do estado. storeName ou joinSide devem ser especificados para uma transmissão a vapor join, mas não ambos.

joinSide

strings ("left" ou "right")

Representa o lado alvo a partir do qual ler. Essa opção é usada quando os usuários desejam ler o estado de uma transmissão-transmissão join.

stateVarName

String

Nenhuma

O nome da variável de estado a ser lida como parte dessa consulta. O nome da variável de estado é o nome exclusivo dado a cada variável na função init de um StatefulProcessor usado pelo operador transformWithState. Essa opção é obrigatória se o operador transformWithState for usado. Essa opção se aplica somente ao operador transformWithState e é ignorada para outros operadores. Disponível em Databricks Runtime 16.2 e acima.

readRegisteredTimers

Booleana

False

Defina como true para ler os temporizadores registrados usados no operador transformWithState. Essa opção se aplica somente ao operador transformWithState e é ignorada para outros operadores. Disponível em Databricks Runtime 16.2 e acima.

flattenCollectionTypes

Booleana

True

Se true, nivela os registros retornados para as variáveis de estado do mapa e da lista. Se for false, os registros serão retornados usando um Spark SQL Array ou Map. Essa opção se aplica somente ao operador transformWithState e é ignorada para outros operadores. Disponível em Databricks Runtime 16.2 e acima.

Os dados retornados têm o seguinte esquema:

Coluna

Tipo

Descrição

key

Struct (outro tipo derivado do estado key)

O endereço key para um registro de operador com estado no ponto de verificação de estado.

value

Estrutura (outro tipo derivado do valor do estado)

O valor de um registro de operador com estado no ponto de verificação estadual.

partition_id

Integer

A partição do ponto de verificação de estado que contém o registro do operador com estado.

Veja read_statestore função com valor de tabela.

Ler metadados do estado da transmissão estruturada

important

O senhor deve executar consultas de transmissão em Databricks Runtime 14.2 ou superior para registrar metadados de estado. Os arquivos de metadados de estado não quebram a compatibilidade com versões anteriores. Se o senhor optar por executar uma consulta de transmissão em Databricks Runtime 14.1 ou abaixo, os arquivos de metadados de estado existentes serão ignorados e nenhum novo arquivo de metadados de estado será gravado.

O senhor pode ler as informações de metadados de estado para a execução de consultas de transmissão estruturada em Databricks Runtime 14.2 ou superior. Use a seguinte sintaxe:

Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))

Os dados retornados têm o seguinte esquema:

Coluna

Tipo

Descrição

operatorId

Integer

O ID inteiro do operador de transmissão com estado.

operatorName

Integer

Nome do operador de transmissão com estado.

stateStoreName

String

Nome do armazenamento do estado do operador.

numPartitions

Integer

Número de partições do armazenamento do estado.

minBatchId

Long

O ID de lote mínimo disponível para consultar o estado.

maxBatchId

Long

O ID de lote máximo disponível para consultar o estado.

nota

Os valores de ID do lote fornecidos por minBatchId e maxBatchId refletem o estado no momento em que o ponto de controle foi escrito. Os lotes antigos são limpos automaticamente com a execução de microlotes, portanto, não é garantido que o valor fornecido aqui ainda esteja disponível.

Veja read_state_metadata função com valor de tabela.

Exemplo: Consultar um lado de uma transmissão-transmissão join

Use a seguinte sintaxe para consultar o lado esquerdo de uma transmissão-transmissão join:

Python
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))

Exemplo: Consultar o armazenamento do estado para transmissão com vários operadores de estado

Este exemplo usa o leitor de metadados de estado para reunir detalhes de metadados de uma consulta de transmissão com vários operadores de estado e, em seguida, usa os resultados de metadados como opções para o leitor de estado.

O leitor de metadados de estado usa o caminho do ponto de verificação como a única opção, como no exemplo de sintaxe a seguir:

Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))

A tabela a seguir representa um exemplo de saída de metadados de armazenamento do estado:

ID do operador

Nome do operador

Nome da loja estadual

Partições numéricas

ID mínima do lote

ID máxima do lote

0

StateStore Save

padrão

200

0

13

1

Desduplicação dentro da marca d'água

padrão

200

0

13

Para obter resultados para o operador dedupeWithinWatermark, consulte o leitor de estado com a opção operatorId, como no exemplo a seguir:

Python
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))