Pular para o conteúdo principal

Inferir e desenvolver o esquema usando from_json em DLT

info

Visualização

Esse recurso está em Public Preview.

Este artigo descreve como inferir e desenvolver o esquema de JSON blobs com a função from_json SQL em DLT.

Visão geral

A função from_json SQL analisa uma coluna de cadeias de caracteres JSON e retorna um valor de estrutura. Quando usado fora da DLT, o senhor deve fornecer explicitamente o esquema do valor retornado usando o argumento schema. Quando usado com DLT, o senhor pode ativar a inferência e a evolução do esquema, que gerenciam automaticamente o esquema do valor retornado. Esse recurso simplifica a configuração inicial (especialmente quando o esquema é desconhecido) e as operações contínuas quando o esquema é alterado com frequência. Ele permite o processamento contínuo de JSON blobs arbitrários de fontes de transmissão de dados, como Auto Loader, Kafka, ou Kinesis.

Especificamente, quando usado em DLT, a inferência e a evolução do esquema para a função from_json SQL podem:

  • Detectar novos campos em registros JSON de entrada (incluindo objetos JSON aninhados)
  • Inferir os tipos de campo e mapeá-los para os tipos de dados Spark apropriados
  • Evolua automaticamente o esquema para acomodar novos campos
  • Processar automaticamente dados que não estão em conformidade com o esquema atual

Sintaxe: inferir e evoluir automaticamente o esquema

Se o senhor usar o site from_json com DLT, ele poderá inferir e desenvolver automaticamente o esquema. Para ativar isso, defina o esquema como NULL e especifique a opção schemaLocationKey. Isso permite que ele deduza e acompanhe o esquema.

SQL
from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>[, otherOptions]))

Uma consulta pode ter várias expressões from_json, mas cada expressão deve ter um schemaLocationKey exclusivo. O endereço schemaLocationKey também deve ser exclusivo para cada pipeline.

SQL
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Sintaxe: esquema fixo

Se, em vez disso, o senhor quiser impor um esquema específico, poderá usar a seguinte sintaxe from_json para analisar as cadeias de caracteres JSON usando esse esquema:

from_json(jsonStr, schema, [, options])

Essa sintaxe pode ser usada em qualquer ambiente do Databricks, inclusive no DLT. Mais informações estão disponíveis aqui.

Inferência de esquema

from_json infere o esquema a partir dos primeiros lotes de colunas de dados JSON e o indexa internamente por seu schemaLocationKey (obrigatório).

Se as cadeias de caracteres JSON forem um único objeto (por exemplo, {"id": 123, "name": "John"}), from_json infere um esquema do tipo STRUCT e adiciona um rescuedDataColumn à lista de campos.

STRUCT<id LONG, name STRING, _rescued_data STRING>

Entretanto, se as cadeias de caracteres JSON tiverem uma matriz de nível superior (como ["id": 123, "name": "John"]), então from_json envolverá a ARRAY em um STRUCT. Essa abordagem permite resgatar dados que são incompatíveis com o esquema inferido. Você tem a opção de explodir os valores da matriz em linhas separadas a jusante.

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

Substitua a inferência de esquema usando dicas de esquema

Opcionalmente, você pode fornecer schemaHints para influenciar como from_json infere o tipo de uma coluna. Isso é útil quando você sabe que uma coluna é de um tipo de dados específico ou se deseja escolher um tipo de dados mais geral (por exemplo, um duplo em vez de um inteiro). O senhor pode fornecer um número arbitrário de dicas para tipos de dados de coluna usando a sintaxe de especificação de esquema SQL. A semântica das dicas de esquema é a mesma das dicas de esquema do Auto Loader. Por exemplo:

SQL
SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)

Quando a cadeia de caracteres JSON contém um ARRAY de nível superior, ela é envolvida em um STRUCT. Nesses casos, as dicas de esquema são aplicadas ao esquema ARRAY em vez do STRUCT encapsulado. Por exemplo, considere uma cadeia de caracteres JSON com uma matriz de nível superior, como a seguinte:

[{"id": 123, "name": "John"}]

O esquema ARRAY inferido é encapsulado em um STRUCT:

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

