XML ファイルの読み取りと書き込み

プレビュー

この機能は パブリックプレビュー版です。

この記事では、XML ファイルの読み取りと書き込みの方法について説明します。

Extensible Markup Language (XML) は、データをテキスト形式で書式設定、保存、および共有するためのマークアップ言語です。 これは、ドキュメントから任意のデータ構造に至るまで、データをシリアル化するための一連のルールを定義します。

ネイティブ XML ファイル形式のサポートにより、バッチ処理またはストリーミング用の XML データの取り込み、クエリ、解析が可能になります。 スキーマとデータ型を自動的に推測して進化させ、 from_xmlなどの SQL 式をサポートし、XML ドキュメントを生成できます。 外部の jar は必要なく、Auto Loader、 read_filesCOPY INTOとシームレスに動作します。 オプションで、各行レベルの XML レコードを XML スキーマ定義 (XSD) に照らして検証できます。

要件

Databricks Runtime 14.3 以降

XML レコードの構文解析

XML 仕様では、整形式の構造が義務付けられています。 ただし、この仕様はすぐに表形式にマップされるわけではありません。 rowTag オプションを指定して、DataFrame Rowにマップする XML 要素を指定する必要があります。rowTag要素が最上位のstructになります。rowTag の子要素は、最上位の structのフィールドになります。

このレコードのスキーマを指定することも、自動的に推論することもできます。 パーサーは rowTag 要素のみを調べるため、DTD と外部エンティティーは除外されます。

次の例は、さまざまな rowTag オプションを使用した XML ファイルのスキーマ推論と解析を示しています。

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

オプション rowTag "books" として XML ファイルを読み取ります。

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

アウトプット:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

rowTag "book" を含む XML ファイルを読み取ります。

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

アウトプット:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

データソースオプション

XML のデータ ソース オプションは、次の方法で指定できます。

  • 次の .option/.options 方法があります。

    • データフレームリーダー

    • データフレームライター

    • データストリームリーダー

    • データストリームライター

  • 次の組み込み関数:

  • CREATE TABLE USING DATA_SOURCEOPTIONS

オプションのリストについては、 Auto Loaderオプション」を参照してください。

XSD のサポート

オプションで、XML スキーマ定義 (XSD) によって各行レベルの XML レコードを検証できます。 XSD ファイルは rowValidationXSDPath オプションで指定します。 それ以外の場合、XSD は、提供または推論されるスキーマには影響しません。 検証に失敗したレコードは「破損」としてマークされ、オプションのセクションで説明されている破損レコード処理モードオプションに基づいて処理されます。

XSDToSchemaを使用して、XSD ファイルから Spark DataFrame スキーマを抽出できます。 単純型、複合型、およびシーケンス型のみをサポートし、基本的な XSD 機能のみをサポートします。

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

次の表は、XSD データ型から Spark データ型への変換を示しています。

XSD データ型

Spark データ型

boolean

BooleanType

decimal

DecimalType

unsignedLong

DecimalType(38, 0)

double

DoubleType

float

FloatType

byte

ByteType

short, unsignedByte

ShortType

integernegativeIntegernonNegativeIntegernonPositiveIntegerpositiveIntegerunsignedShort

IntegerType

long, unsignedInt

LongType

date

DateType

dateTime

TimestampType

Others

StringType

入れ子になったXMLの解析

既存の DataFrame の文字列値列の XML データは、スキーマと解析結果を新しいstruct列として返すschema_of_xmlfrom_xmlを使用して解析できます。 引数として schema_of_xml および from_xml に渡される XML データは、1 つの整形式の XML レコードでなければなりません。

schema_of_xml

構文

schema_of_xml(xmlStr [, options] )

引数

  • xmlStr: 単一の整形式 XML レコードを指定する文字列式。

  • options: ディレクティブを指定するオプションの MAP<STRING,STRING> リテラル。

戻り値

