XML ファイルの読み取りと書き込み
プレビュー
この機能は パブリック プレビュー段階です。
この記事では、XML ファイルの読み取りと書き込みの方法について説明します。
XML (Extensible Markup Language) は、テキスト形式でデータを書式設定、保存、および共有するためのマークアップ言語です。 これは、ドキュメントから任意のデータ構造まで、データをシリアル化するための一連のルールを定義します。
ネイティブ XML ファイル形式のサポートにより、バッチ処理またはストリーミングのための XML データの取り込み、クエリ、および解析が可能になります。 スキーマとデータ型を自動的に推論して進化させることができ、 from_xml
などの SQL 式をサポートし、XML ドキュメントを生成できます。 外部ジャーを必要とせず、Auto Loader、 read_files
、 COPY INTO
とシームレスに連携します。 オプションで、各行レベルの XML レコードを XML スキーマ定義 (XSD) と照合して検証できます。
必要条件
Databricks Runtime 14.3 以降
XML レコードの解析
XML 仕様では、整形式の構造が義務付けられています。 ただし、この仕様はすぐに表形式にマップされません。 DataFrame
Row
にマップする XML 要素を示すには、rowTag
オプションを指定する必要があります。rowTag
要素が最上位のstruct
になります。rowTag
の子要素は、最上位のstruct
のフィールドになります。
このレコードのスキーマを指定することも、自動的に推測させることもできます。 パーサーは rowTag
要素のみを調べるため、DTDと外部エンティティは除外されます。
次の例は、さまざまな rowTag
オプションを使用した XML ファイルのスキーマ推論と解析を示しています。
- Python
- Scala
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
rowTag
オプションを指定して XML ファイルを "books" として読み込みます。
- Python
- Scala
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
アウトプット:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
rowTag
を "book" として XML ファイルを読み取ります。
- Python
- Scala
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
アウトプット:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
データソースオプション
XML のデータソース・オプションは、以下の方法で指定できます。
-
次の
.option/.options
メソッドがあります。- データフレームリーダー
- データフレームライター
- データストリームリーダー
- データストリームライター
-
次の組み込み関数:
-
CREATE TABLE USING の
OPTIONS
句DATA_SOURCE
オプションの一覧については、「 Auto Loader オプション」を参照してください。
XSD のサポート
オプションで、各行レベルの XML レコードを XML スキーマ定義 (XSD) によって検証できます。 XSD ファイルは rowValidationXSDPath
オプションで指定されます。 それ以外の場合、XSD は、提供または推論されるスキーマに影響を与えません。 検証に失敗したレコードは「破損」としてマークされ、オプションのセクションで説明されている破損レコード処理モードオプションに基づいて処理されます。
XSDToSchema
を使用して、XSD ファイルから Spark データフレーム スキーマを抽出できます。単純型、複合型、およびシーケンス型のみをサポートし、基本的な XSD 機能のみをサポートします。
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
次の表は、XSD データ型から Spark データ型への変換を示しています。
XSD データ型 | Spark のデータ型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ネストされたXMLの解析
既存の データフレーム の文字列値列の XML データは、スキーマと解析結果を新しい struct
列として返す schema_of_xml
と from_xml
で解析できます。引数として schema_of_xml
および from_xml
に渡される XML データは、1 つの整形式の XML レコードである必要があります。
schema_of_xml
構文
schema_of_xml(xmlStr [, options] )
引数
xmlStr
: 単一の整形式の XML レコードを指定する文字列式。options
: ディレクティブを指定する省略可能なMAP<STRING,STRING>
リテラル。
戻り値
文字列の n フィールドを持つ構造体の定義を保持する文字列。列名は XML 要素名と属性名から派生します。 フィールド値は、派生した書式設定された SQL 型を保持します。
from_xml
構文
from_xml(xmlStr, schema [, options])
引数
xmlStr
: 単一の整形式の XML レコードを指定する文字列式。schema
: 文字列式またはschema_of_xml
関数の呼び出し。options
: ディレクティブを指定する省略可能なMAP<STRING,STRING>
リテラル。
戻り値
スキーマ定義に一致するフィールド名と型を持つ構造体。 スキーマは、 CREATE TABLE
などで使用されるように、コンマ区切りの列名とデータ型のペアとして定義する必要があります。 データソースオプションに示されているほとんどのオプションは、
以下の例外:
rowTag
: XML レコードは 1 つしかないため、rowTag
オプションは適用されません。mode
(デフォルト:PERMISSIVE
): 解析中に破損したレコードを処理するモードを許可します。PERMISSIVE
: 破損したレコードに遭遇した場合、不正な形式の文字列をcolumnNameOfCorruptRecord
で設定されたフィールドに入れ、不正な形式のフィールドをnull
に設定します。 破損したレコードを保持するには、ユーザー定義スキーマでcolumnNameOfCorruptRecord
という名前の文字列型フィールドを設定します。 スキーマにフィールドがない場合、解析中に破損したレコードが削除されます。 スキーマを推論すると、出力スキーマにcolumnNameOfCorruptRecord
フィールドが暗黙的に追加されます。FAILFAST
: 破損したレコードに遭遇すると、例外をスローします。
構造変換
データフレーム と XML の構造の違いにより、XML データから DataFrame
データ、および DataFrame
から XML データへの変換ルールがいくつかあります。 属性の処理は、オプション excludeAttribute
で無効にできることに注意してください。
XML から データフレーム への変換
属性 : 属性は、見出し接頭辞 attributePrefix
が付いたフィールドとして変換されます。
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
以下のスキーマを生成します。
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
属性または子要素を含む要素内の文字データ: これらは valueTag
フィールドに解析されます。 文字データが複数ある場合は、valueTag
フィールドは array
タイプに変換されます。
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
以下のスキーマを生成します。
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
データフレーム から XML への変換
配列内の配列としての要素 : フィールドを持つ DataFrame
から XML ファイルを書き込む
ArrayType
その要素を ArrayType
とすると、
要素。 これは、XML データの読み取りと書き込みではなく、 DataFrame
の書き込みで発生します
他のソースから読みます。 したがって、XMLファイルの読み取りと書き込みのラウンドトリップは同じです
構造ですが、他のソースから読み取られた DataFrame
を書くことは、異なることが可能です
構造。
以下のスキーマを持つ データフレーム:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
そして以下のデータで:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
は、以下の XML ファイルを生成します。
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
DataFrame
内の名前のない配列の要素名は、オプション arrayElementName
で指定します (デフォルト: item
)。
救出されたデータ列
レスキューされたデータ列により、ETL中にデータが失われたり見逃されたりすることはありません。 救出されたデータ列を有効にして、レコード内の 1 つ以上のフィールドに次のいずれかの問題があるために解析されなかったデータをキャプチャできます。
- 指定されたスキーマに存在しない
- 指定されたスキーマのデータ型と一致しません
- 指定されたスキーマのフィールド名と大文字と小文字が一致しません
救出されたデータ列は、救出された列とレコードのソースファイルパスを含む JSON ドキュメントとして返されます。 レスキューされたデータ列からソース・ファイル・パスを削除するには、次の SQL 構成を設定します。
- Python
- Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
レスキューされたデータ列を有効にするには、データの読み取り時にオプション rescuedDataColumn
を列名に設定します (spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)``_rescued_data
など)。
XMLパーサーは、レコードを解析する際に PERMISSIVE
、DROPMALFORMED
、FAILFAST
の3つのモードをサポートしています。 rescuedDataColumn
と併用すると、データ型が一致しなくても DROPMALFORMED
モードでレコードが削除されたり、FAILFAST
モードでエラーが発生したりすることはありません。 壊れたレコード(不完全または不正な形式のXML)だけがドロップされるか、エラーが発生します。
Auto Loaderにおけるスキーマ推論と進化
このトピックと適用可能なオプションの詳細については、「Auto Loaderでのスキーマ推論と進化の設定」を参照してください。ロードされた XML データのスキーマを自動的に検出するように Auto Loader を設定できるため、データ スキーマを明示的に宣言せずにテーブルを初期化し、新しい列が導入されるたびにテーブル スキーマを進化させることができます。 これにより、スキーマの変更を経時的に手動で追跡して適用する必要がなくなります。
デフォルトでは、 Auto Loader スキーマ推論は、型の不一致によるスキーマ進化の問題を回避しようとします。 データ型をエンコードしない形式 (JSON、 CSV、 XML の場合)、 Auto Loader は XML ファイル内のネストされたフィールドを含むすべての列を文字列として推論します。 Apache Spark DataFrameReader
では、スキーマ推論に異なる動作が使用され、サンプル データに基づいて XML ソースの列のデータ型が選択されます。 この動作を Auto Loaderで有効にするには、オプション [cloudFiles.inferColumnTypes
] を [true
] に設定します。
Auto Loader は、データの処理中に新しい列の追加を検出します。 Auto Loader が新しい列を検出すると、ストリームはUnknownFieldException
で停止します。ストリームでこのエラーがスローされる前に、 Auto Loader はデータの最新のマイクロバッチに対してスキーマ推論を実行し、新しい列をスキーマの末尾にマージすることで、スキーマの場所を最新のスキーマで更新します。 既存の列のデータ型は変更されません。 Auto Loader は、オプション cloudFiles.schemaEvolutionMode
で設定する スキーマ進化のさまざまなモードをサポートしています。
スキーマ ヒントを使用すると、推論されたスキーマに対して、既知のスキーマ情報と期待するスキーマ情報を適用できます。列が特定のデータ型であることがわかっている場合、またはより一般的なデータ型 (たとえば、整数ではなく double 形式) を選択する場合は、SQL スキーマ指定構文を使用して、列データ型のヒントを任意の数として文字列として提供できます。 レスキューされたデータ列が有効になっている場合、スキーマのケース以外の名前のフィールドが _rescued_data
列にロードされます。 この動作を変更するには、オプション readerCaseSensitive
を false
に設定した場合、 Auto Loader は大文字と小文字を区別しない方法でデータを読み取ります。
例
このセクションの例では、 Apache Spark GitHub リポジトリからダウンロードできる XML ファイルを使用します。
XML の読み取りと書き込み
- Python
- Scala
- R
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
データの読み取り時にスキーマを手動で指定できます。
- Python
- Scala
- R
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
SQL API
XML データソースは、データ型を推測できます。
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
DDL で列の名前とタイプを指定することもできます。 この場合、スキーマは自動的に推論されません。
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
COPY INTOを使用したXMLの読み込み
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
行検証による XML の読み取り
- Python
- Scala
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
ネストされたXMLの解析(from_xmlおよびschema_of_xml)
- Python
- Scala
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
SQL API を使用したfrom_xmlとschema_of_xml
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
Auto Loader で XML を読み込む
- Python
- Scala
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)