メインコンテンツまでスキップ

spark-xmlライブラリを使用したXMLデータの読み取りと書き込み

important

このドキュメントは廃止されており、更新されない可能性があります。 彼のコンテンツに記載されている製品、サービス、または技術は、Databricks によって公式に承認またはテストされていません。

ネイティブ XML ファイル形式のサポートは、パブリック プレビューとして利用できます。 XML ファイルの読み取りと書き込みを参照してください。

この記事では、XMLファイルを Apache Spark データソースとして読み書きする方法について説明します。

必要条件

  1. spark-xml ライブラリを Maven ライブラリとして作成します。Maven 座標には、次のように指定します。

    • com.databricks:spark-xml_2.12:<release>

    最新バージョンの <release>については、spark-xml リリースを参照してください。

  2. ライブラリをクラスターにインストールし ます。

このセクションの例では、 books XML ファイルを使用しています。

  1. ブックのXMLファイルを取得します。

    Bash
    $ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
  2. ファイルを DBFS にアップロードします

XML データの読み取りと書き込み

SQL
/*Infer schema*/

CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

/*Specify column names and types*/

CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

オプション

  • 読み取り

    • path: XML ファイルの場所。 標準の Hadoop グロビング式を受け入れます。

    • rowTag: 行として扱う行タグ。 たとえば、この XML <books><book><book>...</books>では、値は bookになります。 デフォルトは ROWです。

    • samplingRatio:スキーマを推論するためのサンプリング率(0.0~1)。 デフォルトは 1 です。 スキーマを指定しない限り、使用可能なタイプは StructTypeArrayTypeStringTypeLongTypeDoubleTypeBooleanTypeTimestampTypeNullTypeです。

    • excludeAttribute: 要素内の属性を除外するかどうか。 デフォルトは false です。

    • nullValue: null 値として扱う値。 デフォルトは ""です。

    • mode: 破損したレコードを処理するためのモード。 デフォルトは PERMISSIVEです。

      • PERMISSIVE:

        • 破損したレコードが検出されると、 はすべてのフィールドを null に設定し、不正な形式の文字列を columnNameOfCorruptRecordで設定された新しいフィールドに配置します。
        • 間違ったデータ型のフィールドが検出された場合、 は問題のあるフィールドを nullに設定します。
      • DROPMALFORMED: 破損したレコードを無視します。

      • FAILFAST: 破損したレコードを検出すると、例外をスローします。

    • inferSchema: true場合、結果の各 データフレーム 列に対して適切な型 (ブール型、数値型、日付型など) の推論を試みます。 falseの場合、結果のすべてのカラムは文字列型です。デフォルトは trueです。

    • columnNameOfCorruptRecord: 不正な形式の文字列が格納されている新しいフィールドの名前。 デフォルトは _corrupt_recordです。

    • attributePrefix: 属性と要素を区別するための属性の接頭辞。 これは、フィールド名のプレフィックスです。 デフォルトは _です。

    • valueTag: 子要素を持たない要素に属性がある場合に値に使用されるタグ。 デフォルトは _VALUEです。

    • charset: デフォルトは UTF-8 ですが、他の有効な文字セット名に設定できます。

    • ignoreSurroundingSpaces: 値を囲む空白をスキップするかどうか。 デフォルトは false です。

    • rowValidationXSDPath: 各行の XML の検証に使用される XSD ファイルへのパス。 検証に失敗した行は、上記のように解析エラーのように扱われます。 それ以外の場合、XSD は、提供または推論されるスキーマに影響を与えません。 同じローカル パスがクラスターのエグゼキューターにも表示されない場合は、XSD とそれが依存する他の XSDSpark をSparkContext.addFile を使用して エグゼキューターに追加する必要があります。この場合、ローカル XSD /foo/bar.xsdを使用するには、 addFile("/foo/bar.xsd") を呼び出し、 "bar.xsd"rowValidationXSDPathとして渡します。

  • 書き込み

    • path: ファイルを書き込む場所。
    • rowTag: 行として扱う行タグ。 たとえば、この XML <books><book><book>...</books>では、値は bookになります。 デフォルトは ROWです。
    • rootTag: ルートとして扱うルートタグ。 たとえば、この XML <books><book><book>...</books>では、値は booksになります。 デフォルトは ROWSです。
    • nullValue: 書き込む値 null 値。 デフォルトは文字列 "null"です。 "null"すると、フィールドの属性と要素は書き込まれません。
    • attributePrefix: 属性と要素を区別するための属性の接頭辞。 これは、フィールド名のプレフィックスです。 デフォルトは _です。
    • valueTag: 子要素を持たない要素に属性がある場合に値に使用されるタグ。 デフォルトは _VALUEです。
    • compression: ファイルに保存するときに使用する圧縮コーデック。 org.apache.hadoop.io.compress.CompressionCodec を実装するクラスの完全修飾名、または大文字と小文字を区別しない短い名前 (bzip2gziplz4snappy) のいずれかである必要があります。デフォルトは圧縮なしです。

