Pular para o conteúdo principal

Ler e gravar arquivos XML

info

Visualização

Esse recurso está em Public Preview.

Linguagem de Marcação Extensível (XML) é uma linguagem de marcação para formatação, armazenamento e compartilhamento de dados em formato textual. Ele define um conjunto de regras para serializar dados que vão desde documentos até estruturas de dados arbitrárias.

Databricks oferece suporte a XML para leitura e gravação com Apache Spark, incluindo inferência e evolução automática de esquema, configuração de tag de linha, validação XSD e expressões SQL como from_xml. O suporte XML nativo funciona com o Auto Loader, read_files e COPY INTO sem exigir JARs externos.

Pré-requisitos

Suporte para formato de arquivo XML exige Databricks Runtime 14.3 e acima.

Opções

Use os métodos .option() e .options() de DataFrameReader e DataFrameWriter para configurar a fonte de dados XML. Para obter uma lista completa das opções suportadas, consulte DataFrameReader Opções XML e DataFrameWriter Opções XML.

Analisar registros XML

A especificação XML exige uma estrutura bem formada. No entanto, essa especificação não é mapeada imediatamente para um formato tabular. Você deve especificar a opção rowTag para indicar o elemento XML mapeado para DataFrame Row. O elemento rowTag se torna o struct de nível superior. Os elementos secundários de rowTag se tornam os campos do nível superior struct.

Você pode especificar o esquema para esse registro ou permitir que ele seja inferido automaticamente. Como o analisador examina apenas os elementos rowTag, o DTD e as entidades externas são filtradas.

Os exemplos a seguir ilustram a inferência de esquema e a análise de um arquivo XML usando diferentes opções rowTag:

Python
xmlString = """
<reviews>
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>
<review id="r002">
<author>Bob</author>
<rating>4</rating>
<comment>Great location, very comfortable</comment>
</review>
</reviews>"""

xmlPath = "/Volumes/<catalog>/<schema>/<volume>/reviews.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Ler o arquivo XML com a opção rowTag como "reviews":

Python
df = spark.read.option("rowTag", "reviews").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Saída:

root
|-- review: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- comment: string (nullable = true)
| | |-- rating: string (nullable = true)

+----------------------------------------------------------------------------------------+
|review |
+----------------------------------------------------------------------------------------+
|[{r001, Alice, Amazing stay, highly recommend!, 5}, {r002, Bob, Great location..., 4}] |
+----------------------------------------------------------------------------------------+

Ler o arquivo XML com rowTag como "review":

Python
df = spark.read.option("rowTag", "review").format("xml").load(xmlPath)
# Infers four top-level fields and parses `review` in separate rows:

Saída:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- comment: string (nullable = true)
|-- rating: string (nullable = true)

+----+------+--------------------------------+------+
|_id |author|comment |rating|
+----+------+--------------------------------+------+
|r001|Alice |Amazing stay, highly recommend! |5 |
|r002|Bob |Great location, very comfortable|4 |
+----+------+--------------------------------+------+

Validar registros XML com XSD

Opcionalmente, é possível validar cada registro XML em nível de linha usando uma Definição de Esquema XML (XSD). O arquivo XSD é especificado na opção rowValidationXSDPath. Fora isso, o XSD não afeta o esquema fornecido ou inferido. Um registro que falha na validação é marcado como "corrompido" e tratado com base na opção de modo de tratamento de registros corrompidos descrita na seção de opções.

O senhor pode usar o site XSDToSchema para extrair um esquema Spark DataFrame de um arquivo XSD. Ele suporta apenas tipos simples, complexos e de sequência e suporta apenas a funcionalidade básica de XSD.

Scala
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "/Volumes/<catalog>/<schema>/<volume>/reviews.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="review">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="rating" type="xs:integer" />
<xs:element name="comment" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

A tabela a seguir mostra a conversão dos tipos de dados XSD em tipos de dados Spark:

Tipos de dados XSD

Tipos de dados do Spark

boolean

BooleanType

decimal

DecimalType

unsignedLong

DecimalType(38, 0)

double

DoubleType

float

FloatType

byte

ByteType

short, unsignedByte

ShortType

integer, negativeInteger, nonNegativeInteger, nonPositiveInteger, positiveInteger, unsignedShort

IntegerType

long, unsignedInt

LongType

date

DateType

dateTime

TimestampType

Others

StringType

Analisar XML aninhado

Os dados XML em uma coluna com valor de cadeia de caracteres em um site DataFrame existente podem ser analisados com schema_of_xml e from_xml que retornam o esquema e os resultados analisados como novas colunas struct. Os dados XML passados como argumento para schema_of_xml e from_xml devem ser um único registro XML bem formado.

esquema_de_xml

Utilize schema_of_xml para inferir o esquema Spark a partir de uma string XML. Encaminhar o resultado para from_xml para a análise das colunas XML.

Sintaxe : schema_of_xml(xmlStr [, options])

Argumento

Obrigatório

Descrição

xmlStr

Sim

Uma expressão string que especifica um único registro XML bem-formado.

options

Não

Um literal MAP<STRING,STRING> que especifica diretrizes.

