Inferir e desenvolver o esquema usando from_json
em DLT
Visualização
Esse recurso está em Public Preview.
Este artigo descreve como inferir e desenvolver o esquema de JSON blobs com a função from_json
SQL em DLT.
Visão geral
A função from_json
SQL analisa uma coluna de cadeias de caracteres JSON e retorna um valor de estrutura. Quando usado fora da DLT, o senhor deve fornecer explicitamente o esquema do valor retornado usando o argumento schema
. Quando usado com DLT, o senhor pode ativar a inferência e a evolução do esquema, que gerenciam automaticamente o esquema do valor retornado. Esse recurso simplifica a configuração inicial (especialmente quando o esquema é desconhecido) e as operações contínuas quando o esquema é alterado com frequência. Ele permite o processamento contínuo de JSON blobs arbitrários de fontes de transmissão de dados, como Auto Loader, Kafka, ou Kinesis.
Especificamente, quando usado em DLT, a inferência e a evolução do esquema para a função from_json
SQL podem:
- Detectar novos campos em registros JSON de entrada (incluindo objetos JSON aninhados)
- Inferir os tipos de campo e mapeá-los para os tipos de dados Spark apropriados
- Evolua automaticamente o esquema para acomodar novos campos
- Processar automaticamente dados que não estão em conformidade com o esquema atual
Sintaxe: inferir e evoluir automaticamente o esquema
Se o senhor usar o site from_json
com DLT, ele poderá inferir e desenvolver automaticamente o esquema. Para ativar isso, defina o esquema como NULL e especifique a opção schemaLocationKey
. Isso permite que ele deduza e acompanhe o esquema.
- SQL
- Python
from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))
from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})
Uma consulta pode ter várias expressões from_json
, mas cada expressão deve ter um schemaLocationKey
exclusivo. O endereço schemaLocationKey
também deve ser exclusivo para cada pipeline.
- SQL
- Python
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(
col("value"),
from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)
Sintaxe: esquema fixo
Se, em vez disso, o senhor quiser impor um esquema específico, poderá usar a seguinte sintaxe from_json
para analisar as cadeias de caracteres JSON usando esse esquema:
from_json(jsonStr, schema, [, options])
Essa sintaxe pode ser usada em qualquer ambiente do Databricks, inclusive no DLT. Mais informações estão disponíveis aqui.
Inferência de esquema
from_json
infere o esquema a partir dos primeiros lotes de colunas de dados JSON e o indexa internamente por seu schemaLocationKey
(obrigatório).
Se as cadeias de caracteres JSON forem um único objeto (por exemplo, {"id": 123, "name": "John"}
), from_json
infere um esquema do tipo STRUCT e adiciona um rescuedDataColumn
à lista de campos.
STRUCT<id LONG, name STRING, _rescued_data STRING>
Entretanto, se as cadeias de caracteres JSON tiverem uma matriz de nível superior (como ["id": 123, "name": "John"]
), então from_json
envolverá a ARRAY em um STRUCT. Essa abordagem permite resgatar dados que são incompatíveis com o esquema inferido. Você tem a opção de explodir os valores da matriz em linhas separadas a jusante.
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Substitua a inferência de esquema usando dicas de esquema
Opcionalmente, você pode fornecer schemaHints
para influenciar como from_json
infere o tipo de uma coluna. Isso é útil quando você sabe que uma coluna é de um tipo de dados específico ou se deseja escolher um tipo de dados mais geral (por exemplo, um duplo em vez de um inteiro). O senhor pode fornecer um número arbitrário de dicas para tipos de dados de coluna usando a sintaxe de especificação de esquema SQL. A semântica das dicas de esquema é a mesma das dicas de esquema do Auto Loader. Por exemplo:
SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)
Quando a cadeia de caracteres JSON contém um ARRAY de nível superior, ela é envolvida em um STRUCT. Nesses casos, as dicas de esquema são aplicadas ao esquema ARRAY em vez do STRUCT encapsulado. Por exemplo, considere uma cadeia de caracteres JSON com uma matriz de nível superior, como a seguinte:
[{"id": 123, "name": "John"}]
O esquema ARRAY inferido é encapsulado em um STRUCT:
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Para alterar o tipo de dados de id
, especifique a dica de esquema como element.id
strings. Para adicionar uma nova coluna do tipo DOUBLE, especifique element.new_col
DOUBLE. Devido a essas dicas, o esquema para a matriz JSON de nível superior se torna:
struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>
Evolua o esquema usando schemaEvolutionMode
from_json
detecta a adição de novas colunas à medida que processa seus dados. Quando o from_json
detecta um novo campo, ele atualiza o esquema inferido com o esquema mais recente mesclando novas colunas no final do esquema. Os tipos de dados das colunas existentes permanecem inalterados. Após a atualização do esquema, o pipeline é reiniciado automaticamente com o esquema atualizado.
from_json
suporta os seguintes modos de evolução do esquema, que o senhor define usando a configuração opcional schemaEvolutionMode
. Esses modos são consistentes com o Auto Loader.
| Comportamento ao ler uma nova coluna |
---|---|
| A transmissão da falha. Novas colunas são adicionadas ao esquema. As colunas existentes não desenvolvem tipos de dados. |
| O esquema nunca evolui e a transmissão não falha devido a alterações no esquema. Todas as novas colunas são registradas na coluna de dados resgatados. |
| A transmissão falha. A transmissão não é reiniciada, a menos que o site |
| Não evolui o esquema, novas colunas são ignoradas e os dados não são resgatados a menos que a opção |
Por exemplo:
SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)
Coluna de dados resgatados
Uma coluna de dados resgatada é adicionada automaticamente ao seu esquema como _rescued_data
. Você pode renomear a coluna definindo a opção rescuedDataColumn
. Por exemplo:
from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})
Quando você opta por usar a coluna de dados resgatados, todas as colunas que não corresponderem ao esquema inferido são resgatadas em vez de descartadas. Isso pode acontecer devido a uma incompatibilidade de tipos de dados, a uma coluna ausente no esquema ou a uma diferença entre maiúsculas e minúsculas no nome da coluna.
Gerenciar registros corrompidos
Para armazenar registros que estão malformados e não podem ser analisados, adicione uma coluna _corrupt_record
definindo dicas de esquema, como no exemplo a seguir:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL,
map('schemaLocationKey', 'nycTaxi',
'schemaHints', '_corrupt_record STRING',
'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Para renomear a coluna de registro corrompido, defina a opção columnNameOfCorruptRecord
.
O analisador JSON é compatível com três modos de tratamento de registros corrompidos:
Mode | Descrição |
---|---|
| Para registros corrompidos, coloca as cadeias de caracteres malformadas em um campo configurado por |
| Ignora registros corrompidos. Quando você usa o modo |
| Lança uma exceção quando o analisador encontra registros corrompidos. Quando você usa o modo |
Consulte um campo na saída from_json
from_json
infere o esquema durante a execução do pipeline. Se uma consulta downstream se referir a um campo from_json
antes da função from_json
ter sido executada com êxito pelo menos uma vez, o campo não será resolvido e a consulta será ignorada. No exemplo a seguir, a análise da consulta da tabela prata será ignorada até que a função from_json
na consulta bronze tenha sido executada e inferido o esquema.
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
CREATE STREAMING TABLE silver AS
SELECT jsonCol.VendorID, jsonCol.total_amount
FROM bronze
Se a função from_json
e os campos que ela infere forem mencionados na mesma consulta, a análise poderá falhar, como no exemplo a seguir:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
O senhor pode corrigir isso movendo a referência ao campo from_json
para uma consulta downstream (como o exemplo do bronze/prata acima). Como alternativa, você pode especificar schemaHints
que contém os referidos campos from_json
. Por exemplo:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
Exemplos: inferir e desenvolver automaticamente o esquema
Esta seção fornece um código de exemplo para permitir a inferência e a evolução automáticas do esquema usando from_json
na DLT.
Criar uma tabela de transmissão a partir do armazenamento de objetos na nuvem
O exemplo a seguir usa a sintaxe read_files
para criar uma tabela de transmissão a partir do armazenamento de objetos na nuvem.
- SQL
- Python
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
@dlt.table(comment="from_json autoloader example")
def bronze():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)
Criar uma tabela de transmissão a partir do Kafka
O exemplo a seguir usa a sintaxe read_kafka
para criar uma tabela de transmissão a partir de Kafka.
- SQL
- Python
CREATE STREAMING TABLE bronze AS
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
FROM READ_KAFKA(
bootstrapSevers => '<server:ip>',
subscribe => 'events',
"startingOffsets", "latest"
)
@dlt.table(comment="from_json kafka example")
def bronze():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)
Exemplos: esquema fixo
Por exemplo, codifique usando from_json
com um esquema fixo, consulte a funçãofrom_json
.
Perguntas frequentes
Esta seção responde às perguntas mais frequentes sobre inferência de esquema e suporte à evolução na função from_json
.
Qual é a diferença entre from_json
e parse_json
?
A função parse_json
retorna um valor VARIANT
das cadeias de caracteres JSON.
A VARIANT fornece uma maneira flexível e eficiente de armazenar dados semiestruturados. Isso contorna a inferência e a evolução do esquema ao eliminar completamente os tipos estritos. No entanto, se você quiser impor um esquema no momento da gravação (por exemplo, porque você tem um esquema relativamente rígido), from_json
pode ser uma opção melhor.
A tabela a seguir descreve as diferenças entre from_json
e parse_json
:
Função | Casos de uso | Disponibilidade |
---|---|---|
| A evolução do esquema com
| Disponível com inferência de esquema & evolução somente em DLT |
| O VARIANT é particularmente adequado para armazenar dados que não precisam ser esquematizados. Por exemplo:
| Disponível com e sem DLT |
Posso usar a inferência de esquema from_json
e a sintaxe de evolução fora da DLT?
Não, o senhor não pode usar a inferência de esquema from_json
e a sintaxe de evolução fora da DLT.
Como faço para acessar o esquema inferido por from_json
?
visualizar o esquema da tabela de transmissão de destino.
Posso passar um esquema para from_json
e também fazer a evolução?
Não, você não pode passar um esquema para from_json
e também fazer a evolução. No entanto, você pode fornecer dicas de esquema para substituir alguns ou todos os campos inferidos por from_json
.
O que acontece com o esquema se a tabela for totalmente atualizada?
As localizações do esquema associadas à tabela são apagadas e o esquema é inferido novamente do zero.