spark-xml
ライブラリを使用したXMLデータの読み取りと書き込み
このドキュメントは廃止されており、更新されない可能性があります。 彼のコンテンツに記載されている製品、サービス、または技術は、Databricks によって公式に承認またはテストされていません。
ネイティブ XML ファイル形式のサポートは、パブリック プレビューとして利用できます。 XML ファイルの読み取りと書き込みを参照してください。
この記事では、XMLファイルを Apache Spark データソースとして読み書きする方法について説明します。
必要条件
-
spark-xml
ライブラリを Maven ライブラリとして作成します。Maven 座標には、次のように指定します。- Databricks Runtime 7.x 以降:
com.databricks:spark-xml_2.12:<release>
最新バージョンの
<release>
については、spark-xml
リリースを参照してください。 - Databricks Runtime 7.x 以降:
例
このセクションの例では、 books XML ファイルを使用しています。
-
ブックのXMLファイルを取得します。
Bash$ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
-
ファイルを DBFS にアップロードします。
XML データの読み取りと書き込み
- SQL
- Scala
- R
/*Infer schema*/
CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
/*Specify column names and types*/
CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
// Infer schema
import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method
val df = spark.read
.option("rowTag", "book")
.xml("dbfs:/books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
// Specify schema
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read
.option("rowTag", "book")
.schema(customSchema)
.xml("books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
# Infer schema
library(SparkR)
sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))
df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")
# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")
# Specify schema
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")
オプション
-
読み取り
-
path
: XML ファイルの場所。 標準の Hadoop グロビング式を受け入れます。 -
rowTag
: 行として扱う行タグ。 たとえば、この XML<books><book><book>...</books>
では、値はbook
になります。 デフォルトはROW
です。 -
samplingRatio
:スキーマを推論するためのサンプリング率(0.0~1)。 デフォルトは 1 です。 スキーマを指定しない限り、使用可能なタイプはStructType
、ArrayType
、StringType
、LongType
、DoubleType
、BooleanType
、TimestampType
、NullType
です。 -
excludeAttribute
: 要素内の属性を除外するかどうか。 デフォルトは false です。 -
nullValue
:null
値として扱う値。 デフォルトは""
です。 -
mode
: 破損したレコードを処理するためのモード。 デフォルトはPERMISSIVE
です。-
PERMISSIVE
:- 破損したレコードが検出されると、 はすべてのフィールドを
null
に設定し、不正な形式の文字列をcolumnNameOfCorruptRecord
で設定された新しいフィールドに配置します。 - 間違ったデータ型のフィールドが検出された場合、 は問題のあるフィールドを
null
に設定します。
- 破損したレコードが検出されると、 はすべてのフィールドを
-
DROPMALFORMED
: 破損したレコードを無視します。 -
FAILFAST
: 破損したレコードを検出すると、例外をスローします。
-
-
inferSchema
:true
場合、結果の各 データフレーム 列に対して適切な型 (ブール型、数値型、日付型など) の推論を試みます。false
の場合、結果のすべてのカラムは文字列型です。デフォルトはtrue
です。 -
columnNameOfCorruptRecord
: 不正な形式の文字列が格納されている新しいフィールドの名前。 デフォルトは_corrupt_record
です。 -
attributePrefix
: 属性と要素を区別するための属性の接頭辞。 これは、フィールド名のプレフィックスです。 デフォルトは_
です。 -
valueTag
: 子要素を持たない要素に属性がある場合に値に使用されるタグ。 デフォルトは_VALUE
です。 -
charset
: デフォルトはUTF-8
ですが、他の有効な文字セット名に設定できます。 -
ignoreSurroundingSpaces
: 値を囲む空白をスキップするかどうか。 デフォルトは false です。 -
rowValidationXSDPath
: 各行の XML の検証に使用される XSD ファイルへのパス。 検証に失敗した行は、上記のように解析エラーのように扱われます。 それ以外の場合、XSD は、提供または推論されるスキーマに影響を与えません。 同じローカル パスがクラスターのエグゼキューターにも表示されない場合は、XSD とそれが依存する他の XSDSpark をSparkContext.addFile を使用して エグゼキューターに追加する必要があります。この場合、ローカル XSD/foo/bar.xsd
を使用するには、addFile("/foo/bar.xsd")
を呼び出し、"bar.xsd"
をrowValidationXSDPath
として渡します。
-
-
書き込み
path
: ファイルを書き込む場所。rowTag
: 行として扱う行タグ。 たとえば、この XML<books><book><book>...</books>
では、値はbook
になります。 デフォルトはROW
です。rootTag
: ルートとして扱うルートタグ。 たとえば、この XML<books><book><book>...</books>
では、値はbooks
になります。 デフォルトはROWS
です。nullValue
: 書き込む値null
値。 デフォルトは文字列"null"
です。"null"
すると、フィールドの属性と要素は書き込まれません。attributePrefix
: 属性と要素を区別するための属性の接頭辞。 これは、フィールド名のプレフィックスです。 デフォルトは_
です。valueTag
: 子要素を持たない要素に属性がある場合に値に使用されるタグ。 デフォルトは_VALUE
です。compression
: ファイルに保存するときに使用する圧縮コーデック。org.apache.hadoop.io.compress.CompressionCodec
を実装するクラスの完全修飾名、または大文字と小文字を区別しない短い名前 (bzip2
、gzip
、lz4
、snappy
) のいずれかである必要があります。デフォルトは圧縮なしです。
短縮された名前の使用をサポートします。com.databricks.spark.xml
の代わりに xml
を使用できます。
XSD のサポート
XSD スキーマに対して個々の行を検証するには、 rowValidationXSDPath
を使用します。
ユーティリティ com.databricks.spark.xml.util.XSDToSchema
を使用して Spark データフレーム を抽出します
スキーマを 一部の XSD ファイルから取得します。 シンプルタイプ、コンプレックスタイプ、シーケンスタイプのみをサポートし、基本的なXSD機能のみをサポートします。
そして実験的です。
import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths
val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)
ネストされたXMLの解析
主に XML ファイルを データフレーム に変換するために使用されますが、 from_xml
メソッドを使用して、既存の データフレーム の文字列値列の XML を解析し、解析結果を含む新しい列として次の構造体として追加することもできます。
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))
-
mode
:PERMISSIVE
に設定すると、デフォルト、解析モードはデフォルトDROPMALFORMED
になります。columnNameOfCorruptRecord
に一致する列をfrom_xml
のスキーマに含めると、PERMISSIVE
モードでは、結果の構造体のその列に不正なレコードが出力されます。DROPMALFORMED
に設定すると、正しく解析されない XML 値は、カラムにnull
値になります。行は削除されません。
-
from_xml
XMLを含む文字列の配列を解析された構造体の配列に変換します。 代わりにschema_of_xml_array
を使用してください。 -
from_xml_string
は、列の代わりに文字列を直接操作する UDF で使用するための代替手段です。
変換ルール
データフレームs と XML では構造的な違いがあるため、XML データから データフレーム データへの変換ルールと データフレーム データから XML データへの変換ルールがいくつかあります。属性の処理を無効にするには、オプション excludeAttribute
を使用します。
XML を データフレーム に変換する
-
属性: 属性は、
attributePrefix
オプションで指定されたプレフィックスを持つフィールドとして変換されます。attributePrefix
が_
の場合、ドキュメントはXML<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>スキーマを生成します。
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true) -
要素に属性はあるが、子要素がない場合、属性値は
valueTag
オプションで指定された別のフィールドに格納されます。valueTag
が_VALUE
の場合、ドキュメントはXML<one>
<two myTwoAttrib="BBBBB">two</two>
<three>three</three>
</one>スキーマを生成します。
root
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
データフレーム を XML に変換する
要素を ArrayType
としてフィールド ArrayType
を持つ データフレーム から XML ファイルを書き込むと、要素に対して追加のネストされたフィールドが作成されます。これは、XML データの読み取りと書き込みではなく、他のソースから読み取られた データフレーム の書き込みで発生します。 したがって、XML ファイルの読み取りと書き込みのラウンドトリップは同じ構造を持ちますが、他のソースから読み取られた データフレーム の書き込みは異なる構造を持つ可能性があります。
スキーマを持つ データフレーム:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
およびデータ:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
は XML ファイルを生成します。
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>