Retorna uma STRING contendo uma definição de um struct com n campos de strings onde os nomes das colunas são derivados dos nomes de elementos e atributos XML. Os valores dos campos contêm os tipos SQL derivados e formatados.

de_xml

Utilize from_xml para analisar uma coluna STRING contendo registros XML em uma estrutura. Forneça um esquema diretamente ou use a saída de schema_of_xml.

Sintaxe : from_xml(xmlStr, schema [, options])

Argumento

Obrigatório

Descrição

xmlStr

Sim

Uma expressão string que especifica um único registro XML bem-formado.

schema

Sim

Uma expressão de string ou invocação da função schema_of_xml.

options

Não

Um literal MAP<STRING,STRING> que especifica diretrizes.

Retorna uma estrutura com nomes de campos e tipos correspondentes à definição de esquema. O esquema deve ser definido como pares de nome de coluna e tipo de dados separados por vírgulas, conforme usado em, por exemplo, CREATE TABLE. A maioria das opções mostradas na seção Opções são aplicáveis, com as seguintes exceções:

  • rowTag: Como há somente um registro XML, a opção rowTag não é aplicável.
  • mode (default: PERMISSIVE): Permite um modo para lidar com registros corrompidos durante a análise.
    • PERMISSIVE: Quando encontra um registro corrompido, coloca as cadeias de caracteres malformadas em um campo configurado por columnNameOfCorruptRecord, e define os campos malformados como null. Para manter os registros corrompidos, é possível definir um campo do tipo string chamado columnNameOfCorruptRecord em um esquema definido pelo usuário. Se um esquema não tiver o campo, ele eliminará os registros corrompidos durante a análise. Ao inferir um esquema, ele adiciona implicitamente um campo columnNameOfCorruptRecord em um esquema de saída.
    • FAILFAST: lança uma exceção quando encontra registros corrompidos.

Exemplos

