Pular para o conteúdo principal

Ingerir dados como tipo de variante semiestruturada

info

Visualização

Esse recurso está em Public Preview.

Em Databricks Runtime 15.3 e acima, o senhor pode usar o tipo VARIANT para ingerir dados semiestruturados. Este artigo descreve o comportamento e fornece padrões de exemplo para a ingestão de dados do armazenamento de objetos na nuvem usando Auto Loader e COPY INTO, registros de transmissão de Kafka e SQL comando para criar novas tabelas com dados de variantes ou inserir novos registros usando o tipo de variante. A tabela a seguir resume os formatos de arquivo suportados e o suporte à versão do Databricks Runtime:

Formato de arquivo

Versão suportada do Databricks Runtime

JSON

15.3 e acima

XML

16.4 e acima

CSV

16.4 e acima

Consulte Dados da variante de consulta.

Crie uma tabela com uma coluna variante.

VARIANT É um tipo SQL padrão no Databricks Runtime 15.3 e versões superiores, e é compatível com tabelas baseadas no Delta Lake. As tabelas gerenciadas no Databricks usam Delta Lake por default, então você pode criar uma tabela vazia com uma única coluna VARIANT usando a seguinte sintaxe.

SQL
CREATE TABLE table_name (variant_column VARIANT)

Alternativamente, você pode usar uma instrução CTAS para criar uma tabela com uma coluna variante. Use a função PARSE_JSON para analisar strings JSON ou a função FROM_XML para analisar strings XML. O exemplo a seguir cria uma tabela com duas colunas.

  • A coluna id é extraída das strings JSON como um tipo STRING .
  • variant_column Contém as strings JSON completas codificadas como um tipo VARIANT .
SQL
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
nota

Databricks recomenda extrair os campos consultados com frequência e armazená-los como colunas não variantes para acelerar as consultas e otimizar o armazenamento.

VARIANT As colunas não podem ser usadas para chave clustering , partições ou chave Z-order O tipo de dados VARIANT não pode ser usado para comparações, agrupamento, ordenação e operações de conjuntos. Para mais informações, consulte Limitações.

Inserir uso de dados parse_json

Se a tabela de destino já contiver uma coluna codificada como VARIANT, você pode usar parse_json para inserir registros de strings JSON como VARIANT. Por exemplo, analise as strings JSON da coluna json_string e insira-as em table_name.

SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data

Inserir uso de dados from_xml

Se a tabela de destino já contiver uma coluna codificada como VARIANT, você pode usar from_xml para inserir registros de strings XML como VARIANT. Por exemplo, analise as strings XML da coluna xml_string e insira-as em table_name.

SQL
INSERT INTO table_name (variant_column)
SELECT FROM_XML(xml_string, 'variant')
FROM source_data

Inserir uso de dados from_csv

Se a tabela de destino já contiver uma coluna codificada como VARIANT, você pode usar from_csv para inserir registros de strings CSV como VARIANT. Por exemplo, analise os registros CSV da coluna csv_string e insira-os em table_name.

SQL
INSERT INTO table_name (variant_column)
SELECT FROM_CSV(csv_string, 'v variant').v
FROM source_data

Ingira dados do armazenamento de objetos na nuvem como variante

O Auto Loader pode ser usado para carregar todos os dados das fontes de arquivos compatíveis como uma única coluna VARIANT em uma tabela de destino. Como o site VARIANT é flexível às alterações de esquema e tipo e mantém a sensibilidade a maiúsculas e minúsculas e os valores NULL presentes na fonte de dados, esse padrão é robusto para a maioria dos cenários de ingestão, com as seguintes ressalvas:

  • Registros malformados não podem ser codificados usando o tipo VARIANT.
  • VARIANT O tipo só pode armazenar registros de até 16 MB.
nota

A variante trata registros muito grandes de forma semelhante aos registros corrompidos. No modo de processamento default PERMISSIVE , os registros excessivamente grandes são capturados no corruptRecordColumn.

Como todo o registro é gravado como uma única coluna VARIANT, não ocorre evolução do esquema durante a ingestão e não há suporte para rescuedDataColumn. O exemplo a seguir pressupõe que a tabela de destino já exista com uma única coluna VARIANT.

Python
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)

Você também pode especificar VARIANT ao definir um esquema ou passar schemaHints. Os dados no campo de origem referenciado devem conter um registro válido. Os exemplos a seguir demonstram essa sintaxe.

Python
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)

Use COPY INTO com variante

A Databricks recomenda o uso do Auto Loader em vez de COPY INTO quando disponível.

COPY INTO suporta a ingestão de todo o conteúdo de uma fonte de dados suportada como uma única coluna. O exemplo a seguir cria uma nova tabela com uma única coluna VARIANT e, em seguida, usa COPY INTO para ingerir registros de uma fonte de arquivo JSON.

SQL
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FILES = ('file-name')
FORMAT_OPTIONS ('singleVariantColumn' = 'variant_column')

transmissão de dados Kafka como variante

Muitas transmissões Kafka codificam suas cargas úteis usando JSON. A ingestão da transmissão Kafka usando VARIANT torna essas cargas de trabalho robustas em relação às alterações de esquema.

O exemplo a seguir demonstra a leitura de uma fonte de transmissão Kafka, a conversão do key em STRING e do value em VARIANT e a gravação em uma tabela de destino.

Python
from pyspark.sql.functions import col, parse_json

(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)

Próximos passos