Pular para o conteúdo principal

Ler transmissão estruturada state information

O senhor pode usar DataFrame operações ou SQL table-value functions para consultar dados e metadados do estado da transmissão estruturada. Use essas funções para observar informações de estado para consultas de transmissão estruturada com estado, que podem ser úteis para monitoramento e depuração.

O senhor deve ter acesso de leitura ao caminho do ponto de verificação para uma consulta de transmissão a fim de consultar dados de estado ou metadados. As funções descritas neste artigo fornecem acesso somente de leitura aos dados e metadados de estado. O senhor só pode usar a semântica de leitura de lotes para consultar informações de estado.

nota

Não é possível consultar informações de estado para o pipeline declarativo LakeFlow Spark , tabelas de transmissão ou visualizações materializadas. Não é possível consultar informações de estado usando compute serverless ou compute configurada com o modo de acesso padrão.

Requisitos

  • Use uma das seguintes configurações do site compute:

    • Databricks Runtime 16.3 e acima em compute configurado com o modo de acesso padrão.
    • Databricks Runtime 14.3 LTS e acima em compute configurado com modo de acesso dedicado ou sem isolamento.
  • Acesso de leitura ao caminho do ponto de verificação usado pela consulta de transmissão.

Ler transmissão estructurada armazenamento do estado

O senhor pode ler as informações de armazenamento do estado para consultas de transmissão estruturada executadas em qualquer site compatível Databricks Runtime. Use a seguinte sintaxe:

Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))

Opções e esquema da API do leitor de estado

Para obter uma lista completa de opções de formato statestore , consulte armazenamento do estado.

Os dados de saída têm o seguinte esquema:

Coluna

Tipo

Descrição

key

Struct (outro tipo derivado do estado key)

O endereço key para um registro de operador com estado no ponto de verificação de estado.

value

Estrutura (outro tipo derivado do valor do estado)

O valor de um registro de operador com estado no ponto de verificação estadual.

partition_id

Integer

A partição do ponto de verificação de estado que contém o registro do operador com estado.

No Databricks Runtime 16.4 LTS e versões superiores, quando a opção readChangeFeed é definida como true, os dados de saída têm o seguinte esquema:

Coluna

Tipo

Descrição

batch_id

Long

O ID do lote ao qual a alteração de estado pertence.

change_type

String

O tipo de alteração aplicada pelos lotes: update para inserções e atualizações, delete para exclusões.

key

Struct (outro tipo derivado do estado key)

O endereço key para um registro de operador com estado no ponto de verificação de estado.

value

Estrutura (outro tipo derivado do valor do estado)

O valor para um registro de operador com estado no ponto de verificação de estado. null para registros onde change_type é delete.

partition_id

Integer

A partição do ponto de verificação de estado que contém o registro do operador com estado.

Veja read_statestore função com valor de tabela.

Leia as mudanças de estado da transmissão estruturada

Disponível no Databricks Runtime 16.4 LTS e versões superiores. Para ler como o estado muda entre microlotes em vez de visualizar o estado completo em um único microlote, defina readChangeFeed para true e especifique changeStartBatchId. Opcionalmente, especifique changeEndBatchId. Para uma lista completa de opções, veja armazenamento do estado.

Por exemplo, para ler as alterações de estado dos lotes 2 até os lotes confirmados mais recentes:

Python
df = (spark.read
.format("statestore")
.option("readChangeFeed", True)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
)

O esquema de saída inclui colunas adicionais batch_id e change_type . Para obter o esquema completo, consulte Opções e esquema da API do leitor de estado.

Ler metadados do estado da transmissão estruturada

Disponível no Databricks Runtime 14.3 LTS ou superior. Você pode ler informações de metadados estaduais para consultas de transmissão estruturada:

Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))

Os dados retornados têm o seguinte esquema:

Coluna

Tipo

Descrição

operatorId

Integer

O ID inteiro do operador de transmissão com estado.

operatorName

String

Nome do operador de transmissão com estado.

stateStoreName

String

Nome do armazenamento do estado do operador.

numPartitions

Integer

Número de partições do armazenamento do estado.

minBatchId

Long

O ID de lote mínimo disponível para consultar o estado.

maxBatchId

Long

O ID de lote máximo disponível para consultar o estado.

nota

Os valores de ID do lote fornecidos por minBatchId e maxBatchId refletem o estado no momento em que o ponto de controle foi escrito. Os lotes antigos são limpos automaticamente com a execução de microlotes, portanto, não é garantido que o valor fornecido aqui ainda esteja disponível.

Veja read_state_metadata função com valor de tabela.

Exemplo: Consultar um lado de uma transmissão-transmissão join

Use a seguinte sintaxe para consultar o lado esquerdo de uma transmissão-transmissão join:

Python
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))

Exemplo: Consultar o armazenamento do estado para transmissão com vários operadores de estado

Este exemplo usa o leitor de metadados de estado para reunir detalhes de metadados de uma consulta de transmissão com vários operadores de estado e, em seguida, usa os resultados de metadados como opções para o leitor de estado.

O leitor de metadados de estado usa o caminho do ponto de verificação como a única opção, como no exemplo de sintaxe a seguir:

Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))

A tabela a seguir representa um exemplo de saída de metadados de armazenamento do estado:

ID do operador

Nome do operador

Nome da loja estadual

Partições numéricas

ID mínima do lote

ID máxima do lote

0

StateStore Save

padrão

200

0

13

1

Desduplicação dentro da marca d'água

padrão

200

0

13

Para obter resultados para o operador dedupeWithinWatermark, consulte o leitor de estado com a opção operatorId, como no exemplo a seguir:

Python
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))