XML ファイルの読み取りと書き込み
プレビュー
この機能は パブリック プレビュー段階です。
Extensible Markup Language(XML)は、テキスト形式でデータをフォーマット、保存、共有するためのマークアップ言語です。ドキュメントから任意のデータ構造まで、データをシリアル化するための一連のルールを定義します。
Databricks は、自動スキーマ推論と進化、行タグ構成、XSD 検証、および from_xml のような SQL 式など、Apache Spark での XML の読み書きをサポートしています。ネイティブXMLサポートは、外部jarファイルを必要とせずに、Auto Loader、read_files、およびCOPY INTOと連携します。
前提条件
XMLファイル形式のサポートにはDatabricks Runtime 14.3以降が必要です。
オプション
XML データ ソースを構成するには、 DataFrameReaderとDataFrameWriterの.option()メソッドと.options()メソッドを使用します。 サポートされているオプションの完全なリストについては、 DataFrameReader XML オプションとDataFrameWriter XML オプションを参照してください。
XML レコードの解析
XML 仕様では、整形式の構造が義務付けられています。ただし、この仕様はすぐに表形式にマップされません。DataFrame Rowにマップする XML 要素を示すには、rowTag オプションを指定する必要があります。rowTag 要素が最上位のstructになります。rowTag の子要素は、最上位のstructのフィールドになります。
このレコードのスキーマを指定することも、自動的に推測させることもできます。 パーサーは rowTag 要素のみを調べるため、DTDと外部エンティティは除外されます。
次の例は、さまざまな rowTag オプションを使用した XML ファイルのスキーマ推論と解析を示しています。
- Python
- Scala
xmlString = """
<reviews>
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>
<review id="r002">
<author>Bob</author>
<rating>4</rating>
<comment>Great location, very comfortable</comment>
</review>
</reviews>"""
xmlPath = "/Volumes/<catalog>/<schema>/<volume>/reviews.xml"
dbutils.fs.put(xmlPath, xmlString, True)
val xmlString = """
<reviews>
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>
<review id="r002">
<author>Bob</author>
<rating>4</rating>
<comment>Great location, very comfortable</comment>
</review>
</reviews>"""
val xmlPath = "/Volumes/<catalog>/<schema>/<volume>/reviews.xml"
dbutils.fs.put(xmlPath, xmlString)
XML ファイルをrowTagオプションで"reviews"として読み取ります:
- Python
- Scala
- SQL
df = spark.read.option("rowTag", "reviews").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
val df = spark.read.option("rowTag", "reviews").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews.xml',
format => 'xml',
rowTag => 'reviews'
)
アウトプット:
root
|-- review: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- comment: string (nullable = true)
| | |-- rating: string (nullable = true)
+----------------------------------------------------------------------------------------+
|review |
+----------------------------------------------------------------------------------------+
|[{r001, Alice, Amazing stay, highly recommend!, 5}, {r002, Bob, Great location..., 4}] |
+----------------------------------------------------------------------------------------+
rowTag を "review" として XML ファイルを読み込みます:
- Python
- Scala
- SQL
df = spark.read.option("rowTag", "review").format("xml").load(xmlPath)
# Infers four top-level fields and parses `review` in separate rows:
val df = spark.read.option("rowTag", "review").xml(xmlPath)
// Infers four top-level fields and parses `review` in separate rows:
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews.xml',
format => 'xml',
rowTag => 'review'
)
アウトプット:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- comment: string (nullable = true)
|-- rating: string (nullable = true)
+----+------+--------------------------------+------+
|_id |author|comment |rating|
+----+------+--------------------------------+------+
|r001|Alice |Amazing stay, highly recommend! |5 |
|r002|Bob |Great location, very comfortable|4 |
+----+------+--------------------------------+------+
XSD を使用して XML レコードを検証
各行のXMLレコードは、オプションでXMLスキーマ定義(XSD)によって検証できます。XSD ファイルは rowValidationXSDPath オプションで指定されます。それ以外の場合、XSD は、提供または推論されるスキーマに影響を与えません。検証に失敗したレコードは「破損」としてマークされ、オプションセクションで説明されている破損レコード処理モードオプションに基づいて処理されます。
XSDToSchema を使用して、XSD ファイルから Spark データフレーム スキーマを抽出できます。単純型、複合型、およびシーケンス型のみをサポートし、基本的な XSD 機能のみをサポートします。
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "/Volumes/<catalog>/<schema>/<volume>/reviews.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="review">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="rating" type="xs:integer" />
<xs:element name="comment" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
次の表は、XSD データ型から Spark データ型への変換を示しています。
XSD データ型 | Spark のデータ型 |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ネストされたXMLの解析
既存の データフレーム の文字列値列の XML データは、スキーマと解析結果を新しい struct 列として返す schema_of_xml と from_xml で解析できます。引数として schema_of_xml および from_xml に渡される XML データは、1 つの整形式の XML レコードである必要があります。
schema_of_xml
schema_of_xml を使用して、XML文字列からSparkスキーマを推論します。XML 列を解析するために結果を from_xml に渡します。
構文: schema_of_xml(xmlStr [, options])
引数 | 必須 | 説明 |
|---|---|---|
| はい | 単一の整形式の XML レコードを指定する文字列式。 |
| No |
|
XML要素名および属性名から列名が派生するn個の文字列フィールドを持つ構造体の定義を含む文字列を返します。フィールド値は、派生したフォーマットされたSQL型を保持しています。
from_xml
from_xml を使用して、XML レコードを含む文字列列を構造体として解析します。直接スキーマを提供するか、schema_of_xmlの出力を使用してください。
構文: from_xml(xmlStr, schema [, options])
引数 | 必須 | 説明 |
|---|---|---|
| はい | 単一の整形式の XML レコードを指定する文字列式。 |
| はい | 文字列式または |
| No |
|
スキーマ定義に一致するフィールド名と型を持つ構造体を返します。スキーマは、たとえばCREATE TABLEで使われているように、カンマ区切りの列名とデータ型のペアとして定義する必要があります。Options セクションに表示されているほとんどのオプションは、次の例外を除き適用されます。
rowTag: XML レコードは 1 つしかないため、rowTagオプションは適用されません。mode(デフォルト:PERMISSIVE): 解析中に破損したレコードを処理するモードを許可します。PERMISSIVE: 破損したレコードに遭遇した場合、不正な形式の文字列をcolumnNameOfCorruptRecordで設定されたフィールドに入れ、不正な形式のフィールドをnullに設定します。 破損したレコードを保持するには、ユーザー定義スキーマでcolumnNameOfCorruptRecordという名前の文字列型フィールドを設定します。 スキーマにフィールドがない場合、解析中に破損したレコードが削除されます。 スキーマを推論すると、出力スキーマにcolumnNameOfCorruptRecordフィールドが暗黙的に追加されます。FAILFAST: 破損したレコードに遭遇すると、例外をスローします。
例
XML文字列列を解析するには、schema_of_xmlを使用してスキーマを推論し、その後、from_xmlに渡します。
- Python
- Scala
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>
"""
df = spark.createDataFrame([(1, xml_data)], ["review_id", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
import org.apache.spark.sql.functions.{from_xml, schema_of_xml, lit}
val xmlData = """
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>""".stripMargin
val df = Seq((1, xmlData)).toDF("review_id", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
SQLでインラインXMLを解析する:
SELECT from_xml('
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>',
schema_of_xml('
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>')
);
XML と DataFrame 構造間の変換
データフレーム と XML の構造の違いにより、XML データから DataFrame データ、および DataFrame から XML データへの変換ルールがいくつかあります。 属性の処理は、オプション excludeAttributeで無効にできることに注意してください。
XML から データフレーム への変換
XML を読み込む際、Databricks は XML 要素と属性を DataFrame フィールドに次のルールに従ってマップします。
属性はヘッディングプレフィックスattributePrefixでフィールドとして変換されます。
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
これは次のスキーマを生成します。
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
属性または子要素を含む要素内の文字データは、valueTag フィールドに解析されます。文字データが複数回出現する場合、valueTag フィールドは array 型に変換されます。
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
これは次のスキーマを生成します。
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
データフレーム から XML への変換
DataFrame を XML に書き込む際、DataFrame と XML のデータモデル間の違いにより、特定のネストされた構造には特別な処理が必要です。
データフレームが要素のタイプもArrayTypeであるArrayTypeフィールドを含む場合、それをXMLに書き込むと、XMLファイルをラウンドトリップする際には存在しない追加のネストレベルが生成されます。これは、XML 以外のソースから取得されたデータフレームにのみ影響します。XML ファイルの読み取りと書き込みでは、元の構造が維持されます。
例えば、以下のスキーマを持つDataFrame:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
および次のデータ:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
次の XML 出力を生成します:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
DataFrame内の名前のない配列の要素名は、オプション arrayElementName で指定します (デフォルト: item)。
レスキューされたデータ列を有効にする
レスキューされたデータ列により、ETL中にデータが失われることはありません。レコード内の1つ以上のフィールドに次のいずれかの問題があるため、解析されなかったデータをキャプチャします。
- 指定されたスキーマに存在しない
- 指定されたスキーマのデータ型と一致しない
- 指定されたスキーマのフィールド名と大文字小文字の組み合わせが一致しない
レスキューされたデータ列は、レスキューされた列とレコードのソースファイルパスを含むJSONドキュメントとして返されます。
レスキューされたデータ列を有効にするには、読み取るときに rescuedDataColumn オプションを列名に設定します。
- Python
- Scala
- SQL
df = spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load("/Volumes/<catalog>/<schema>/<volume>/reviews_xml")
val df = spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load("/Volumes/<catalog>/<schema>/<volume>/reviews_xml")
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_xml',
format => 'xml',
rowTag => 'review',
rescuedDataColumn => '_rescued_data'
)
レスキューされたデータ列からソースファイルパスを削除するには、設定します:
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
XMLパーサーは、レコードを解析する際に PERMISSIVE、DROPMALFORMED、FAILFAST の3つのモードをサポートしています。 rescuedDataColumn と併用すると、データ型が一致しなくても DROPMALFORMED モードでレコードが削除されたり、FAILFAST モードでエラーが発生したりすることはありません。 壊れたレコード(不完全または不正な形式のXML)だけがドロップされるか、エラーが発生します。
Auto Loader でスキーマを推論して進化させる
このトピックと適用可能なオプションの詳細については、「Auto Loaderでのスキーマ推論と進化の設定」を参照してください。ロードされた XML データのスキーマを自動的に検出するように Auto Loader を設定できるため、データ スキーマを明示的に宣言せずにテーブルを初期化し、新しい列が導入されるたびにテーブル スキーマを進化させることができます。 これにより、スキーマの変更を経時的に手動で追跡して適用する必要がなくなります。
デフォルトでは、 Auto Loader スキーマ推論は、型の不一致によるスキーマ進化の問題を回避しようとします。 データ型をエンコードしない形式 (JSON、 CSV、 XML の場合)、 Auto Loader は XML ファイル内のネストされたフィールドを含むすべての列を文字列として推論します。 Apache Spark DataFrameReader では、スキーマ推論に異なる動作が使用され、サンプル データに基づいて XML ソースの列のデータ型が選択されます。この動作を Auto Loaderで有効にするには、オプション cloudFiles.inferColumnTypes を trueに設定します。
Auto Loader は、データの処理中に新しい列の追加を検出します。 Auto Loader が新しい列を検出すると、ストリームはUnknownFieldExceptionで停止します。ストリームでこのエラーがスローされる前に、 Auto Loader はデータの最新のマイクロバッチに対してスキーマ推論を実行し、新しい列をスキーマの末尾にマージすることで、スキーマの場所を最新のスキーマで更新します。 既存の列のデータ型は変更されません。 Auto Loader は、オプション cloudFiles.schemaEvolutionMode で設定する スキーマ進化のさまざまなモードをサポートしています。
スキーマ ヒントを使用すると、推論されたスキーマに対して、既知のスキーマ情報と期待するスキーマ情報を適用できます。列が特定のデータ型であることがわかっている場合、またはより一般的なデータ型 (たとえば、整数ではなく double 形式) を選択する場合は、SQL スキーマ指定構文を使用して、列データ型のヒントを任意の数として文字列として提供できます。 レスキューされたデータ列が有効になっている場合、スキーマのケース以外の名前のフィールドが _rescued_data 列にロードされます。 この動作を変更するには、オプション readerCaseSensitive を false に設定した場合、 Auto Loader は大文字と小文字を区別しない方法でデータを読み取ります。
使い方
以下の例では、Wanderbricksデータセットを使用して、Spark DataFrame APIとSQLを使用してXMLファイルの読み書きを行う方法を説明します。
XML の読み取りと書き込み
WanderbricksのレビューをXMLに書き込み、再度読み込むために、DataFrame APIを使用します。
- Python
- Scala
- R
# Write Wanderbricks reviews to XML
df = spark.read.table("samples.wanderbricks.reviews")
df.write \
.format("xml") \
.option("rootTag", "reviews") \
.option("rowTag", "review") \
.save("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
# Read the XML file back
df_read = spark.read \
.format("xml") \
.option("rowTag", "review") \
.load("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
df_read.show()
// Write Wanderbricks reviews to XML
val df = spark.read.table("samples.wanderbricks.reviews")
df.write
.format("xml")
.option("rootTag", "reviews")
.option("rowTag", "review")
.save("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
// Read the XML file back
val dfRead = spark.read
.format("xml")
.option("rowTag", "review")
.xml("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
dfRead.show()
df <- loadDF("/Volumes/<catalog>/<schema>/<volume>/reviews.xml", source = "xml", rowTag = "review")
saveDF(df, "/Volumes/<catalog>/<schema>/<volume>/newreviews.xml", "xml", "overwrite")
データの読み取り時にスキーマを手動で指定できます。
- Python
- Scala
- R
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True)
])
df = spark.read.options(rowTag='review').xml('/Volumes/<catalog>/<schema>/<volume>/reviews.xml', schema=custom_schema)
df.show()
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true),
StructField("comment", StringType, nullable = true)))
val df = spark.read.option("rowTag", "review").schema(customSchema).xml("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
df.show()
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("rating", "integer"),
structField("comment", "string"))
df <- loadDF("/Volumes/<catalog>/<schema>/<volume>/reviews.xml", source = "xml", schema = customSchema, rowTag = "review")
saveDF(df, "/Volumes/<catalog>/<schema>/<volume>/newreviews.xml", "xml", "overwrite")
SQL を使用した XML の読み取りと書き込み
SQL DDLを使用してXMLファイルからテーブルを作成します。Databricks は列のデータ型を自動的に推測します。
DROP TABLE IF EXISTS reviews;
CREATE TABLE reviews
USING XML
OPTIONS (path "/Volumes/<catalog>/<schema>/<volume>/reviews.xml", rowTag "review");
SELECT * FROM reviews;
DDL で列の名前とタイプを指定することもできます。 この場合、スキーマは自動的に推論されません。
DROP TABLE IF EXISTS reviews;
CREATE TABLE reviews (_id string, author string, rating integer, comment string)
USING XML
OPTIONS (path "/Volumes/<catalog>/<schema>/<volume>/reviews.xml", rowTag "review");
COPY INTOを使用したXMLの読み込み
COPY INTO を使用して、XML ファイルをクラウドストレージから Delta テーブルにロードします。
DROP TABLE IF EXISTS reviews;
CREATE TABLE IF NOT EXISTS reviews;
COPY INTO reviews
FROM "/Volumes/<catalog>/<schema>/<volume>/reviews.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'review')
COPY_OPTIONS ('mergeSchema' = 'true');
行検証による XML の読み取り
rowValidationXSDPath オプションを使用して、読み取り時に各行を XSD スキーマに対して検証します。
- Python
- Scala
- SQL
df = (spark.read
.format("xml")
.option("rowTag", "review")
.option("rowValidationXSDPath", xsdPath)
.load("/Volumes/<catalog>/<schema>/<volume>/reviews.xml"))
df.printSchema()
val df = spark.read
.option("rowTag", "review")
.option("rowValidationXSDPath", xsdPath)
.xml("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
df.printSchema
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews.xml',
format => 'xml',
rowTag => 'review',
rowValidationXSDPath => '/Volumes/<catalog>/<schema>/<volume>/reviews.xsd'
)
Auto Loader で XML を読み込む
Auto Loader を使用して、クラウドストレージから XML ファイルを継続的に、自動スキーマ推論と進化機能に対応した Delta テーブルに取り込みます。
- Python
- Scala
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "review")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("reviews")
)
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "review")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow())
.toTable("reviews")
追加のリソース
spark-xmlライブラリを使用した XML データの読み取りと書き込み: 「オープンソース Spark XML ライブラリ」を以前使用されていたユーザーは、従来の統合ガイドを参照してください。- JSONファイルを読み書きする: データが半構造化されていても、XMLではない場合、JSONはよりシンプルな形式で、同様のスキーマ推論とネストされたデータのサポートを提供します。