短縮された名前の使用をサポートします。com.databricks.spark.xmlの代わりに xml を使用できます。

XSD のサポート

XSD スキーマに対して個々の行を検証するには、 rowValidationXSDPathを使用します。

ユーティリティ com.databricks.spark.xml.util.XSDToSchema を使用して Spark データフレーム を抽出します スキーマを 一部の XSD ファイルから取得します。 シンプルタイプ、コンプレックスタイプ、シーケンスタイプのみをサポートし、基本的なXSD機能のみをサポートします。 そして実験的です。

Scala
import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths

val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)

ネストされたXMLの解析

主に XML ファイルを データフレーム に変換するために使用されますが、 from_xml メソッドを使用して、既存の データフレーム の文字列値列の XML を解析し、解析結果を含む新しい列として次の構造体として追加することもできます。

Scala
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._

val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))
注記
  • mode:

    • PERMISSIVEに設定すると、デフォルト、解析モードはデフォルトDROPMALFORMEDになります。columnNameOfCorruptRecordに一致する列を from_xml のスキーマに含めると、PERMISSIVE モードでは、結果の構造体のその列に不正なレコードが出力されます。
    • DROPMALFORMEDに設定すると、正しく解析されない XML 値は、カラムにnull値になります。行は削除されません。
  • from_xml XMLを含む文字列の配列を解析された構造体の配列に変換します。 代わりに schema_of_xml_array を使用してください。

  • from_xml_string は、列の代わりに文字列を直接操作する UDF で使用するための代替手段です。

変換ルール

データフレームs と XML では構造的な違いがあるため、XML データから データフレーム データへの変換ルールと データフレーム データから XML データへの変換ルールがいくつかあります。属性の処理を無効にするには、オプション excludeAttributeを使用します。

XML を データフレーム に変換する

  • 属性: 属性は、 attributePrefix オプションで指定されたプレフィックスを持つフィールドとして変換されます。 attributePrefix_の場合、ドキュメントは

    XML
    <one myOneAttrib="AAAA">
    <two>two</two>
    <three>three</three>
    </one>

    スキーマを生成します。

    root
    |-- _myOneAttrib: string (nullable = true)
    |-- two: string (nullable = true)
    |-- three: string (nullable = true)
  • 要素に属性はあるが、子要素がない場合、属性値は valueTag オプションで指定された別のフィールドに格納されます。 valueTag_VALUEの場合、ドキュメントは

    XML
    <one>
    <two myTwoAttrib="BBBBB">two</two>
    <three>three</three>
    </one>

    スキーマを生成します。

    root
    |-- two: struct (nullable = true)
    | |-- _VALUE: string (nullable = true)
    | |-- _myTwoAttrib: string (nullable = true)
    |-- three: string (nullable = true)

データフレーム を XML に変換する

要素を ArrayType としてフィールド ArrayTypeを持つ データフレーム から XML ファイルを書き込むと、要素に対して追加のネストされたフィールドが作成されます。これは、XML データの読み取りと書き込みではなく、他のソースから読み取られた データフレーム の書き込みで発生します。 したがって、XML ファイルの読み取りと書き込みのラウンドトリップは同じ構造を持ちますが、他のソースから読み取られた データフレーム の書き込みは異なる構造を持つ可能性があります。

スキーマを持つ データフレーム:

 |-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

およびデータ:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

は XML ファイルを生成します。

XML
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>