Para analisar uma coluna de strings XML, use schema_of_xml para inferir o esquema e, em seguida, passe-o para from_xml:

Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>
"""

df = spark.createDataFrame([(1, xml_data)], ["review_id", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Para analisar XML em linha em SQL:

SQL
SELECT from_xml('
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>',
schema_of_xml('
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>')
);

Converter entre estruturas XML e DataFrame

Devido às diferenças de estrutura entre DataFrame e XML, existem algumas regras de conversão de dados XML para DataFrame e de DataFrame para dados XML. Observe que o tratamento de atributos pode ser desativado com a opção excludeAttribute.

Conversão de XML para DataFrame

Ao ler XML, o Databricks mapeia elementos e atributos XML para campos de DataFrame de acordo com as seguintes regras.

Atributos são convertidos em campos com o prefixo de cabeçalho attributePrefix.

XML
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>

Isso produz o seguinte esquema:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

Dados de caractere em um elemento que contenha atributo(s) ou elemento(s) filho(s) são analisados no campo valueTag. Se houver várias ocorrências de dados de caractere, o campo valueTag será convertido para o tipo array.

XML
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>

Isso produz o seguinte esquema:

root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

Conversão de DataFrame para XML

Ao gravar um DataFrame em XML, certas estruturas aninhadas exigem tratamento especial devido às diferenças entre os modelos de dados de DataFrame e XML.

Se um DataFrame contém um campo ArrayType cujo tipo de elemento também é ArrayType, escrevê-lo em XML produz um nível extra de aninhamento que não está presente ao fazer o round-trip de arquivos XML. Isso afeta apenas DataFrames originados fora do XML — a leitura e a gravação de arquivos XML preservam a estrutura original.

Por exemplo, um DataFrame com o seguinte esquema:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

e os seguintes dados:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

produz a seguinte saída XML:

XML
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>

O nome do elemento da matriz sem nome em DataFrame é especificado pela opção arrayElementName (padrão: item).

Habilitar a coluna de dados resgatados

A coluna de dados resgatados garante que não se percam dados durante o ETL. Captura todos os dados que não foram analisados porque um ou mais campos em um registro apresentam um dos seguintes problemas:

  • Ausente do esquema informado.
  • Não corresponde ao tipo de dados do esquema informado.
  • Tem incompatibilidade de maiúsculas e minúsculas nos nomes de campos no esquema informado.

A coluna de dados resgatados é retornada como um documento JSON contendo as colunas que foram resgatadas e o caminho do arquivo de origem do registro.

Para ativar a coluna de dados resgatados, defina a opção rescuedDataColumn para um nome de coluna na leitura:

Python
df = spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load("/Volumes/<catalog>/<schema>/<volume>/reviews_xml")

Para remover o caminho do arquivo de origem da coluna de dados resgatados, defina:

Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

O analisador XML oferece suporte a três modos ao analisar registros: PERMISSIVE, DROPMALFORMED e FAILFAST. Quando usadas junto com rescuedDataColumn, as incompatibilidades de tipos de dados não fazem com que os registros sejam descartados no modo DROPMALFORMED nem geram um erro no modo FAILFAST. Somente registros corrompidos (XML incompleto ou malformado) são descartados ou geram erros.

Inferir e evoluir esquema com o Auto Loader

Para obter uma discussão detalhada sobre esse tópico e as opções aplicáveis, consulte Configurar inferência e evolução de esquema no Auto Loader. O senhor pode configurar o Auto Loader para detectar automaticamente o esquema dos dados XML carregados, o que lhe permite inicializar tabelas sem declarar explicitamente o esquema de dados e desenvolver o esquema da tabela à medida que novas colunas são introduzidas. Isso elimina a necessidade de rastrear e aplicar manualmente as alterações do esquema ao longo do tempo.

Em default, a inferência de esquema Auto Loader procura evitar problemas de evolução do esquema devido a incompatibilidades de tipos. Para formatos que não codificam tipos de dados (JSON, CSV e XML), Auto Loader infere todas as colunas como strings, incluindo campos aninhados em arquivos XML. O Apache Spark DataFrameReader usa um comportamento diferente para inferência de esquema, selecionando tipos de dados para colunas em fontes XML com base em dados de amostra. Para ativar esse comportamento com o Auto Loader, defina a opção cloudFiles.inferColumnTypes como true.

O Auto Loader detecta a adição de novas colunas à medida que processa seus dados. Quando o site Auto Loader detecta uma nova coluna, a transmissão é interrompida com um UnknownFieldException. Antes de a transmissão gerar esse erro, o site Auto Loader executa a inferência de esquema nas últimas microlotes de dados e atualiza o local do esquema com o esquema mais recente, mesclando novas colunas ao final do esquema. Os tipos de dados das colunas existentes permanecem inalterados. O Auto Loader oferece suporte a diferentes modos de evolução do esquema, que o senhor define na opção cloudFiles.schemaEvolutionMode.

É possível usar dicas de esquema para aplicar as informações de esquema que o senhor conhece e espera em um esquema inferido. Quando o senhor sabe que uma coluna é de um tipo de dados específico ou se deseja escolher um tipo de dados mais geral (por exemplo, um double em vez de um inteiro), é possível fornecer um número arbitrário de dicas para tipos de dados de coluna como uma cadeia de caracteres usando a sintaxe de especificação do esquema SQL. Quando a coluna de dados resgatados está habilitada, os campos nomeados em um caso diferente do esquema são carregados na coluna _rescued_data. O senhor pode alterar esse comportamento definindo a opção readerCaseSensitive como false, caso em que o Auto Loader lê os dados sem distinção entre maiúsculas e minúsculas.

Uso

Os exemplos a seguir usam o dataset Wanderbricks para demonstrar a leitura e gravação de arquivos XML usando a API do Spark DataFrame e SQL.

Leia e escreva XML

Utilize a API DataFrame para gravar avaliações do Wanderbricks em XML e lê-las de volta.

Python
# Write Wanderbricks reviews to XML
df = spark.read.table("samples.wanderbricks.reviews")
df.write \
.format("xml") \
.option("rootTag", "reviews") \
.option("rowTag", "review") \
.save("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")

# Read the XML file back
df_read = spark.read \
.format("xml") \
.option("rowTag", "review") \
.load("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
df_read.show()

Você pode especificar manualmente o esquema ao ler os dados:

Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True)
])
df = spark.read.options(rowTag='review').xml('/Volumes/<catalog>/<schema>/<volume>/reviews.xml', schema=custom_schema)
df.show()

Leitura e gravação de XML com SQL

Utilize SQL DDL para criar uma tabela a partir de um arquivo XML. Databricks infere os tipos de coluna automaticamente.

SQL
DROP TABLE IF EXISTS reviews;
CREATE TABLE reviews
USING XML
OPTIONS (path "/Volumes/<catalog>/<schema>/<volume>/reviews.xml", rowTag "review");
SELECT * FROM reviews;

Você também pode especificar nomes e tipos de colunas no DDL. Nesse caso, o esquema não é inferido automaticamente.

SQL
DROP TABLE IF EXISTS reviews;

CREATE TABLE reviews (_id string, author string, rating integer, comment string)
USING XML
OPTIONS (path "/Volumes/<catalog>/<schema>/<volume>/reviews.xml", rowTag "review");

Carregar XML usando COPY INTO

Use o COPY INTO para carregar arquivos XML do armazenamento em cloud em uma tabela Delta.

SQL
DROP TABLE IF EXISTS reviews;
CREATE TABLE IF NOT EXISTS reviews;

COPY INTO reviews
FROM "/Volumes/<catalog>/<schema>/<volume>/reviews.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'review')
COPY_OPTIONS ('mergeSchema' = 'true');

Leia XML com validação de linha

Use a opção rowValidationXSDPath para validar cada linha contra um esquema XSD durante a leitura.

Python
df = (spark.read
.format("xml")
.option("rowTag", "review")
.option("rowValidationXSDPath", xsdPath)
.load("/Volumes/<catalog>/<schema>/<volume>/reviews.xml"))
df.printSchema()

Carregar XML com o Auto Loader

Use o Auto Loader para ingerir continuamente arquivos XML do armazenamento em cloud em uma tabela Delta com inferência e evolução automática de esquema.

Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "review")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("reviews")
)

Recurso adicional