Ler e gravar arquivos XML

Visualização

Esse recurso está em visualização pública.

Este artigo descreve como ler e gravar arquivos XML.

Extensible Markup Language (XML) é uma linguagem de marcação para formatação, armazenamento e compartilhamento de dados em formato textual. Ele define um conjunto de regras para serializar dados que variam de documentos a estruturas de dados arbitrárias.

O suporte nativo ao formato de arquivo XML permite a ingestão, a consulta e a análise de dados XML para processamento ou transmissão de lotes. Ele pode inferir e desenvolver automaticamente o esquema e os tipos de dados, suporta expressões SQL como from_xml e pode gerar documentos XML. Ele não requer jars externos e funciona perfeitamente com o Auto Loader, read_files e COPY INTO. Opcionalmente, é possível validar cada registro XML em nível de linha com base em uma definição de esquema XML (XSD).

Requisitos

Databricks Runtime 14.3 e acima

Analisar registros XML

A especificação XML exige uma estrutura bem formada. No entanto, essa especificação não é imediatamente mapeada para um formato tabular. O senhor deve especificar a opção rowTag para indicar o elemento XML que mapeia para um DataFrame Row. O elemento rowTag torna-se o struct de nível superior. Os elementos filhos de rowTag tornam-se os campos do nível superior struct.

O senhor pode especificar o esquema para esse registro ou permitir que ele seja inferido automaticamente. Como o analisador examina apenas os elementos rowTag, a DTD e as entidades externas são filtradas.

Os exemplos a seguir ilustram a inferência de esquema e a análise de um arquivo XML usando diferentes opções rowTag:

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

Ler o arquivo XML com a opção rowTag como "books":

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

Saída:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

Leia o arquivo XML com rowTag como "livro":

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

Saída:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

fonte de dados options

As opções de fonte de dados para XML podem ser especificadas das seguintes maneiras:

Para obter uma lista de opções, consulte Opções do Auto Loader.

Suporte XSD

Opcionalmente, é possível validar cada registro XML em nível de linha por meio de uma definição de esquema XML (XSD). O arquivo XSD é especificado na opção rowValidationXSDPath. O XSD não afeta de outra forma o esquema fornecido ou inferido. Um registro que falha na validação é marcado como "corrompido" e tratado com base na opção de modo de tratamento de registro corrompido descrita na seção de opções.

O senhor pode usar XSDToSchema para extrair um esquema Spark DataFrame de um arquivo XSD. Ele é compatível apenas com tipos simples, complexos e sequenciais e suporta apenas a funcionalidade XSD básica.

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

A tabela a seguir mostra a conversão dos tipos de dados XSD em tipos de dados Spark:

Tipos de dados XSD

Tipos de dados do Spark

boolean

BooleanType

decimal

DecimalType

unsignedLong

DecimalType(38, 0)

double

DoubleType

float

FloatType

byte

ByteType

short, unsignedByte

ShortType

integer, negativeInteger, nonNegativeInteger, nonPositiveInteger, positiveInteger, unsignedShort

IntegerType

long, unsignedInt

LongType

date

DateType

dateTime

TimestampType

Others

StringType

Analisar XML aninhado

Os dados XML em uma coluna com valor de cadeia de caracteres em um DataFrame existente podem ser analisados com schema_of_xml e from_xml, o que retorna o esquema e os resultados analisados como novas colunas struct. Os dados XML passados como argumento para schema_of_xml e from_xml devem ser um único registro XML bem formado.

schema_of_xml

Sintaxe

schema_of_xml(xmlStr [, options] )

Argumentos

  • xmlStr: Uma expressão de cadeia de caracteres que especifica um único registro XML bem formado.

  • options: Um literal MAP<STRING,STRING> opcional que especifica as diretivas.

Devolve

Uma cadeia de caracteres que contém uma definição de uma estrutura com n campos de cadeias de caracteres em que os nomes das colunas são derivados dos nomes de elementos e atributos XML. Os valores de campo contêm os tipos SQL formatados derivados.

