Ler e gravar arquivos XML
Visualização
Esse recurso está em Public Preview.
Linguagem de Marcação Extensível (XML) é uma linguagem de marcação para formatação, armazenamento e compartilhamento de dados em formato textual. Ele define um conjunto de regras para serializar dados que vão desde documentos até estruturas de dados arbitrárias.
Databricks oferece suporte a XML para leitura e gravação com Apache Spark, incluindo inferência e evolução automática de esquema, configuração de tag de linha, validação XSD e expressões SQL como from_xml. O suporte XML nativo funciona com o Auto Loader, read_files e COPY INTO sem exigir JARs externos.
Pré-requisitos
Suporte para formato de arquivo XML exige Databricks Runtime 14.3 e acima.
Opções
Use os métodos .option() e .options() de DataFrameReader e DataFrameWriter para configurar a fonte de dados XML. Para obter uma lista completa das opções suportadas, consulte DataFrameReader Opções XML e DataFrameWriter Opções XML.
Analisar registros XML
A especificação XML exige uma estrutura bem formada. No entanto, essa especificação não é mapeada imediatamente para um formato tabular. Você deve especificar a opção rowTag para indicar o elemento XML mapeado para DataFrame Row. O elemento rowTag se torna o struct de nível superior. Os elementos secundários de rowTag se tornam os campos do nível superior struct.
Você pode especificar o esquema para esse registro ou permitir que ele seja inferido automaticamente. Como o analisador examina apenas os elementos rowTag, o DTD e as entidades externas são filtradas.
Os exemplos a seguir ilustram a inferência de esquema e a análise de um arquivo XML usando diferentes opções rowTag:
- Python
- Scala
xmlString = """
<reviews>
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>
<review id="r002">
<author>Bob</author>
<rating>4</rating>
<comment>Great location, very comfortable</comment>
</review>
</reviews>"""
xmlPath = "/Volumes/<catalog>/<schema>/<volume>/reviews.xml"
dbutils.fs.put(xmlPath, xmlString, True)
val xmlString = """
<reviews>
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>
<review id="r002">
<author>Bob</author>
<rating>4</rating>
<comment>Great location, very comfortable</comment>
</review>
</reviews>"""
val xmlPath = "/Volumes/<catalog>/<schema>/<volume>/reviews.xml"
dbutils.fs.put(xmlPath, xmlString)
Ler o arquivo XML com a opção rowTag como "reviews":
- Python
- Scala
- SQL
df = spark.read.option("rowTag", "reviews").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
val df = spark.read.option("rowTag", "reviews").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews.xml',
format => 'xml',
rowTag => 'reviews'
)
Saída:
root
|-- review: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- comment: string (nullable = true)
| | |-- rating: string (nullable = true)
+----------------------------------------------------------------------------------------+
|review |
+----------------------------------------------------------------------------------------+
|[{r001, Alice, Amazing stay, highly recommend!, 5}, {r002, Bob, Great location..., 4}] |
+----------------------------------------------------------------------------------------+
Ler o arquivo XML com rowTag como "review":
- Python
- Scala
- SQL
df = spark.read.option("rowTag", "review").format("xml").load(xmlPath)
# Infers four top-level fields and parses `review` in separate rows:
val df = spark.read.option("rowTag", "review").xml(xmlPath)
// Infers four top-level fields and parses `review` in separate rows:
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews.xml',
format => 'xml',
rowTag => 'review'
)
Saída:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- comment: string (nullable = true)
|-- rating: string (nullable = true)
+----+------+--------------------------------+------+
|_id |author|comment |rating|
+----+------+--------------------------------+------+
|r001|Alice |Amazing stay, highly recommend! |5 |
|r002|Bob |Great location, very comfortable|4 |
+----+------+--------------------------------+------+
Validar registros XML com XSD
Opcionalmente, é possível validar cada registro XML em nível de linha usando uma Definição de Esquema XML (XSD). O arquivo XSD é especificado na opção rowValidationXSDPath. Fora isso, o XSD não afeta o esquema fornecido ou inferido. Um registro que falha na validação é marcado como "corrompido" e tratado com base na opção de modo de tratamento de registros corrompidos descrita na seção de opções.
O senhor pode usar o site XSDToSchema para extrair um esquema Spark DataFrame de um arquivo XSD. Ele suporta apenas tipos simples, complexos e de sequência e suporta apenas a funcionalidade básica de XSD.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "/Volumes/<catalog>/<schema>/<volume>/reviews.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="review">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="rating" type="xs:integer" />
<xs:element name="comment" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
A tabela a seguir mostra a conversão dos tipos de dados XSD em tipos de dados Spark:
Tipos de dados XSD | Tipos de dados do Spark |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Analisar XML aninhado
Os dados XML em uma coluna com valor de cadeia de caracteres em um site DataFrame existente podem ser analisados com schema_of_xml e from_xml que retornam o esquema e os resultados analisados como novas colunas struct. Os dados XML passados como argumento para schema_of_xml e from_xml devem ser um único registro XML bem formado.
esquema_de_xml
Utilize schema_of_xml para inferir o esquema Spark a partir de uma string XML. Encaminhar o resultado para from_xml para a análise das colunas XML.
Sintaxe : schema_of_xml(xmlStr [, options])
Argumento | Obrigatório | Descrição |
|---|---|---|
| Sim | Uma expressão string que especifica um único registro XML bem-formado. |
| Não | Um literal |
Retorna uma STRING contendo uma definição de um struct com n campos de strings onde os nomes das colunas são derivados dos nomes de elementos e atributos XML. Os valores dos campos contêm os tipos SQL derivados e formatados.
de_xml
Utilize from_xml para analisar uma coluna STRING contendo registros XML em uma estrutura. Forneça um esquema diretamente ou use a saída de schema_of_xml.
Sintaxe : from_xml(xmlStr, schema [, options])
Argumento | Obrigatório | Descrição |
|---|---|---|
| Sim | Uma expressão string que especifica um único registro XML bem-formado. |
| Sim | Uma expressão de string ou invocação da função |
| Não | Um literal |
Retorna uma estrutura com nomes de campos e tipos correspondentes à definição de esquema. O esquema deve ser definido como pares de nome de coluna e tipo de dados separados por vírgulas, conforme usado em, por exemplo, CREATE TABLE. A maioria das opções mostradas na seção Opções são aplicáveis, com as seguintes exceções:
rowTag: Como há somente um registro XML, a opçãorowTagnão é aplicável.mode(default:PERMISSIVE): Permite um modo para lidar com registros corrompidos durante a análise.PERMISSIVE: Quando encontra um registro corrompido, coloca as cadeias de caracteres malformadas em um campo configurado porcolumnNameOfCorruptRecord, e define os campos malformados comonull. Para manter os registros corrompidos, é possível definir um campo do tipo string chamadocolumnNameOfCorruptRecordem um esquema definido pelo usuário. Se um esquema não tiver o campo, ele eliminará os registros corrompidos durante a análise. Ao inferir um esquema, ele adiciona implicitamente um campocolumnNameOfCorruptRecordem um esquema de saída.FAILFAST: lança uma exceção quando encontra registros corrompidos.
Exemplos
Para analisar uma coluna de strings XML, use schema_of_xml para inferir o esquema e, em seguida, passe-o para from_xml:
- Python
- Scala
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>
"""
df = spark.createDataFrame([(1, xml_data)], ["review_id", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
import org.apache.spark.sql.functions.{from_xml, schema_of_xml, lit}
val xmlData = """
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>""".stripMargin
val df = Seq((1, xmlData)).toDF("review_id", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
Para analisar XML em linha em SQL:
SELECT from_xml('
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>',
schema_of_xml('
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>')
);
Converter entre estruturas XML e DataFrame
Devido às diferenças de estrutura entre DataFrame e XML, existem algumas regras de conversão de dados XML para DataFrame e de DataFrame para dados XML. Observe que o tratamento de atributos pode ser desativado com a opção excludeAttribute.
Conversão de XML para DataFrame
Ao ler XML, o Databricks mapeia elementos e atributos XML para campos de DataFrame de acordo com as seguintes regras.
Atributos são convertidos em campos com o prefixo de cabeçalho attributePrefix.
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
Isso produz o seguinte esquema:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
Dados de caractere em um elemento que contenha atributo(s) ou elemento(s) filho(s) são analisados no campo valueTag. Se houver várias ocorrências de dados de caractere, o campo valueTag será convertido para o tipo array.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
Isso produz o seguinte esquema:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
Conversão de DataFrame para XML
Ao gravar um DataFrame em XML, certas estruturas aninhadas exigem tratamento especial devido às diferenças entre os modelos de dados de DataFrame e XML.
Se um DataFrame contém um campo ArrayType cujo tipo de elemento também é ArrayType, escrevê-lo em XML produz um nível extra de aninhamento que não está presente ao fazer o round-trip de arquivos XML. Isso afeta apenas DataFrames originados fora do XML — a leitura e a gravação de arquivos XML preservam a estrutura original.
Por exemplo, um DataFrame com o seguinte esquema:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
e os seguintes dados:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
produz a seguinte saída XML:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
O nome do elemento da matriz sem nome em DataFrame é especificado pela opção arrayElementName (padrão: item).
Habilitar a coluna de dados resgatados
A coluna de dados resgatados garante que não se percam dados durante o ETL. Captura todos os dados que não foram analisados porque um ou mais campos em um registro apresentam um dos seguintes problemas:
- Ausente do esquema informado.
- Não corresponde ao tipo de dados do esquema informado.
- Tem incompatibilidade de maiúsculas e minúsculas nos nomes de campos no esquema informado.
A coluna de dados resgatados é retornada como um documento JSON contendo as colunas que foram resgatadas e o caminho do arquivo de origem do registro.
Para ativar a coluna de dados resgatados, defina a opção rescuedDataColumn para um nome de coluna na leitura:
- Python
- Scala
- SQL
df = spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load("/Volumes/<catalog>/<schema>/<volume>/reviews_xml")
val df = spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load("/Volumes/<catalog>/<schema>/<volume>/reviews_xml")
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_xml',
format => 'xml',
rowTag => 'review',
rescuedDataColumn => '_rescued_data'
)
Para remover o caminho do arquivo de origem da coluna de dados resgatados, defina:
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
O analisador XML oferece suporte a três modos ao analisar registros: PERMISSIVE, DROPMALFORMED e FAILFAST. Quando usadas junto com rescuedDataColumn, as incompatibilidades de tipos de dados não fazem com que os registros sejam descartados no modo DROPMALFORMED nem geram um erro no modo FAILFAST. Somente registros corrompidos (XML incompleto ou malformado) são descartados ou geram erros.
Inferir e evoluir esquema com o Auto Loader
Para obter uma discussão detalhada sobre esse tópico e as opções aplicáveis, consulte Configurar inferência e evolução de esquema no Auto Loader. O senhor pode configurar o Auto Loader para detectar automaticamente o esquema dos dados XML carregados, o que lhe permite inicializar tabelas sem declarar explicitamente o esquema de dados e desenvolver o esquema da tabela à medida que novas colunas são introduzidas. Isso elimina a necessidade de rastrear e aplicar manualmente as alterações do esquema ao longo do tempo.
Em default, a inferência de esquema Auto Loader procura evitar problemas de evolução do esquema devido a incompatibilidades de tipos. Para formatos que não codificam tipos de dados (JSON, CSV e XML), Auto Loader infere todas as colunas como strings, incluindo campos aninhados em arquivos XML. O Apache Spark DataFrameReader usa um comportamento diferente para inferência de esquema, selecionando tipos de dados para colunas em fontes XML com base em dados de amostra. Para ativar esse comportamento com o Auto Loader, defina a opção cloudFiles.inferColumnTypes como true.
O Auto Loader detecta a adição de novas colunas à medida que processa seus dados. Quando o site Auto Loader detecta uma nova coluna, a transmissão é interrompida com um UnknownFieldException. Antes de a transmissão gerar esse erro, o site Auto Loader executa a inferência de esquema nas últimas microlotes de dados e atualiza o local do esquema com o esquema mais recente, mesclando novas colunas ao final do esquema. Os tipos de dados das colunas existentes permanecem inalterados. O Auto Loader oferece suporte a diferentes modos de evolução do esquema, que o senhor define na opção cloudFiles.schemaEvolutionMode.
É possível usar dicas de esquema para aplicar as informações de esquema que o senhor conhece e espera em um esquema inferido. Quando o senhor sabe que uma coluna é de um tipo de dados específico ou se deseja escolher um tipo de dados mais geral (por exemplo, um double em vez de um inteiro), é possível fornecer um número arbitrário de dicas para tipos de dados de coluna como uma cadeia de caracteres usando a sintaxe de especificação do esquema SQL. Quando a coluna de dados resgatados está habilitada, os campos nomeados em um caso diferente do esquema são carregados na coluna _rescued_data. O senhor pode alterar esse comportamento definindo a opção readerCaseSensitive como false, caso em que o Auto Loader lê os dados sem distinção entre maiúsculas e minúsculas.
Uso
Os exemplos a seguir usam o dataset Wanderbricks para demonstrar a leitura e gravação de arquivos XML usando a API do Spark DataFrame e SQL.
Leia e escreva XML
Utilize a API DataFrame para gravar avaliações do Wanderbricks em XML e lê-las de volta.
- Python
- Scala
- R
# Write Wanderbricks reviews to XML
df = spark.read.table("samples.wanderbricks.reviews")
df.write \
.format("xml") \
.option("rootTag", "reviews") \
.option("rowTag", "review") \
.save("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
# Read the XML file back
df_read = spark.read \
.format("xml") \
.option("rowTag", "review") \
.load("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
df_read.show()
// Write Wanderbricks reviews to XML
val df = spark.read.table("samples.wanderbricks.reviews")
df.write
.format("xml")
.option("rootTag", "reviews")
.option("rowTag", "review")
.save("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
// Read the XML file back
val dfRead = spark.read
.format("xml")
.option("rowTag", "review")
.xml("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
dfRead.show()
df <- loadDF("/Volumes/<catalog>/<schema>/<volume>/reviews.xml", source = "xml", rowTag = "review")
saveDF(df, "/Volumes/<catalog>/<schema>/<volume>/newreviews.xml", "xml", "overwrite")
Você pode especificar manualmente o esquema ao ler os dados:
- Python
- Scala
- R
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True)
])
df = spark.read.options(rowTag='review').xml('/Volumes/<catalog>/<schema>/<volume>/reviews.xml', schema=custom_schema)
df.show()
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true),
StructField("comment", StringType, nullable = true)))
val df = spark.read.option("rowTag", "review").schema(customSchema).xml("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
df.show()
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("rating", "integer"),
structField("comment", "string"))
df <- loadDF("/Volumes/<catalog>/<schema>/<volume>/reviews.xml", source = "xml", schema = customSchema, rowTag = "review")
saveDF(df, "/Volumes/<catalog>/<schema>/<volume>/newreviews.xml", "xml", "overwrite")
Leitura e gravação de XML com SQL
Utilize SQL DDL para criar uma tabela a partir de um arquivo XML. Databricks infere os tipos de coluna automaticamente.
DROP TABLE IF EXISTS reviews;
CREATE TABLE reviews
USING XML
OPTIONS (path "/Volumes/<catalog>/<schema>/<volume>/reviews.xml", rowTag "review");
SELECT * FROM reviews;
Você também pode especificar nomes e tipos de colunas no DDL. Nesse caso, o esquema não é inferido automaticamente.
DROP TABLE IF EXISTS reviews;
CREATE TABLE reviews (_id string, author string, rating integer, comment string)
USING XML
OPTIONS (path "/Volumes/<catalog>/<schema>/<volume>/reviews.xml", rowTag "review");
Carregar XML usando COPY INTO
Use o COPY INTO para carregar arquivos XML do armazenamento em cloud em uma tabela Delta.
DROP TABLE IF EXISTS reviews;
CREATE TABLE IF NOT EXISTS reviews;
COPY INTO reviews
FROM "/Volumes/<catalog>/<schema>/<volume>/reviews.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'review')
COPY_OPTIONS ('mergeSchema' = 'true');
Leia XML com validação de linha
Use a opção rowValidationXSDPath para validar cada linha contra um esquema XSD durante a leitura.
- Python
- Scala
- SQL
df = (spark.read
.format("xml")
.option("rowTag", "review")
.option("rowValidationXSDPath", xsdPath)
.load("/Volumes/<catalog>/<schema>/<volume>/reviews.xml"))
df.printSchema()
val df = spark.read
.option("rowTag", "review")
.option("rowValidationXSDPath", xsdPath)
.xml("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
df.printSchema
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews.xml',
format => 'xml',
rowTag => 'review',
rowValidationXSDPath => '/Volumes/<catalog>/<schema>/<volume>/reviews.xsd'
)
Carregar XML com o Auto Loader
Use o Auto Loader para ingerir continuamente arquivos XML do armazenamento em cloud em uma tabela Delta com inferência e evolução automática de esquema.
- Python
- Scala
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "review")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("reviews")
)
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "review")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow())
.toTable("reviews")
Recurso adicional
- Ler e gravar dados XML usando a biblioteca
spark-xml: Para usuários que usaram anteriormente a biblioteca Spark XML de código aberto, consulte o guia de integração legado. - Ler e gravar arquivos JSON: Se seus dados são semiestruturados, mas não XML, o JSON oferece inferência de esquema semelhante e suporte a dados aninhados com um formato mais simples.