How to Handle Blob Data Contained in an XML File

Some EC2 logs record events in XML format, with the EC2-related information embedded in each event as a base64-encoded string. To run analytics on this data with Apache Spark, you need the spark-xml library to parse the XML and a base64 decoder to transform the blob into analyzable text.

Problem

You need to analyze base64-encoded strings embedded in an XML-formatted log file using Spark. For example, the following file, input.xml, shows this type of format:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log [<!ENTITY % log SYSTEM "aws_instance">%log;]>
<log systemID="MF2018" timeZone="UTC" timeStamp="Mon Mar 25 16:00:01 2018">
<message source="ec2.log" time="Mon Mar 25 16:00:01 2018" type="sysMSG"><text/>
<detail>
  <blob>aW5zdGFuY2VJZCxzdGFydFRpbWUsZGVsZXRlVGltZSxob3Vycw0KaS0wMjdmYTdjY2RhMjEwYjRmNCwyLzE3LzE3VDIwOjIxLDIvMTcvMTdUMjE6MTEsNQ0KaS0wN2NkNzEwMGUzZjU0YmY2YSwyLzE3LzE3VDIwOjE5LDIvMTcvMTdUMjE6MTEsNA0KaS0wYTJjNGFkYmYwZGMyNTUxYywyLzE3LzE3VDIwOjE5LDIvMTcvMTdUMjE6MTEsMg0KaS0wYjQwYjE2MjM2Mzg4OTczZiwyLzE3LzE3VDIwOjE4LDIvMTcvMTdUMjE6MTEsNg0KaS0wY2ZkODgwNzIyZTE1ZjE5ZSwyLzE3LzE3VDIwOjE4LDIvMTcvMTdUMjE6MTEsMg0KaS0wY2YwYzczZWZlZWExNGY3NCwyLzE3LzE3VDE2OjIxLDIvMTcvMTdUMTc6MTEsMQ0KaS0wNTA1ZTk1YmZlYmVjZDZlNiwyLzE3LzE3VDE2OjIxLDIvMTcvMTdUMTc6MTEsOA==
</blob>
</detail>
</message>
</log>
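Before bringing in Spark, you can confirm what the blob holds by decoding part of it directly. The sample below decodes the first 52 characters of the blob shown above (which align to a whole number of bytes), revealing a CSV header line; this quick check uses only the standard library:

```scala
// First 52 base64 characters of the sample blob above.
val sample = "aW5zdGFuY2VJZCxzdGFydFRpbWUsZGVsZXRlVGltZSxob3Vycw0K"

// getMimeDecoder tolerates embedded line breaks, unlike getDecoder.
val decoded = new String(java.util.Base64.getMimeDecoder.decode(sample), "UTF-8")

println(decoded.trim) // instanceId,startTime,deleteTime,hours
```

The decoded text is CSV with a header row, which is why the Spark solution below strips the first line before parsing.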

Solution

To parse the XML file:

  1. Load the XML data.
  2. Read it into a raw DataFrame with the spark-xml library.
  3. Decode the blob column with a base64 decoder.
  4. Save the decoded data to a text file (optional).
  5. Load the text file into a Spark DataFrame and parse it.
  6. Register the DataFrame as a Spark SQL table.

The following Spark code processes the file:

val xmlfile = "/mnt/vgiri/input.xml"
val readxml = spark.read.format("com.databricks.spark.xml").option("rowTag","message").load(xmlfile)

val decoded = readxml.selectExpr("_source as source","_time as time","_type as type","detail.blob")

decoded.show() // Display the raw blob data


// Decode every piece of blob data with java.util.Base64; the MIME decoder
// skips the line breaks that surround the blob text in the XML.
// (sun.misc.BASE64Decoder is an internal API that was removed in JDK 9.)
val decodethisxmlblob = decoded.rdd
    .map(row => row(3).toString)
    .map(encoded => new String(java.util.Base64.getMimeDecoder.decode(encoded)))

//Store it in a text file temporarily
decodethisxmlblob.saveAsTextFile("/mnt/vgiri/ec2blobtotxt")

// Parse the text file: read it back as an RDD and drop the CSV header row.
val readAsRDD = spark.sparkContext.textFile("/mnt/vgiri/ec2blobtotxt")
val header = readAsRDD.first()
val finalTextFile = readAsRDD.filter(row => row != header)


// toDF() requires import spark.implicits._ when run outside a notebook.
val finalDF = finalTextFile.toDF()
    .selectExpr(
      "split(value, ',')[0] as instanceId",
      "split(value, ',')[1] as startTime",
      "split(value, ',')[2] as deleteTime",
      "split(value, ',')[3] as hours"
    )

finalDF.show()
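The split(value, ',')[n] expressions behave like plain Scala String.split on each CSV row; a quick sanity check on the first data row from the sample blob:

```scala
// One decoded CSV row from the sample data.
val row = "i-027fa7ccda210b4f4,2/17/17T20:21,2/17/17T21:11,5"
val fields = row.split(",")

val instanceId = fields(0) // i-027fa7ccda210b4f4
val hours = fields(3)      // "5" -- still a string, as in the DataFrame above
```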

The Spark code generates the following output:

18/03/24 22:54:31 INFO DAGScheduler: ResultStage 4 (show at SparkXMLBlob.scala:42) finished in 0.016 s
18/03/24 22:54:31 INFO DAGScheduler: Job 4 finished: show at SparkXMLBlob.scala:42, took 0.019120 s
18/03/24 22:54:31 INFO SparkContext: Invoking stop() from shutdown hook
+-------------------+-------------+-------------+-----+
| instanceId        | startTime   | deleteTime  |hours|
+-------------------+-------------+-------------+-----+
|i-027fa7ccda210b4f4|2/17/17T20:21|2/17/17T21:11|    5|
|i-07cd7100e3f54bf6a|2/17/17T20:19|2/17/17T21:11|    4|
|i-0a2c4adbf0dc2551c|2/17/17T20:19|2/17/17T21:11|    2|
|i-0b40b16236388973f|2/17/17T20:18|2/17/17T21:11|    6|
|i-0cfd880722e15f19e|2/17/17T20:18|2/17/17T21:11|    2|
|i-0cf0c73efeea14f74|2/17/17T16:21|2/17/17T17:11|    1|
|i-0505e95bfebecd6e6|2/17/17T16:21|2/17/17T17:11|    8|
+-------------------+-------------+-------------+-----+
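To complete step 6, the DataFrame can be registered as a temporary view and queried with Spark SQL. A minimal sketch; the view name ec2_usage and the query are illustrative, not part of the original code:

```scala
// Register the parsed DataFrame as a temporary view for Spark SQL.
finalDF.createOrReplaceTempView("ec2_usage")

// Illustrative query: total hours per instance. The hours column is a
// string in finalDF, so cast it before aggregating.
spark.sql(
  "SELECT instanceId, SUM(CAST(hours AS INT)) AS totalHours " +
  "FROM ec2_usage GROUP BY instanceId"
).show()
```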