from_xml

Sintaxe

from_xml(xmlStr, schema [, options])

Argumentos

  • xmlStr: Uma expressão de cadeia de caracteres que especifica um único registro XML bem formado.

  • schema: Uma expressão de cadeia de caracteres ou invocação da função schema_of_xml.

  • options: Um literal MAP<STRING,STRING> opcional que especifica as diretivas.

Devolve

Uma estrutura com nomes de campos e tipos que correspondem à definição do esquema. O esquema deve ser definido como pares de nomes de colunas e tipos de dados separados por vírgulas, como usado, por exemplo, em CREATE TABLE. A maioria das opções mostradas nas opções da fonte de dados é aplicável, com as seguintes exceções:

  • rowTag: Como há apenas um registro XML, a opção rowTag não é aplicável.

  • mode default: PERMISSIVE): Permite um modo de lidar com registros corrompidos durante a análise.

    • PERMISSIVE: Quando encontra um registro corrompido, coloca as cadeias de caracteres malformadas em um campo configurado por columnNameOfCorruptRecord e define os campos malformados como null. Para manter registros corrompidos, é possível definir um campo do tipo cadeia de caracteres chamado columnNameOfCorruptRecord em um esquema definido pelo usuário. Se um esquema não tiver o campo, ele descartará registros corrompidos durante a análise. Ao inferir um esquema, ele adiciona implicitamente um campo columnNameOfCorruptRecord em um esquema de saída.

    • FAILFAST: Lança uma exceção quando encontra registros corrompidos.

Conversão de estrutura

Devido às diferenças de estrutura entre DataFrame e XML, existem algumas regras de conversão de dados XML para DataFrame e de DataFrame para dados XML. Observe que a manipulação de atributos pode ser desativada com a opção excludeAttribute.

Conversão de XML para DataFrame

Atributos: Os atributos são convertidos como campos com o prefixo de cabeçalho attributePrefix.

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

produz um esquema abaixo:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

Dados de caracteres em um elemento que contém atributo(s) ou elemento(s) filho(s): Esses dados são analisados no campo valueTag. Se houver várias ocorrências de dados de caracteres, o campo valueTag será convertido em um tipo array.

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

produz um esquema abaixo:

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

Conversão de DataFrame para XML

Elemento como uma matriz em uma matriz: A gravação de um arquivo XML de DataFrame com um campo ArrayType e seu elemento como ArrayType teria um campo aninhado adicional para o elemento. Isso não aconteceria na leitura e gravação de dados XML, mas na gravação de um DataFrame lido de outras fontes. Portanto, a viagem de ida e volta na leitura e gravação de arquivos XML tem a mesma estrutura, mas a gravação de um DataFrame lido de outras fontes pode ter uma estrutura diferente.

DataFrame com um esquema abaixo:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

e com os dados abaixo:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

produz um arquivo XML abaixo:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

O nome do elemento da matriz sem nome em DataFrame é especificado pela opção arrayElementName (default: item).

Coluna de dados resgatados

A coluna de dados resgatados garante que o senhor nunca perca ou perca dados durante a ETL. O senhor pode ativar a coluna de dados resgatados para capturar todos os dados que não foram analisados porque um ou mais campos em um registro apresentam um dos seguintes problemas:

  • Ausente no esquema fornecido

  • Não corresponde ao tipo de dados do esquema fornecido

  • Tem uma incompatibilidade de maiúsculas e minúsculas com os nomes de campo no esquema fornecido

A coluna de dados resgatada é retornada como um documento JSON contendo as colunas que foram resgatadas e o caminho do arquivo de origem do registro. Para remover o caminho do arquivo de origem da coluna de dados resgatados, o senhor pode definir a seguinte configuração SQL:

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

O senhor pode ativar a coluna de dados resgatados definindo a opção rescuedDataColumn para um nome de coluna ao ler dados, como _rescued_data com spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>).