n 個の文字列フィールドを持つ構造体の定義を保持する文字列。列名は XML 要素および属性名から派生します。 フィールド値には、派生したフォーマット済み SQL タイプが保持されます。

from_xml

構文

from_xml(xmlStr, schema [, options])

引数

  • xmlStr: 単一の整形式 XML レコードを指定する文字列式。

  • schema: 文字列式またはschema_of_xml関数の呼び出し。

  • options: ディレクティブを指定するオプションの MAP<STRING,STRING> リテラル。

戻り値

スキーマ定義に一致するフィールド名と型を持つ構造体。 スキーマは、たとえば CREATE TABLEで使用されるように、コンマで区切られた列名とデータ型のペアとして定義する必要があります。 データ ソース オプションに示されているほとんどのオプションは、次の例外を除いて適用できます。

  • rowTag: XML レコードが 1 つしかないため、 rowTag オプションは適用されません。

  • mode (デフォルト: PERMISSIVE ): 解析中に破損したレコードを処理するモードを許可します。

    • PERMISSIVE: 破損したレコードに遭遇すると、不正な形式の文字列をcolumnNameOfCorruptRecordで構成されたフィールドに挿入し、不正な形式のフィールドをnullに設定します。 破損したレコードを保持するには、ユーザー定義のスキーマにcolumnNameOfCorruptRecordという名前の文字列型フィールドを設定します。 スキーマにフィールドがない場合、解析中に破損したレコードがドロップされます。 スキーマを推論すると、出力スキーマに columnNameOfCorruptRecord フィールドが暗黙的に追加されます。

    • FAILFAST: 破損したレコードに出会うと例外をスローします。

構造変換

DataFrame と XML の構造の違いにより、XML データからDataFrameへ、およびDataFrameから XML データへの変換ルールがいくつかあります。 属性の処理は、オプション excludeAttributeで無効にできることに注意してください。

XML から DataFrame への変換

属性: 属性は、見出しの接頭辞が attributePrefixのフィールドとして変換されます。

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

以下のスキーマを生成します。

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

属性または子要素を含む要素内の文字データ: これらは valueTag フィールドに解析されます。 文字データが複数ある場合は、 valueTag フィールドが array タイプに変換されます。

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

以下のスキーマを生成します。

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

DataFrame から XML への変換

配列内の配列としての要素: 要素が ArrayType としてフィールドArrayTypeを持つDataFrameから XML ファイルを書き込むと、その要素にネストされたフィールドが追加されます。これは、XML データの読み取りおよび書き込みでは発生しませんが、他のソースから読み取ったDataFrameを書き込む場合には発生しません。 したがって、XML ファイルの読み取りと書き込みのラウンドトリップは同じ構造になりますが、他のソースから読み取ったDataFrameの書き込みは異なる構造になる可能性があります。

以下のスキーマを含む DataFrame:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

そして、以下のデータで:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

以下の XML ファイルを生成します。

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

DataFrame内の名前のない配列の要素名は、オプションarrayElementNameで指定されます (デフォルト: item )。

レスキューされたデータ列

レスキューされたデータ列により、ETL 中にデータが失われたり見逃したりすることがなくなります。 レスキューされたデータ列を有効にして、レコード内の 1 つ以上のフィールドに次のいずれかの問題があるために解析されなかったデータをキャプチャできます。

  • 指定されたスキーマに存在しない

  • 指定されたスキーマのデータ型と一致しません

  • 指定されたスキーマのフィールド名と大文字と小文字が一致しません

レスキューされたデータ列は、レスキューされた列とレコードのソース ファイル パスを含む JSON ドキュメントとして返されます。 レスキューされたデータ列からソース ファイル パスを削除するには、次の SQL 構成を設定します。

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

復旧されたデータ列を有効にするには、データを読み取るときにオプション rescuedDataColumn を列名に設定します ( _rescued_data with spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)など)。

