Pular para o conteúdo principal

Inferir e evoluir o esquema usando from_json no pipeline declarativo LakeFlow

info

Visualização

Este recurso está em pré-visualização pública.

Este artigo descreve como inferir e evoluir o esquema de blobs JSON com a função SQL from_json no pipeline declarativo LakeFlow .

Visão geral

A função SQL from_json analisa uma coluna de strings JSON e retorna um valor struct. Quando usado fora do pipeline declarativo LakeFlow , você deve fornecer explicitamente o esquema do valor retornado usando o argumento schema . Quando usado com o pipeline declarativo LakeFlow , você pode habilitar a inferência e a evolução do esquema, que gerenciam automaticamente o esquema do valor retornado. Esse recurso simplifica tanto a configuração inicial (especialmente quando o esquema é desconhecido) quanto as operações em andamento quando o esquema muda com frequência. Ele permite o processamento contínuo de blobs JSON arbitrários de fontes de transmissão de dados, como Auto Loader, Kafka ou Kinesis.

Especificamente, quando usado no pipeline declarativo LakeFlow , a inferência e evolução do esquema para a função SQL from_json pode:

  • Detectar novos campos em registros JSON de entrada (incluindo objetos JSON aninhados)
  • Inferir os tipos de campo e mapeá-los para os tipos de dados Spark apropriados
  • Evoluir automaticamente o esquema para acomodar novos campos
  • Manipule automaticamente dados que não estejam em conformidade com o esquema atual

Sintaxe: Inferir e evoluir automaticamente o esquema

Se você usar from_json com o pipeline declarativo LakeFlow , ele poderá inferir e evoluir o esquema automaticamente. Para habilitar isso, defina o esquema como NULL e especifique a opção schemaLocationKey . Isso permite inferir e acompanhar o esquema.

SQL
from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>[, otherOptions]))

Uma consulta pode ter várias expressões from_json , mas cada expressão deve ter um schemaLocationKey exclusivo. O schemaLocationKey também deve ser exclusivo por pipeline.

SQL
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Sintaxe: Esquema fixo

Se você quiser impor um esquema específico, poderá usar a seguinte sintaxe from_json para analisar strings JSON usando esse esquema:

from_json(jsonStr, schema, [, options])

Essa sintaxe pode ser usada em qualquer ambiente Databricks , incluindo o pipeline declarativo LakeFlow . Mais informações estão disponíveis aqui.

Inferência de esquema

from_json infere o esquema dos primeiros lotes de colunas de dados JSON e o indexa internamente por seu schemaLocationKey (obrigatório).

Se as strings JSON forem um único objeto (por exemplo, {"id": 123, "name": "John"}), from_json infere um esquema do tipo STRUCT e adiciona um rescuedDataColumn à lista de campos.

STRUCT<id LONG, name STRING, _rescued_data STRING>

Entretanto, se as strings JSON tiverem uma matriz de nível superior (como ["id": 123, "name": "John"]), então from_json encapsulará a ARRAY em uma STRUCT. Essa abordagem permite resgatar dados incompatíveis com o esquema inferido. Você tem a opção de explodir os valores da matriz em linhas separadas posteriormente.

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

Substituir inferência de esquema usando dicas de esquema

Opcionalmente, você pode fornecer schemaHints para influenciar como from_json infere o tipo de uma coluna. Isso é útil quando você sabe que uma coluna é de um tipo de dado específico ou se quiser escolher um tipo de dado mais geral (por exemplo, um double em vez de um integer). Você pode fornecer um número arbitrário de dicas para tipos de dados de coluna usando a sintaxe de especificação de esquema SQL. A semântica para dicas de esquema é a mesma das dicas de esquema do Auto Loader. Por exemplo:

SQL
SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)

Quando as strings JSON contêm um ARRAY de nível superior, elas são encapsuladas em um STRUCT. Nesses casos, as dicas de esquema são aplicadas ao esquema ARRAY em vez do STRUCT encapsulado. Por exemplo, considere uma string JSON com uma matriz de nível superior como:

[{"id": 123, "name": "John"}]

O esquema ARRAY inferido é encapsulado em uma STRUCT:

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

