Ler transmissão estruturada state information
O senhor pode usar DataFrame operações ou SQL table-value functions para consultar dados e metadados do estado da transmissão estruturada. Use essas funções para observar informações de estado para consultas de transmissão estruturada com estado, que podem ser úteis para monitoramento e depuração.
O senhor deve ter acesso de leitura ao caminho do ponto de verificação para uma consulta de transmissão a fim de consultar dados de estado ou metadados. As funções descritas neste artigo fornecem acesso somente de leitura aos dados e metadados de estado. O senhor só pode usar a semântica de leitura de lotes para consultar informações de estado.
O senhor não pode consultar informações de estado para pipeline DLT, tabelas de transmissão ou visualização materializada. O senhor não pode consultar informações de estado usando serverless compute ou compute configurado com o modo de acesso padrão.
Requisitos
-
Use uma das seguintes configurações do site compute:
- Databricks Runtime 16.3 e acima em compute configurado com o modo de acesso padrão.
- Databricks Runtime 14.3 LTS e acima em compute configurado com modo de acesso dedicado ou sem isolamento.
-
Acesso de leitura ao caminho do ponto de verificação usado pela consulta de transmissão.
Ler transmissão estructurada armazenamento do estado
O senhor pode ler as informações de armazenamento do estado para consultas de transmissão estruturada executadas em qualquer site compatível Databricks Runtime. Use a seguinte sintaxe:
- Python
- SQL
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
SELECT * FROM read_statestore('/checkpoint/path')
Parâmetros da API do leitor de estado
A API do leitor de estado é compatível com as seguintes configurações opcionais:
Opção | Tipo | Valor padrão | Descrição |
---|---|---|---|
| Long | última identificação de lotes | Representa os lotes de destino para leitura. Especifique essa opção para consultar as informações de estado de um estado anterior da consulta. Os lotes devem estar comprometidos, mas ainda não foram limpos. |
| Long | 0 | Representa o operador de destino a partir do qual ler. Essa opção é usada quando a consulta está usando vários operadores com estado. |
| String | "default" (padrão) | Representa o nome do armazenamento do estado de destino para leitura. Essa opção é usada quando o operador stateful usa várias instâncias de armazenamento do estado. |
| strings ("left" ou "right") | Representa o lado alvo a partir do qual ler. Essa opção é usada quando os usuários desejam ler o estado de uma transmissão-transmissão join. | |
| String | Nenhuma | O nome da variável de estado a ser lida como parte dessa consulta. O nome da variável de estado é o nome exclusivo dado a cada variável na função |
| Booleana | False | Defina como |
| Booleana | True | Se |
Os dados retornados têm o seguinte esquema:
Coluna | Tipo | Descrição |
---|---|---|
| Struct (outro tipo derivado do estado key) | O endereço key para um registro de operador com estado no ponto de verificação de estado. |
| Estrutura (outro tipo derivado do valor do estado) | O valor de um registro de operador com estado no ponto de verificação estadual. |
| Integer | A partição do ponto de verificação de estado que contém o registro do operador com estado. |
Veja read_statestore
função com valor de tabela.
Ler metadados do estado da transmissão estruturada
O senhor deve executar consultas de transmissão em Databricks Runtime 14.2 ou superior para registrar metadados de estado. Os arquivos de metadados de estado não quebram a compatibilidade com versões anteriores. Se o senhor optar por executar uma consulta de transmissão em Databricks Runtime 14.1 ou abaixo, os arquivos de metadados de estado existentes serão ignorados e nenhum novo arquivo de metadados de estado será gravado.
O senhor pode ler as informações de metadados de estado para a execução de consultas de transmissão estruturada em Databricks Runtime 14.2 ou superior. Use a seguinte sintaxe:
- Python
- SQL
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SELECT * FROM read_state_metadata('/checkpoint/path')
Os dados retornados têm o seguinte esquema:
Coluna | Tipo | Descrição |
---|---|---|
| Integer | O ID inteiro do operador de transmissão com estado. |
| Integer | Nome do operador de transmissão com estado. |
| String | Nome do armazenamento do estado do operador. |
| Integer | Número de partições do armazenamento do estado. |
| Long | O ID de lote mínimo disponível para consultar o estado. |
| Long | O ID de lote máximo disponível para consultar o estado. |
Os valores de ID do lote fornecidos por minBatchId
e maxBatchId
refletem o estado no momento em que o ponto de controle foi escrito. Os lotes antigos são limpos automaticamente com a execução de microlotes, portanto, não é garantido que o valor fornecido aqui ainda esteja disponível.
Veja read_state_metadata
função com valor de tabela.
Exemplo: Consultar um lado de uma transmissão-transmissão join
Use a seguinte sintaxe para consultar o lado esquerdo de uma transmissão-transmissão join:
- Python
- SQL
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
Exemplo: Consultar o armazenamento do estado para transmissão com vários operadores de estado
Este exemplo usa o leitor de metadados de estado para reunir detalhes de metadados de uma consulta de transmissão com vários operadores de estado e, em seguida, usa os resultados de metadados como opções para o leitor de estado.
O leitor de metadados de estado usa o caminho do ponto de verificação como a única opção, como no exemplo de sintaxe a seguir:
- Python
- SQL
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SELECT * FROM read_state_metadata('/checkpoint/path')
A tabela a seguir representa um exemplo de saída de metadados de armazenamento do estado:
ID do operador | Nome do operador | Nome da loja estadual | Partições numéricas | ID mínima do lote | ID máxima do lote |
---|---|---|---|---|---|
0 | StateStore Save | padrão | 200 | 0 | 13 |
1 | Desduplicação dentro da marca d'água | padrão | 200 | 0 | 13 |
Para obter resultados para o operador dedupeWithinWatermark
, consulte o leitor de estado com a opção operatorId
, como no exemplo a seguir:
- Python
- SQL
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);