XML パーサーは、レコードを解析するときに、 PERMISSIVEDROPMALFORMEDFAILFASTの 3 つのモードをサポートします。 rescuedDataColumnと一緒に使用すると、データ型の不一致によって、DROPMALFORMEDモードでレコードがドロップされたり、FAILFASTモードでエラーがスローされたりすることはありません。破損したレコード (不完全な XML または不正な形式の XML) のみがドロップされるか、エラーがスローされます。

Auto Loader でのスキーマの推論と進化

このトピックと該当するオプションの詳細については、 「 Auto Loader でのスキーマ推論と展開の構成 」を参照してください。 ロードされた XML データのスキーマを自動的に検出するようにAuto Loaderを構成すると、データ スキーマを明示的に宣言せずにテーブルを初期化し、新しい列の導入に応じてテーブル スキーマを進化させることができます。 これにより、時間の経過に伴うスキーマの変更を手動で追跡して適用する必要がなくなります。

デフォルトでは、Auto Loader スキーマ推論は、型の不一致によるスキーマ進化の問題を回避しようとします。 データ型 (JSON、CSV、および XML) をエンコードしない形式の場合、 Auto Loader 、XML ファイル内のネストされたフィールドを含むすべての列を文字列として推論します。 Apache Spark DataFrameReaderは、スキーマ推論に異なる動作を使用し、サンプル データに基づいて XML ソース内の列のデータ型を選択します。 Auto Loader でこの動作を有効にするには、オプションcloudFiles.inferColumnTypestrueに設定します。

Auto Loader は、データを処理するときに新しい列の追加を検出します。 Auto Loader新しい列を検出すると、ストリームは UnknownFieldException で停止します。 ストリームがこのエラーをスローする前に、Auto Loader はデータの最新のマイクロバッチに対してスキーマ推論を実行し、新しい列をスキーマの末尾にマージすることでスキーマの場所を最新のスキーマで更新します。 既存の列のデータ型は変更されません。 Auto Loader は、オプションcloudFiles.schemaEvolutionModeで設定したスキーマ進化のさまざまなモードをサポートします。

スキーマ ヒントを使用すると、既知で予期しているスキーマ情報を推論されたスキーマに適用できます。 列が特定のデータ型であることがわかっている場合、またはより一般的なデータ型 (たとえば、整数ではなく倍精度浮動小数点) を選択したい場合は、列のデータ型に対して任意の数のヒントを提供できます。 SQL スキーマ仕様構文を使用した文字列。 復旧されたデータ列が有効な場合、スキーマの大文字と小文字以外の大文字と小文字で名前が付けられたフィールドが _rescued_data 列に読み込まれます。 この動作を変更するには、オプション readerCaseSensitivefalse に設定します。この場合、 Auto Loader大文字と小文字を区別しない方法でデータを読み取ります。

例:

このセクションの例では、 Apache Spark GitHub リポジトリでダウンロードできる XML ファイルを使用します。

XMLの読み取りと書き込み

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))
val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

データの読み取り時にスキーマを手動で指定できます。

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

SQL API

XML データソースはデータ型を推論できます。

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

DDL で列の名前と型を指定することもできます。 この場合、スキーマは自動的に推論されません。

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

COPY INTO を使用して XML をロードする

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

行検証による XML の読み取り

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()
val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

入れ子になった XML の解析 (from_xml と schema_of_xml)

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

SQL API を使用した from_xml と schema_of_xml

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

Auto Loaderを使用して XML をロードする

query = (spark
  .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "xml")
    .option("rowTag", "book")
    .option("cloudFiles.inferColumnTypes", True)
    .option("cloudFiles.schemaLocation", schemaPath)
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load(inputPath)
    .writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", checkPointPath)
    .trigger(Trigger.AvailableNow()))

query = query.start(outputPath).awaitTermination()
df = spark.read.format("delta").load(outputPath)
df.show()
val query = spark
.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow())

query.start(outputPath).awaitTermination()
val df = spark.read.format("delta").load(outputPath)
df.show()