Para alterar o tipo de dados de id, especifique a dica de esquema como strings element.id . Para adicionar uma nova coluna do tipo DOUBLE, especifique element.new_col DOUBLE. Devido a essas dicas, o esquema para o array JSON de nível superior se torna:

struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>

Evolua o esquema usando schemaEvolutionMode

from_json detecta a adição de novas colunas à medida que processa seus dados. Quando from_json detecta um novo campo, ele atualiza o esquema inferido com o esquema mais recente, mesclando novas colunas ao final do esquema. Os tipos de dados das colunas existentes permanecem inalterados. Após a atualização do esquema, o pipeline é reiniciado automaticamente com o esquema atualizado.

from_json suporta os seguintes modos para evolução do esquema, que você define usando a configuração opcional schemaEvolutionMode . Esses modos são consistentes com o Auto Loader.

schemaEvolutionMode

Comportamento ao ler uma nova coluna

addNewColumns (padrão)

A transmissão da falha. Novas colunas são adicionadas ao esquema. As colunas existentes não desenvolvem tipos de dados.

rescue

O esquema nunca evolui e a transmissão não falha devido a alterações no esquema. Todas as novas colunas são registradas na coluna de dados resgatada.

failOnNewColumns

Falha na transmissão. transmissão não reinicia a menos que schemaHints seja atualizado ou os dados ofensivos sejam removidos.

none

Não evolui o esquema, novas colunas são ignoradas e os dados não são resgatados a menos que a opção rescuedDataColumn seja definida. A transmissão não falha devido a alterações de esquema.

Por exemplo:

SQL
SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)

Coluna de dados resgatada

Uma coluna de dados resgatada é adicionada automaticamente ao seu esquema como _rescued_data. Você pode renomear a coluna definindo a opção rescuedDataColumn . Por exemplo:

from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})

Quando você escolhe usar a coluna de dados resgatada, todas as colunas que não correspondem ao esquema inferido são resgatadas em vez de descartadas. Isso pode acontecer devido a uma incompatibilidade de tipo de dados, uma coluna ausente no esquema ou uma diferença de maiúsculas e minúsculas no nome da coluna.

Lidar com registros corrompidos

Para armazenar registros malformados que não podem ser analisados, adicione uma coluna _corrupt_record definindo dicas de esquema, como no exemplo a seguir:

SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL,
map('schemaLocationKey', 'nycTaxi',
'schemaHints', '_corrupt_record STRING',
'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Para renomear a coluna de registro corrompida, defina a opção columnNameOfCorruptRecord .

O analisador JSON suporta três modos para lidar com registros corrompidos:

Mode

Descrição

PERMISSIVE

Para registros corrompidos, coloca as strings malformadas em um campo configurado por columnNameOfCorruptRecord e define os campos malformados como null. Para manter registros corrompidos, você pode definir um campo do tipo strings chamado columnNameOfCorruptRecord em um esquema definido pelo usuário. Se um esquema não tiver o campo, registros corrompidos serão descartados durante a análise. Ao inferir um esquema, o analisador adiciona implicitamente um campo columnNameOfCorruptRecord no esquema de saída.

DROPMALFORMED

Ignora registros corrompidos.

Quando você usa o modo DROPMALFORMED com rescuedDataColumn, as incompatibilidades de tipos de dados não fazem com que os registros sejam descartados. Somente registros corrompidos são descartados, como JSON incompleto ou malformado.

FAILFAST

Lança uma exceção quando o analisador encontra registros corrompidos.

Quando você usa o modo FAILFAST com rescuedDataColumn, incompatibilidades de tipos de dados não geram erro. Somente registros corrompidos geram erros, como JSON incompleto ou malformado.

Consulte um campo na saída from_json

from_json infere o esquema durante a execução do pipeline. Se uma consulta posterior fizer referência a um campo from_json antes que a função from_json seja executada com sucesso pelo menos uma vez, o campo não será resolvido e a consulta será ignorada. No exemplo a seguir, a análise da consulta da tabela Silver será ignorada até que a função from_json na consulta Bronze tenha sido executada e inferida o esquema.

SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

CREATE STREAMING TABLE silver AS
SELECT jsonCol.VendorID, jsonCol.total_amount
FROM bronze

Se a função from_json e os campos que ela infere forem referenciados na mesma consulta, a análise poderá falhar, como no exemplo a seguir:

SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0

Você pode corrigir isso movendo a referência ao campo from_json para uma consulta posterior (como o exemplo bronze/prata acima). Como alternativa, você pode especificar schemaHints que contém os campos from_json referenciados. Por exemplo:

SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0

Exemplos: Inferir e evoluir automaticamente o esquema

Esta seção fornece código de exemplo para habilitar a inferência e evolução automáticas de esquema usando from_json no pipeline declarativo LakeFlow .

Crie uma tabela de transmissão a partir do armazenamento de objetos cloud

O exemplo a seguir usa a sintaxe read_files para criar uma tabela de transmissão a partir do armazenamento de objetos cloud .

SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Crie uma tabela de transmissão do Kafka

O exemplo a seguir usa a sintaxe read_kafka para criar uma tabela de transmissão do Kafka.

SQL
CREATE STREAMING TABLE bronze AS
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
FROM READ_KAFKA(
bootstrapSevers => '<server:ip>',
subscribe => 'events',
"startingOffsets", "latest"
)

Exemplos: Esquema fixo

Para obter um exemplo de código usando from_json com um esquema fixo, consulte a funçãofrom_json.

Perguntas frequentes

Esta seção responde a perguntas frequentes sobre inferência de esquema e suporte à evolução na função from_json .

Qual é a diferença entre from_json e parse_json?

A função parse_json retorna um valor VARIANT das strings JSON .

O VARIANT fornece uma maneira flexível e eficiente de armazenar dados semiestruturados. Isso contorna a inferência e a evolução do esquema ao eliminar completamente os tipos estritos. Entretanto, se você quiser impor um esquema no momento da gravação (por exemplo, porque você tem um esquema relativamente restrito), from_json pode ser uma opção melhor.

A tabela a seguir descreve as diferenças entre from_json e parse_json:

Função

Casos de uso

Disponibilidade

from_json

A evolução do esquema com from_json mantém o esquema. Isso é útil quando:

  • Você quer impor seu esquema de dados (por exemplo, revisando cada alteração de esquema antes de persisti-la).
  • Você quer otimizar o armazenamento e exige baixa latência de consulta e custo.
  • Você quer falhar em dados com tipos incompatíveis.
  • Você deseja extrair resultados parciais de registros JSON corrompidos e armazenar o registro malformado na coluna _corrupt_record . Em contraste, a ingestão de VARIANT retorna um erro para JSON inválido.

Disponível com inferência de esquema e evolução apenas no pipeline declarativo LakeFlow

parse_json

VARIANT é particularmente adequado para armazenar dados que não precisam ser esquematizados. Por exemplo:

  • Você quer manter os dados semiestruturados porque eles são flexíveis.
  • O esquema muda muito rápido para ser convertido em um esquema sem falhas de transmissão e reinicializações frequentes.
  • Você não quer falhar em dados com tipos incompatíveis. (A ingestão de VARIANT sempre será bem-sucedida para registros JSON válidos, mesmo se houver incompatibilidades de tipo.)
  • Seus usuários não querem lidar com a coluna de dados resgatada contendo campos que não estão em conformidade com o esquema.

Disponível com e sem pipeline declarativo LakeFlow

Posso usar from_json sintaxe de inferência e evolução de esquema fora do pipeline declarativo LakeFlow ?

Não, você não pode usar a sintaxe de inferência e evolução de esquema from_json fora do pipeline declarativo LakeFlow .

Como acesso o esquema inferido por from_json?

veja o esquema da tabela de transmissão de destino.

Posso passar from_json um esquema e também fazer evolução?

Não, você não pode passar from_json um esquema e também fazer evolução. No entanto, você pode fornecer dicas de esquema para substituir alguns ou todos os campos inferidos por from_json.

O que acontece com o esquema se a tabela for totalmente atualizada?

Os locais do esquema associados à tabela são limpos e o esquema é reinferido do zero.