Inferir e evoluir o esquema usando from_json
no pipeline declarativo LakeFlow
Visualização
Este recurso está em pré-visualização pública.
Este artigo descreve como inferir e evoluir o esquema de blobs JSON com a função SQL from_json
no pipeline declarativo LakeFlow .
Visão geral
A função SQL from_json
analisa uma coluna de strings JSON e retorna um valor struct. Quando usado fora do pipeline declarativo LakeFlow , você deve fornecer explicitamente o esquema do valor retornado usando o argumento schema
. Quando usado com o pipeline declarativo LakeFlow , você pode habilitar a inferência e a evolução do esquema, que gerenciam automaticamente o esquema do valor retornado. Esse recurso simplifica tanto a configuração inicial (especialmente quando o esquema é desconhecido) quanto as operações em andamento quando o esquema muda com frequência. Ele permite o processamento contínuo de blobs JSON arbitrários de fontes de transmissão de dados, como Auto Loader, Kafka ou Kinesis.
Especificamente, quando usado no pipeline declarativo LakeFlow , a inferência e evolução do esquema para a função SQL from_json
pode:
- Detectar novos campos em registros JSON de entrada (incluindo objetos JSON aninhados)
- Inferir os tipos de campo e mapeá-los para os tipos de dados Spark apropriados
- Evoluir automaticamente o esquema para acomodar novos campos
- Manipule automaticamente dados que não estejam em conformidade com o esquema atual
Sintaxe: Inferir e evoluir automaticamente o esquema
Se você usar from_json
com o pipeline declarativo LakeFlow , ele poderá inferir e evoluir o esquema automaticamente. Para habilitar isso, defina o esquema como NULL e especifique a opção schemaLocationKey
. Isso permite inferir e acompanhar o esquema.
- SQL
- Python
from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))
from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})
Uma consulta pode ter várias expressões from_json
, mas cada expressão deve ter um schemaLocationKey
exclusivo. O schemaLocationKey
também deve ser exclusivo por pipeline.
- SQL
- Python
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(
col("value"),
from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)
Sintaxe: Esquema fixo
Se você quiser impor um esquema específico, poderá usar a seguinte sintaxe from_json
para analisar strings JSON usando esse esquema:
from_json(jsonStr, schema, [, options])
Essa sintaxe pode ser usada em qualquer ambiente Databricks , incluindo o pipeline declarativo LakeFlow . Mais informações estão disponíveis aqui.
Inferência de esquema
from_json
infere o esquema dos primeiros lotes de colunas de dados JSON e o indexa internamente por seu schemaLocationKey
(obrigatório).
Se as strings JSON forem um único objeto (por exemplo, {"id": 123, "name": "John"}
), from_json
infere um esquema do tipo STRUCT e adiciona um rescuedDataColumn
à lista de campos.
STRUCT<id LONG, name STRING, _rescued_data STRING>
Entretanto, se as strings JSON tiverem uma matriz de nível superior (como ["id": 123, "name": "John"]
), então from_json
encapsulará a ARRAY em uma STRUCT. Essa abordagem permite resgatar dados incompatíveis com o esquema inferido. Você tem a opção de explodir os valores da matriz em linhas separadas posteriormente.
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Substituir inferência de esquema usando dicas de esquema
Opcionalmente, você pode fornecer schemaHints
para influenciar como from_json
infere o tipo de uma coluna. Isso é útil quando você sabe que uma coluna é de um tipo de dado específico ou se quiser escolher um tipo de dado mais geral (por exemplo, um double em vez de um integer). Você pode fornecer um número arbitrário de dicas para tipos de dados de coluna usando a sintaxe de especificação de esquema SQL. A semântica para dicas de esquema é a mesma das dicas de esquema do Auto Loader. Por exemplo:
SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)
Quando as strings JSON contêm um ARRAY de nível superior, elas são encapsuladas em um STRUCT. Nesses casos, as dicas de esquema são aplicadas ao esquema ARRAY em vez do STRUCT encapsulado. Por exemplo, considere uma string JSON com uma matriz de nível superior como:
[{"id": 123, "name": "John"}]
O esquema ARRAY inferido é encapsulado em uma STRUCT:
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Para alterar o tipo de dados de id
, especifique a dica de esquema como strings element.id
. Para adicionar uma nova coluna do tipo DOUBLE, especifique element.new_col
DOUBLE. Devido a essas dicas, o esquema para o array JSON de nível superior se torna:
struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>
Evolua o esquema usando schemaEvolutionMode
from_json
detecta a adição de novas colunas à medida que processa seus dados. Quando from_json
detecta um novo campo, ele atualiza o esquema inferido com o esquema mais recente, mesclando novas colunas ao final do esquema. Os tipos de dados das colunas existentes permanecem inalterados. Após a atualização do esquema, o pipeline é reiniciado automaticamente com o esquema atualizado.
from_json
suporta os seguintes modos para evolução do esquema, que você define usando a configuração opcional schemaEvolutionMode
. Esses modos são consistentes com o Auto Loader.
| Comportamento ao ler uma nova coluna |
---|---|
| A transmissão da falha. Novas colunas são adicionadas ao esquema. As colunas existentes não desenvolvem tipos de dados. |
| O esquema nunca evolui e a transmissão não falha devido a alterações no esquema. Todas as novas colunas são registradas na coluna de dados resgatada. |
| Falha na transmissão. transmissão não reinicia a menos que |
| Não evolui o esquema, novas colunas são ignoradas e os dados não são resgatados a menos que a opção |
Por exemplo:
SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)
Coluna de dados resgatada
Uma coluna de dados resgatada é adicionada automaticamente ao seu esquema como _rescued_data
. Você pode renomear a coluna definindo a opção rescuedDataColumn
. Por exemplo:
from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})
Quando você escolhe usar a coluna de dados resgatada, todas as colunas que não correspondem ao esquema inferido são resgatadas em vez de descartadas. Isso pode acontecer devido a uma incompatibilidade de tipo de dados, uma coluna ausente no esquema ou uma diferença de maiúsculas e minúsculas no nome da coluna.
Lidar com registros corrompidos
Para armazenar registros malformados que não podem ser analisados, adicione uma coluna _corrupt_record
definindo dicas de esquema, como no exemplo a seguir:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL,
map('schemaLocationKey', 'nycTaxi',
'schemaHints', '_corrupt_record STRING',
'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Para renomear a coluna de registro corrompida, defina a opção columnNameOfCorruptRecord
.
O analisador JSON suporta três modos para lidar com registros corrompidos:
Mode | Descrição |
---|---|
| Para registros corrompidos, coloca as strings malformadas em um campo configurado por |
| Ignora registros corrompidos. Quando você usa o modo |
| Lança uma exceção quando o analisador encontra registros corrompidos. Quando você usa o modo |
Consulte um campo na saída from_json
from_json
infere o esquema durante a execução do pipeline. Se uma consulta posterior fizer referência a um campo from_json
antes que a função from_json
seja executada com sucesso pelo menos uma vez, o campo não será resolvido e a consulta será ignorada. No exemplo a seguir, a análise da consulta da tabela Silver será ignorada até que a função from_json
na consulta Bronze tenha sido executada e inferida o esquema.
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
CREATE STREAMING TABLE silver AS
SELECT jsonCol.VendorID, jsonCol.total_amount
FROM bronze
Se a função from_json
e os campos que ela infere forem referenciados na mesma consulta, a análise poderá falhar, como no exemplo a seguir:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
Você pode corrigir isso movendo a referência ao campo from_json
para uma consulta posterior (como o exemplo bronze/prata acima). Como alternativa, você pode especificar schemaHints
que contém os campos from_json
referenciados. Por exemplo:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
Exemplos: Inferir e evoluir automaticamente o esquema
Esta seção fornece código de exemplo para habilitar a inferência e evolução automáticas de esquema usando from_json
no pipeline declarativo LakeFlow .
Crie uma tabela de transmissão a partir do armazenamento de objetos cloud
O exemplo a seguir usa a sintaxe read_files
para criar uma tabela de transmissão a partir do armazenamento de objetos cloud .
- SQL
- Python
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
@dp.table(comment="from_json autoloader example")
def bronze():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)
Crie uma tabela de transmissão do Kafka
O exemplo a seguir usa a sintaxe read_kafka
para criar uma tabela de transmissão do Kafka.
- SQL
- Python
CREATE STREAMING TABLE bronze AS
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
FROM READ_KAFKA(
bootstrapSevers => '<server:ip>',
subscribe => 'events',
"startingOffsets", "latest"
)
@dp.table(comment="from_json kafka example")
def bronze():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)
Exemplos: Esquema fixo
Para obter um exemplo de código usando from_json
com um esquema fixo, consulte a funçãofrom_json
.
Perguntas frequentes
Esta seção responde a perguntas frequentes sobre inferência de esquema e suporte à evolução na função from_json
.
Qual é a diferença entre from_json
e parse_json
?
A função parse_json
retorna um valor VARIANT
das strings JSON .
O VARIANT fornece uma maneira flexível e eficiente de armazenar dados semiestruturados. Isso contorna a inferência e a evolução do esquema ao eliminar completamente os tipos estritos. Entretanto, se você quiser impor um esquema no momento da gravação (por exemplo, porque você tem um esquema relativamente restrito), from_json
pode ser uma opção melhor.
A tabela a seguir descreve as diferenças entre from_json
e parse_json
:
Função | Casos de uso | Disponibilidade |
---|---|---|
| A evolução do esquema com
| Disponível com inferência de esquema e evolução apenas no pipeline declarativo LakeFlow |
| VARIANT é particularmente adequado para armazenar dados que não precisam ser esquematizados. Por exemplo:
| Disponível com e sem pipeline declarativo LakeFlow |
Posso usar from_json
sintaxe de inferência e evolução de esquema fora do pipeline declarativo LakeFlow ?
Não, você não pode usar a sintaxe de inferência e evolução de esquema from_json
fora do pipeline declarativo LakeFlow .
Como acesso o esquema inferido por from_json
?
veja o esquema da tabela de transmissão de destino.
Posso passar from_json
um esquema e também fazer evolução?
Não, você não pode passar from_json
um esquema e também fazer evolução. No entanto, você pode fornecer dicas de esquema para substituir alguns ou todos os campos inferidos por from_json
.
O que acontece com o esquema se a tabela for totalmente atualizada?
Os locais do esquema associados à tabela são limpos e o esquema é reinferido do zero.