O analisador XML suporta três modos ao analisar registros: PERMISSIVE, DROPMALFORMED e FAILFAST. Quando usadas junto com rescuedDataColumn, as incompatibilidades de tipo de dados não fazem com que os registros sejam descartados no modo DROPMALFORMED nem geram um erro no modo FAILFAST. Somente registros corrompidos (XML incompleto ou malformado) são descartados ou geram erros.

Inferência de esquema e evolução no Auto Loader

Para obter uma discussão detalhada sobre esse tópico e as opções aplicáveis, consulte Configurar inferência e evolução de esquema no Auto Loader. O senhor pode configurar o Auto Loader para detectar automaticamente o esquema dos dados XML carregados, o que lhe permite inicializar tabelas sem declarar explicitamente o esquema de dados e desenvolver o esquema da tabela à medida que novas colunas são introduzidas. Isso elimina a necessidade de rastrear e aplicar manualmente as alterações de esquema ao longo do tempo.

Em default, a inferência de esquema do Auto Loader procura evitar problemas de evolução do esquema devido a incompatibilidades de tipos. Para formatos que não codificam tipos de dados (JSON, CSV e XML), o Auto Loader infere todas as colunas como strings, incluindo campos aninhados em arquivos XML. O Apache Spark DataFrameReader usa um comportamento diferente para inferência de esquema, selecionando tipos de dados para colunas em fontes XML com base em dados de amostra. Para ativar esse comportamento com o Auto Loader, defina a opção cloudFiles.inferColumnTypes como true.

O Auto Loader detecta a adição de novas colunas à medida que processa seus dados. Quando o Auto Loader detecta uma nova coluna, a transmissão é interrompida com um UnknownFieldException. Antes que sua transmissão lance esse erro, o Auto Loader executa a inferência de esquema nos microlotes de dados mais recentes e atualiza o local do esquema com o esquema mais recente, mesclando novas colunas ao final do esquema. Os tipos de dados das colunas existentes permanecem inalterados. O Auto Loader oferece suporte a diferentes modos de evolução do esquema, que o senhor define na opção cloudFiles.schemaEvolutionMode.

É possível usar dicas de esquema para aplicar as informações de esquema que o senhor conhece e espera em um esquema inferido. Quando o senhor sabe que uma coluna é de um tipo de dados específico ou se deseja escolher um tipo de dados mais geral (por exemplo, um double em vez de um inteiro), é possível fornecer um número arbitrário de dicas para tipos de dados de coluna como uma cadeia de caracteres usando a sintaxe de especificação de esquema SQL. Quando a coluna de dados resgatados está ativada, os campos nomeados em um caso diferente daquele do esquema são carregados na coluna _rescued_data. O senhor pode alterar esse comportamento definindo a opção readerCaseSensitive como false, caso em que o Auto Loader lê os dados sem distinção entre maiúsculas e minúsculas.

Exemplos

Os exemplos desta seção usam um arquivo XML disponível para download no Apache Spark GitHub repo.

Ler e gravar XML

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))
val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

O senhor pode especificar manualmente o esquema ao ler os dados:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

API SQL

A fonte de dados XML pode inferir tipos de dados:

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

O senhor também pode especificar nomes e tipos de colunas na DDL. Nesse caso, o esquema não é inferido automaticamente.

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

Carregar XML usando COPY INTO

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

Ler XML com validação de linha

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()
val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

Analisar XML aninhado (from_xml e schema_of_xml)

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

from_xml e schema_of_xml com a API SQL

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

Carregar XML com o Auto Loader

query = (spark
  .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "xml")
    .option("rowTag", "book")
    .option("cloudFiles.inferColumnTypes", True)
    .option("cloudFiles.schemaLocation", schemaPath)
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load(inputPath)
    .writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", checkPointPath)
    .trigger(Trigger.AvailableNow()))

query = query.start(outputPath).awaitTermination()
df = spark.read.format("delta").load(outputPath)
df.show()
val query = spark
.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow())

query.start(outputPath).awaitTermination()
val df = spark.read.format("delta").load(outputPath)
df.show()