Para alterar o tipo de dados de id, especifique a dica de esquema como element.id strings. Para adicionar uma nova coluna do tipo DOUBLE, especifique element.new_col DOUBLE. Devido a essas dicas, o esquema para a matriz JSON de nível superior se torna:

struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>

Evolua o esquema usando schemaEvolutionMode

from_json detecta a adição de novas colunas à medida que processa seus dados. Quando o from_json detecta um novo campo, ele atualiza o esquema inferido com o esquema mais recente mesclando novas colunas no final do esquema. Os tipos de dados das colunas existentes permanecem inalterados. Após a atualização do esquema, o pipeline é reiniciado automaticamente com o esquema atualizado.

from_json suporta os seguintes modos de evolução do esquema, que o senhor define usando a configuração opcional schemaEvolutionMode. Esses modos são consistentes com o Auto Loader.

schemaEvolutionMode

Comportamento ao ler uma nova coluna

addNewColumns (padrão)

A transmissão da falha. Novas colunas são adicionadas ao esquema. As colunas existentes não desenvolvem tipos de dados.

rescue

O esquema nunca evolui e a transmissão não falha devido a alterações no esquema. Todas as novas colunas são registradas na coluna de dados resgatados.

failOnNewColumns

A transmissão falha. A transmissão não é reiniciada, a menos que o site schemaHints seja atualizado ou que os dados ofensivos sejam removidos.

none

Não evolui o esquema, novas colunas são ignoradas e os dados não são resgatados a menos que a opção rescuedDataColumn seja definida. A transmissão não falha devido a alterações de esquema.

Por exemplo:

SQL
SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)

Coluna de dados resgatados

Uma coluna de dados resgatada é adicionada automaticamente ao seu esquema como _rescued_data. Você pode renomear a coluna definindo a opção rescuedDataColumn. Por exemplo:

from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})

Quando você opta por usar a coluna de dados resgatados, todas as colunas que não corresponderem ao esquema inferido são resgatadas em vez de descartadas. Isso pode acontecer devido a uma incompatibilidade de tipos de dados, a uma coluna ausente no esquema ou a uma diferença entre maiúsculas e minúsculas no nome da coluna.

Gerenciar registros corrompidos

Para armazenar registros que estão malformados e não podem ser analisados, adicione uma coluna _corrupt_record definindo dicas de esquema, como no exemplo a seguir:

SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL,
map('schemaLocationKey', 'nycTaxi',
'schemaHints', '_corrupt_record STRING',
'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Para renomear a coluna de registro corrompido, defina a opção columnNameOfCorruptRecord.

O analisador JSON é compatível com três modos de tratamento de registros corrompidos:

Mode

Descrição

PERMISSIVE

Para registros corrompidos, coloca as cadeias de caracteres malformadas em um campo configurado por columnNameOfCorruptRecord e define os campos malformados como null. Para manter os registros corrompidos, é possível definir um campo do tipo string chamado columnNameOfCorruptRecord em um esquema definido pelo usuário. Se um esquema não tiver o campo, os registros corrompidos serão eliminados durante a análise. Ao inferir um esquema, o analisador adiciona implicitamente um campo columnNameOfCorruptRecord no esquema de saída.

DROPMALFORMED

Ignora registros corrompidos.

Quando você usa o modo DROPMALFORMED com rescuedDataColumn, as incompatibilidades de tipos de dados não fazem com que os registros sejam eliminados. Somente registros corrompidos são descartados, como JSON incompleto ou malformado.

FAILFAST

Lança uma exceção quando o analisador encontra registros corrompidos.

Quando você usa o modo FAILFAST com rescuedDataColumn, as incompatibilidades de tipos de dados não geram um erro. Somente registros corrompidos geram erros, como JSON incompleto ou malformado.

Consulte um campo na saída from_json

from_json infere o esquema durante a execução do pipeline. Se uma consulta downstream se referir a um campo from_json antes da função from_json ter sido executada com êxito pelo menos uma vez, o campo não será resolvido e a consulta será ignorada. No exemplo a seguir, a análise da consulta da tabela prata será ignorada até que a função from_json na consulta bronze tenha sido executada e inferido o esquema.

SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

CREATE STREAMING TABLE silver AS
SELECT jsonCol.VendorID, jsonCol.total_amount
FROM bronze

Se a função from_json e os campos que ela infere forem mencionados na mesma consulta, a análise poderá falhar, como no exemplo a seguir:

SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0

O senhor pode corrigir isso movendo a referência ao campo from_json para uma consulta downstream (como o exemplo do bronze/prata acima). Como alternativa, você pode especificar schemaHints que contém os referidos campos from_json. Por exemplo:

SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0

Exemplos: inferir e desenvolver automaticamente o esquema

Esta seção fornece um código de exemplo para permitir a inferência e a evolução automáticas do esquema usando from_json na DLT.

Criar uma tabela de transmissão a partir do armazenamento de objetos na nuvem

O exemplo a seguir usa a sintaxe read_files para criar uma tabela de transmissão a partir do armazenamento de objetos na nuvem.

SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Criar uma tabela de transmissão a partir do Kafka

O exemplo a seguir usa a sintaxe read_kafka para criar uma tabela de transmissão a partir de Kafka.

SQL
CREATE STREAMING TABLE bronze AS
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
FROM READ_KAFKA(
bootstrapSevers => '<server:ip>',
subscribe => 'events',
"startingOffsets", "latest"
)

Exemplos: esquema fixo

Por exemplo, codifique usando from_json com um esquema fixo, consulte a funçãofrom_json.

Perguntas frequentes

Esta seção responde às perguntas mais frequentes sobre inferência de esquema e suporte à evolução na função from_json.

Qual é a diferença entre from_json e parse_json?

A função parse_json retorna um valor VARIANT das cadeias de caracteres JSON.

A VARIANT fornece uma maneira flexível e eficiente de armazenar dados semiestruturados. Isso contorna a inferência e a evolução do esquema ao eliminar completamente os tipos estritos. No entanto, se você quiser impor um esquema no momento da gravação (por exemplo, porque você tem um esquema relativamente rígido), from_json pode ser uma opção melhor.

A tabela a seguir descreve as diferenças entre from_json e parse_json:

Função

Casos de uso

Disponibilidade

from_json

A evolução do esquema com from_json mantém o esquema. Isso é útil quando:

  • Você deseja aplicar seu esquema de dados (por exemplo, revisar cada alteração no esquema antes de persisti-la).
  • Você deseja otimizar o armazenamento e exigir baixa latência e custo de consulta.
  • Você quer falhar em dados com tipos incompatíveis.
  • O senhor deseja extrair resultados parciais de registros JSON corrompidos e armazenar o registro malformado na coluna _corrupt_record. Por outro lado, a ingestão de VARIANTs retorna um erro por JSON inválido.

Disponível com inferência de esquema & evolução somente em DLT

parse_json

O VARIANT é particularmente adequado para armazenar dados que não precisam ser esquematizados. Por exemplo:

  • Você quer manter os dados semiestruturados porque eles são flexíveis.
  • O esquema muda muito rapidamente para ser convertido em um esquema sem falhas de transmissão e reinicializações frequentes.
  • Você não quer falhar em dados com tipos incompatíveis. (A ingestão de VARIANTs sempre será bem-sucedida para registros JSON válidos, mesmo que haja incompatibilidade de tipos).
  • Seus usuários não querem lidar com a coluna de dados resgatada contendo campos que não estão em conformidade com o esquema.

Disponível com e sem DLT

Posso usar a inferência de esquema from_json e a sintaxe de evolução fora da DLT?

Não, o senhor não pode usar a inferência de esquema from_json e a sintaxe de evolução fora da DLT.

Como faço para acessar o esquema inferido por from_json?

visualizar o esquema da tabela de transmissão de destino.

Posso passar um esquema para from_json e também fazer a evolução?

Não, você não pode passar um esquema para from_json e também fazer a evolução. No entanto, você pode fornecer dicas de esquema para substituir alguns ou todos os campos inferidos por from_json.

O que acontece com o esquema se a tabela for totalmente atualizada?

As localizações do esquema associadas à tabela são apagadas e o esquema é inferido novamente do zero.