complex-nested-structured (Scala)

Five Spark SQL Helper Utility Functions to Extract and Explore Complex Data Types

While this in-depth blog lays out and explains the concepts and motivations for processing and handling complex data types and formats, this notebook example examines how you can apply them, with a few concrete examples, to data types that you might encounter in your use cases. This short notebook tutorial shows ways to explore and employ a number of helper Spark SQL utility functions and APIs in the org.apache.spark.sql.functions package. In particular, they come in handy while doing Streaming ETL, in which the data are JSON objects with complex and nested structures: Maps and Structs embedded as JSON:

  • get_json_object()
  • from_json()
  • to_json()
  • explode()
  • selectExpr()

The takeaway from this short tutorial is the myriad ways to slice and dice nested JSON structures with Spark SQL utility functions.

Let's create a simple JSON schema with attributes and values, without any nested structures.

import org.apache.spark.sql.types._                         // include the Spark Types to define our schema
import org.apache.spark.sql.functions._                     // include the Spark helper functions

val jsonSchema = new StructType()
        .add("battery_level", LongType)
        .add("c02_level", LongType)
        .add("cca3",StringType)
        .add("cn", StringType)
        .add("device_id", LongType)
        .add("device_type", StringType)
        .add("signal", LongType)
        .add("ip", StringType)
        .add("temp", LongType)
        .add("timestamp", TimestampType)
import org.apache.spark.sql.types._ import org.apache.spark.sql.functions._ jsonSchema: org.apache.spark.sql.types.StructType = StructType(StructField(battery_level,LongType,true), StructField(c02_level,LongType,true), StructField(cca3,StringType,true), StructField(cn,StringType,true), StructField(device_id,LongType,true), StructField(device_type,StringType,true), StructField(signal,LongType,true), StructField(ip,StringType,true), StructField(temp,LongType,true), StructField(timestamp,TimestampType,true))
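As an aside, the same schema can be written more compactly in DDL form; a minimal sketch, assuming a Spark version where StructType.fromDDL is available (the name jsonSchemaDDL is ours):

// equivalent schema expressed as a DDL string (assumes StructType.fromDDL is available)
val jsonSchemaDDL = StructType.fromDDL(
  "battery_level LONG, c02_level LONG, cca3 STRING, cn STRING, device_id LONG, " +
  "device_type STRING, signal LONG, ip STRING, temp LONG, timestamp TIMESTAMP")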

Using the schema above, create a Dataset, represented as a Scala case class, and generate some JSON data associated with it. In all likelihood, this JSON might well be a stream of device events read off a Kafka topic. Note that the case class has two fields: an integer (the device id) and a string (the JSON representing device events).

Create a Dataset from the above schema
// define a case class
case class DeviceData (id: Int, device: String)
// create some sample data
val eventsDS = Seq (
 (0, """{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }"""),
 (1, """{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 }"""),
 (2, """{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 }"""),
 (3, """{"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp" :1475600502 }"""),
(4, """{"device_id": 4, "device_type": "sensor-ipad", "ip": "203.82.41.9", "cca3": "PHL", "cn": "Philippines", "temp": 29, "signal": 11, "battery_level": 0, "c02_level": 983, "timestamp" :1475600504 }"""),
(5, """{"device_id": 5, "device_type": "sensor-istick", "ip": "204.116.105.67", "cca3": "USA", "cn": "United States", "temp": 50, "signal": 16, "battery_level": 8, "c02_level": 1574, "timestamp" :1475600506 }"""),
(6, """{"device_id": 6, "device_type": "sensor-ipad", "ip": "220.173.179.1", "cca3": "CHN", "cn": "China", "temp": 21, "signal": 18, "battery_level": 9, "c02_level": 1249, "timestamp" :1475600508 }"""),
(7, """{"device_id": 7, "device_type": "sensor-ipad", "ip": "118.23.68.227", "cca3": "JPN", "cn": "Japan", "temp": 27, "signal": 15, "battery_level": 0, "c02_level": 1531, "timestamp" :1475600512 }"""),
(8 ,""" {"device_id": 8, "device_type": "sensor-inest", "ip": "208.109.163.218", "cca3": "USA", "cn": "United States", "temp": 40, "signal": 16, "battery_level": 9, "c02_level": 1208, "timestamp" :1475600514 }"""),
(9,"""{"device_id": 9, "device_type": "sensor-ipad", "ip": "88.213.191.34", "cca3": "ITA", "cn": "Italy", "temp": 19, "signal": 11, "battery_level": 0, "c02_level": 1171, "timestamp" :1475600516 }"""),
(10,"""{"device_id": 10, "device_type": "sensor-igauge", "ip": "68.28.91.22", "cca3": "USA", "cn": "United States", "temp": 32, "signal": 26, "battery_level": 7, "c02_level": 886, "timestamp" :1475600518 }"""),
(11,"""{"device_id": 11, "device_type": "sensor-ipad", "ip": "59.144.114.250", "cca3": "IND", "cn": "India", "temp": 46, "signal": 25, "battery_level": 4, "c02_level": 863, "timestamp" :1475600520 }"""),
(12, """{"device_id": 12, "device_type": "sensor-igauge", "ip": "193.156.90.200", "cca3": "NOR", "cn": "Norway", "temp": 18, "signal": 26, "battery_level": 8, "c02_level": 1220, "timestamp" :1475600522 }"""),
(13, """{"device_id": 13, "device_type": "sensor-ipad", "ip": "67.185.72.1", "cca3": "USA", "cn": "United States", "temp": 34, "signal": 20, "battery_level": 8, "c02_level": 1504, "timestamp" :1475600524 }"""),
(14, """{"device_id": 14, "device_type": "sensor-inest", "ip": "68.85.85.106", "cca3": "USA", "cn": "United States", "temp": 39, "signal": 17, "battery_level": 8, "c02_level": 831, "timestamp" :1475600526 }"""),
(15, """{"device_id": 15, "device_type": "sensor-ipad", "ip": "161.188.212.254", "cca3": "USA", "cn": "United States", "temp": 27, "signal": 26, "battery_level": 5, "c02_level": 1378, "timestamp" :1475600528 }"""),
(16, """{"device_id": 16, "device_type": "sensor-igauge", "ip": "221.3.128.242", "cca3": "CHN", "cn": "China", "temp": 10, "signal": 24, "battery_level": 6, "c02_level": 1423, "timestamp" :1475600530 }"""),
(17, """{"device_id": 17, "device_type": "sensor-ipad", "ip": "64.124.180.215", "cca3": "USA", "cn": "United States", "temp": 38, "signal": 17, "battery_level": 9, "c02_level": 1304, "timestamp" :1475600532 }"""),
(18, """{"device_id": 18, "device_type": "sensor-igauge", "ip": "66.153.162.66", "cca3": "USA", "cn": "United States", "temp": 26, "signal": 10, "battery_level": 0, "c02_level": 902, "timestamp" :1475600534 }"""),
(19, """{"device_id": 19, "device_type": "sensor-ipad", "ip": "193.200.142.254", "cca3": "AUT", "cn": "Austria", "temp": 32, "signal": 27, "battery_level": 5, "c02_level": 1282, "timestamp" :1475600536 }""")).toDF("id", "device").as[DeviceData]
defined class DeviceData eventsDS: org.apache.spark.sql.Dataset[DeviceData] = [id: int, device: string]

Our Dataset is a collection of Scala case classes, and when displayed as a DataFrame, there are two columns (id, device).

display(eventsDS)
0{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }
1{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 }
2{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 }
3{"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp" :1475600502 }
4{"device_id": 4, "device_type": "sensor-ipad", "ip": "203.82.41.9", "cca3": "PHL", "cn": "Philippines", "temp": 29, "signal": 11, "battery_level": 0, "c02_level": 983, "timestamp" :1475600504 }
5{"device_id": 5, "device_type": "sensor-istick", "ip": "204.116.105.67", "cca3": "USA", "cn": "United States", "temp": 50, "signal": 16, "battery_level": 8, "c02_level": 1574, "timestamp" :1475600506 }
6{"device_id": 6, "device_type": "sensor-ipad", "ip": "220.173.179.1", "cca3": "CHN", "cn": "China", "temp": 21, "signal": 18, "battery_level": 9, "c02_level": 1249, "timestamp" :1475600508 }
7{"device_id": 7, "device_type": "sensor-ipad", "ip": "118.23.68.227", "cca3": "JPN", "cn": "Japan", "temp": 27, "signal": 15, "battery_level": 0, "c02_level": 1531, "timestamp" :1475600512 }
8 {"device_id": 8, "device_type": "sensor-inest", "ip": "208.109.163.218", "cca3": "USA", "cn": "United States", "temp": 40, "signal": 16, "battery_level": 9, "c02_level": 1208, "timestamp" :1475600514 }
9{"device_id": 9, "device_type": "sensor-ipad", "ip": "88.213.191.34", "cca3": "ITA", "cn": "Italy", "temp": 19, "signal": 11, "battery_level": 0, "c02_level": 1171, "timestamp" :1475600516 }
10{"device_id": 10, "device_type": "sensor-igauge", "ip": "68.28.91.22", "cca3": "USA", "cn": "United States", "temp": 32, "signal": 26, "battery_level": 7, "c02_level": 886, "timestamp" :1475600518 }
11{"device_id": 11, "device_type": "sensor-ipad", "ip": "59.144.114.250", "cca3": "IND", "cn": "India", "temp": 46, "signal": 25, "battery_level": 4, "c02_level": 863, "timestamp" :1475600520 }
12{"device_id": 12, "device_type": "sensor-igauge", "ip": "193.156.90.200", "cca3": "NOR", "cn": "Norway", "temp": 18, "signal": 26, "battery_level": 8, "c02_level": 1220, "timestamp" :1475600522 }
13{"device_id": 13, "device_type": "sensor-ipad", "ip": "67.185.72.1", "cca3": "USA", "cn": "United States", "temp": 34, "signal": 20, "battery_level": 8, "c02_level": 1504, "timestamp" :1475600524 }
14{"device_id": 14, "device_type": "sensor-inest", "ip": "68.85.85.106", "cca3": "USA", "cn": "United States", "temp": 39, "signal": 17, "battery_level": 8, "c02_level": 831, "timestamp" :1475600526 }
15{"device_id": 15, "device_type": "sensor-ipad", "ip": "161.188.212.254", "cca3": "USA", "cn": "United States", "temp": 27, "signal": 26, "battery_level": 5, "c02_level": 1378, "timestamp" :1475600528 }
16{"device_id": 16, "device_type": "sensor-igauge", "ip": "221.3.128.242", "cca3": "CHN", "cn": "China", "temp": 10, "signal": 24, "battery_level": 6, "c02_level": 1423, "timestamp" :1475600530 }
17{"device_id": 17, "device_type": "sensor-ipad", "ip": "64.124.180.215", "cca3": "USA", "cn": "United States", "temp": 38, "signal": 17, "battery_level": 9, "c02_level": 1304, "timestamp" :1475600532 }
18{"device_id": 18, "device_type": "sensor-igauge", "ip": "66.153.162.66", "cca3": "USA", "cn": "United States", "temp": 26, "signal": 10, "battery_level": 0, "c02_level": 902, "timestamp" :1475600534 }
19{"device_id": 19, "device_type": "sensor-ipad", "ip": "193.200.142.254", "cca3": "AUT", "cn": "Austria", "temp": 32, "signal": 27, "battery_level": 5, "c02_level": 1282, "timestamp" :1475600536 }

Printing the schema attests to the two columns of type integer and string, reflecting the Scala case class.

eventsDS.printSchema
root
 |-- id: integer (nullable = false)
 |-- device: string (nullable = true)

How to use get_json_object()

This method extracts a JSON object from a JSON string based on the JSON path specified, and returns the extracted JSON object as a JSON string. Take the small dataset above, from which we wish to extract the portions of data values that make up the JSON object string. Say you want to extract only the id, device_type, ip, and cca3 code. Here's a quick way to do it using get_json_object().

val eventsFromJSONDF = Seq (
 (0, """{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }"""),
 (1, """{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 }"""),
 (2, """{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 }"""),
 (3, """{"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp" :1475600502 }"""),
(4, """{"device_id": 4, "device_type": "sensor-ipad", "ip": "203.82.41.9", "cca3": "PHL", "cn": "Philippines", "temp": 29, "signal": 11, "battery_level": 0, "c02_level": 983, "timestamp" :1475600504 }"""),
(5, """{"device_id": 5, "device_type": "sensor-istick", "ip": "204.116.105.67", "cca3": "USA", "cn": "United States", "temp": 50, "signal": 16, "battery_level": 8, "c02_level": 1574, "timestamp" :1475600506 }"""),
(6, """{"device_id": 6, "device_type": "sensor-ipad", "ip": "220.173.179.1", "cca3": "CHN", "cn": "China", "temp": 21, "signal": 18, "battery_level": 9, "c02_level": 1249, "timestamp" :1475600508 }"""),
(7, """{"device_id": 7, "device_type": "sensor-ipad", "ip": "118.23.68.227", "cca3": "JPN", "cn": "Japan", "temp": 27, "signal": 15, "battery_level": 0, "c02_level": 1531, "timestamp" :1475600512 }"""),
(8 ,""" {"device_id": 8, "device_type": "sensor-inest", "ip": "208.109.163.218", "cca3": "USA", "cn": "United States", "temp": 40, "signal": 16, "battery_level": 9, "c02_level": 1208, "timestamp" :1475600514 }"""),
(9,"""{"device_id": 9, "device_type": "sensor-ipad", "ip": "88.213.191.34", "cca3": "ITA", "cn": "Italy", "temp": 19, "signal": 11, "battery_level": 0, "c02_level": 1171, "timestamp" :1475600516 }""")).toDF("id", "json")
eventsFromJSONDF: org.apache.spark.sql.DataFrame = [id: int, json: string]
display(eventsFromJSONDF)
0{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }
1{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 }
2{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 }
3{"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp" :1475600502 }
4{"device_id": 4, "device_type": "sensor-ipad", "ip": "203.82.41.9", "cca3": "PHL", "cn": "Philippines", "temp": 29, "signal": 11, "battery_level": 0, "c02_level": 983, "timestamp" :1475600504 }
5{"device_id": 5, "device_type": "sensor-istick", "ip": "204.116.105.67", "cca3": "USA", "cn": "United States", "temp": 50, "signal": 16, "battery_level": 8, "c02_level": 1574, "timestamp" :1475600506 }
6{"device_id": 6, "device_type": "sensor-ipad", "ip": "220.173.179.1", "cca3": "CHN", "cn": "China", "temp": 21, "signal": 18, "battery_level": 9, "c02_level": 1249, "timestamp" :1475600508 }
7{"device_id": 7, "device_type": "sensor-ipad", "ip": "118.23.68.227", "cca3": "JPN", "cn": "Japan", "temp": 27, "signal": 15, "battery_level": 0, "c02_level": 1531, "timestamp" :1475600512 }
8 {"device_id": 8, "device_type": "sensor-inest", "ip": "208.109.163.218", "cca3": "USA", "cn": "United States", "temp": 40, "signal": 16, "battery_level": 9, "c02_level": 1208, "timestamp" :1475600514 }
9{"device_id": 9, "device_type": "sensor-ipad", "ip": "88.213.191.34", "cca3": "ITA", "cn": "Italy", "temp": 19, "signal": 11, "battery_level": 0, "c02_level": 1171, "timestamp" :1475600516 }
val jsDF = eventsFromJSONDF.select($"id",
                                   get_json_object($"json", "$.device_type").alias("device_type"),
                                   get_json_object($"json", "$.ip").alias("ip"),
                                   get_json_object($"json", "$.cca3").alias("cca3"))
jsDF: org.apache.spark.sql.DataFrame = [id: int, device_type: string ... 2 more fields]
display(jsDF)
id  device_type    ip               cca3
0   sensor-ipad    68.161.225.1     USA
1   sensor-igauge  213.161.254.1    NOR
2   sensor-ipad    88.36.5.1        ITA
3   sensor-inest   66.39.173.154    USA
4   sensor-ipad    203.82.41.9      PHL
5   sensor-istick  204.116.105.67   USA
6   sensor-ipad    220.173.179.1    CHN
7   sensor-ipad    118.23.68.227    JPN
8   sensor-inest   208.109.163.218  USA
9   sensor-ipad    88.213.191.34    ITA
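A related helper, json_tuple(), extracts multiple top-level fields in one call; a minimal sketch that should yield the same columns (the renamed column labels are ours):

val jsTupleDF = eventsFromJSONDF
  .select($"id", json_tuple($"json", "device_type", "ip", "cca3"))
  .toDF("id", "device_type", "ip", "cca3")   // json_tuple yields generic column names (c0, c1, ...)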

How to use from_json()

A variation of get_json_object(), this function uses a schema to extract individual columns. Using the from_json() helper function within a select() Dataset API call, I can extract or decode the data's attributes and values from a JSON string into DataFrame columns, dictated by a schema. Using the schema, I also ascribe all associated attributes and values within this JSON to a single entity, devices. As such, not only can you use device.attribute to retrieve its respective value, you can also retrieve all values using the * notation.

The example below:

  • Uses the schema above to extract attributes and values from the JSON string and represent them as individual columns as part of devices
  • Selects all its columns with select()
  • Filters on desired attributes using the . notation

Once you have extracted data from a JSON string into its respective DataFrame columns, you can apply DataFrame/Dataset API calls to select, filter, and subsequently display, to your satisfaction.

val devicesDF = eventsDS.select(from_json($"device", jsonSchema) as "devices")
  .select($"devices.*")
  .filter($"devices.temp" > 10 and $"devices.signal" > 15)
devicesDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [battery_level: bigint, c02_level: bigint ... 8 more fields]
display(devicesDF)
(Chart output: signal and temp plotted by cca3 for the filtered devices: USA, NOR, ITA, CHN, IND, AUT.)
val devicesUSDF = devicesDF.select($"*").where($"cca3" === "USA").orderBy($"signal".desc, $"temp".desc)
devicesUSDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [battery_level: bigint, c02_level: bigint ... 8 more fields]
display(devicesUSDF)
(Chart output: distribution of signal values across device types for US devices: sensor-igauge, sensor-ipad, sensor-inest, sensor-istick.)
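One practical aside: if a device string is malformed, from_json() returns a null struct for that row rather than failing the job, so bad records can be quarantined with a simple null check. A minimal sketch (the variable names are ours):

val maybeParsed = eventsDS.select($"id", from_json($"device", jsonSchema) as "devices")
val badRows  = maybeParsed.filter($"devices".isNull)    // rows whose JSON failed to parse
val goodRows = maybeParsed.filter($"devices".isNotNull) // cleanly parsed rows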

How to use to_json()

Now, let's do the reverse: convert or encode our filtered devices into a JSON string using to_json(). That is, convert a JSON struct into a string. The result can be republished, for instance, to Kafka or saved to disk as Parquet files. To learn how to write to Kafka and other sinks, read this blog and our series on Structured Streaming blogs.

val stringJsonDF = eventsDS.select(to_json(struct($"*"))).toDF("devices")
stringJsonDF: org.apache.spark.sql.DataFrame = [devices: string]
display(stringJsonDF)
{"id":0,"device":"{\"device_id\": 0, \"device_type\": \"sensor-ipad\", \"ip\": \"68.161.225.1\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 25, \"signal\": 23, \"battery_level\": 8, \"c02_level\": 917, \"timestamp\" :1475600496 }"}
{"id":1,"device":"{\"device_id\": 1, \"device_type\": \"sensor-igauge\", \"ip\": \"213.161.254.1\", \"cca3\": \"NOR\", \"cn\": \"Norway\", \"temp\": 30, \"signal\": 18, \"battery_level\": 6, \"c02_level\": 1413, \"timestamp\" :1475600498 }"}
{"id":2,"device":"{\"device_id\": 2, \"device_type\": \"sensor-ipad\", \"ip\": \"88.36.5.1\", \"cca3\": \"ITA\", \"cn\": \"Italy\", \"temp\": 18, \"signal\": 25, \"battery_level\": 5, \"c02_level\": 1372, \"timestamp\" :1475600500 }"}
{"id":3,"device":"{\"device_id\": 3, \"device_type\": \"sensor-inest\", \"ip\": \"66.39.173.154\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 47, \"signal\": 12, \"battery_level\": 1, \"c02_level\": 1447, \"timestamp\" :1475600502 }"}
{"id":4,"device":"{\"device_id\": 4, \"device_type\": \"sensor-ipad\", \"ip\": \"203.82.41.9\", \"cca3\": \"PHL\", \"cn\": \"Philippines\", \"temp\": 29, \"signal\": 11, \"battery_level\": 0, \"c02_level\": 983, \"timestamp\" :1475600504 }"}
{"id":5,"device":"{\"device_id\": 5, \"device_type\": \"sensor-istick\", \"ip\": \"204.116.105.67\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 50, \"signal\": 16, \"battery_level\": 8, \"c02_level\": 1574, \"timestamp\" :1475600506 }"}
{"id":6,"device":"{\"device_id\": 6, \"device_type\": \"sensor-ipad\", \"ip\": \"220.173.179.1\", \"cca3\": \"CHN\", \"cn\": \"China\", \"temp\": 21, \"signal\": 18, \"battery_level\": 9, \"c02_level\": 1249, \"timestamp\" :1475600508 }"}
{"id":7,"device":"{\"device_id\": 7, \"device_type\": \"sensor-ipad\", \"ip\": \"118.23.68.227\", \"cca3\": \"JPN\", \"cn\": \"Japan\", \"temp\": 27, \"signal\": 15, \"battery_level\": 0, \"c02_level\": 1531, \"timestamp\" :1475600512 }"}
{"id":8,"device":" {\"device_id\": 8, \"device_type\": \"sensor-inest\", \"ip\": \"208.109.163.218\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 40, \"signal\": 16, \"battery_level\": 9, \"c02_level\": 1208, \"timestamp\" :1475600514 }"}
{"id":9,"device":"{\"device_id\": 9, \"device_type\": \"sensor-ipad\", \"ip\": \"88.213.191.34\", \"cca3\": \"ITA\", \"cn\": \"Italy\", \"temp\": 19, \"signal\": 11, \"battery_level\": 0, \"c02_level\": 1171, \"timestamp\" :1475600516 }"}
{"id":10,"device":"{\"device_id\": 10, \"device_type\": \"sensor-igauge\", \"ip\": \"68.28.91.22\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 32, \"signal\": 26, \"battery_level\": 7, \"c02_level\": 886, \"timestamp\" :1475600518 }"}
{"id":11,"device":"{\"device_id\": 11, \"device_type\": \"sensor-ipad\", \"ip\": \"59.144.114.250\", \"cca3\": \"IND\", \"cn\": \"India\", \"temp\": 46, \"signal\": 25, \"battery_level\": 4, \"c02_level\": 863, \"timestamp\" :1475600520 }"}
{"id":12,"device":"{\"device_id\": 12, \"device_type\": \"sensor-igauge\", \"ip\": \"193.156.90.200\", \"cca3\": \"NOR\", \"cn\": \"Norway\", \"temp\": 18, \"signal\": 26, \"battery_level\": 8, \"c02_level\": 1220, \"timestamp\" :1475600522 }"}
{"id":13,"device":"{\"device_id\": 13, \"device_type\": \"sensor-ipad\", \"ip\": \"67.185.72.1\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 34, \"signal\": 20, \"battery_level\": 8, \"c02_level\": 1504, \"timestamp\" :1475600524 }"}
{"id":14,"device":"{\"device_id\": 14, \"device_type\": \"sensor-inest\", \"ip\": \"68.85.85.106\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 39, \"signal\": 17, \"battery_level\": 8, \"c02_level\": 831, \"timestamp\" :1475600526 }"}
{"id":15,"device":"{\"device_id\": 15, \"device_type\": \"sensor-ipad\", \"ip\": \"161.188.212.254\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 27, \"signal\": 26, \"battery_level\": 5, \"c02_level\": 1378, \"timestamp\" :1475600528 }"}
{"id":16,"device":"{\"device_id\": 16, \"device_type\": \"sensor-igauge\", \"ip\": \"221.3.128.242\", \"cca3\": \"CHN\", \"cn\": \"China\", \"temp\": 10, \"signal\": 24, \"battery_level\": 6, \"c02_level\": 1423, \"timestamp\" :1475600530 }"}
{"id":17,"device":"{\"device_id\": 17, \"device_type\": \"sensor-ipad\", \"ip\": \"64.124.180.215\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 38, \"signal\": 17, \"battery_level\": 9, \"c02_level\": 1304, \"timestamp\" :1475600532 }"}
{"id":18,"device":"{\"device_id\": 18, \"device_type\": \"sensor-igauge\", \"ip\": \"66.153.162.66\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 26, \"signal\": 10, \"battery_level\": 0, \"c02_level\": 902, \"timestamp\" :1475600534 }"}
{"id":19,"device":"{\"device_id\": 19, \"device_type\": \"sensor-ipad\", \"ip\": \"193.200.142.254\", \"cca3\": \"AUT\", \"cn\": \"Austria\", \"temp\": 32, \"signal\": 27, \"battery_level\": 5, \"c02_level\": 1282, \"timestamp\" :1475600536 }"}

Assuming you have a Kafka cluster running at the specified port along with the respective topic, let's write to the Kafka topic.

// stringJsonDF.write
//            .format("kafka")
//            .option("kafka.bootstrap.servers", "your_host_name:9092")
//            .option("topic", "iot-devices")
//            .save()

How to use selectExpr()

Another way to convert or encode a column into a JSON object as a string is to use the selectExpr() utility function. For instance, I can convert the device column of our DataFrame from above into a JSON string.

val stringsDF = eventsDS.selectExpr("CAST(id AS INT)", "CAST(device AS STRING)")
stringsDF: org.apache.spark.sql.DataFrame = [id: int, device: string]
stringsDF.printSchema
root |-- id: integer (nullable = false) |-- device: string (nullable = true)
display(stringsDF)
0{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }
1{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 }
2{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 }
3{"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp" :1475600502 }
4{"device_id": 4, "device_type": "sensor-ipad", "ip": "203.82.41.9", "cca3": "PHL", "cn": "Philippines", "temp": 29, "signal": 11, "battery_level": 0, "c02_level": 983, "timestamp" :1475600504 }
5{"device_id": 5, "device_type": "sensor-istick", "ip": "204.116.105.67", "cca3": "USA", "cn": "United States", "temp": 50, "signal": 16, "battery_level": 8, "c02_level": 1574, "timestamp" :1475600506 }
6{"device_id": 6, "device_type": "sensor-ipad", "ip": "220.173.179.1", "cca3": "CHN", "cn": "China", "temp": 21, "signal": 18, "battery_level": 9, "c02_level": 1249, "timestamp" :1475600508 }
7{"device_id": 7, "device_type": "sensor-ipad", "ip": "118.23.68.227", "cca3": "JPN", "cn": "Japan", "temp": 27, "signal": 15, "battery_level": 0, "c02_level": 1531, "timestamp" :1475600512 }
8 {"device_id": 8, "device_type": "sensor-inest", "ip": "208.109.163.218", "cca3": "USA", "cn": "United States", "temp": 40, "signal": 16, "battery_level": 9, "c02_level": 1208, "timestamp" :1475600514 }
9{"device_id": 9, "device_type": "sensor-ipad", "ip": "88.213.191.34", "cca3": "ITA", "cn": "Italy", "temp": 19, "signal": 11, "battery_level": 0, "c02_level": 1171, "timestamp" :1475600516 }
10{"device_id": 10, "device_type": "sensor-igauge", "ip": "68.28.91.22", "cca3": "USA", "cn": "United States", "temp": 32, "signal": 26, "battery_level": 7, "c02_level": 886, "timestamp" :1475600518 }
11{"device_id": 11, "device_type": "sensor-ipad", "ip": "59.144.114.250", "cca3": "IND", "cn": "India", "temp": 46, "signal": 25, "battery_level": 4, "c02_level": 863, "timestamp" :1475600520 }
12{"device_id": 12, "device_type": "sensor-igauge", "ip": "193.156.90.200", "cca3": "NOR", "cn": "Norway", "temp": 18, "signal": 26, "battery_level": 8, "c02_level": 1220, "timestamp" :1475600522 }
13{"device_id": 13, "device_type": "sensor-ipad", "ip": "67.185.72.1", "cca3": "USA", "cn": "United States", "temp": 34, "signal": 20, "battery_level": 8, "c02_level": 1504, "timestamp" :1475600524 }
14{"device_id": 14, "device_type": "sensor-inest", "ip": "68.85.85.106", "cca3": "USA", "cn": "United States", "temp": 39, "signal": 17, "battery_level": 8, "c02_level": 831, "timestamp" :1475600526 }
15{"device_id": 15, "device_type": "sensor-ipad", "ip": "161.188.212.254", "cca3": "USA", "cn": "United States", "temp": 27, "signal": 26, "battery_level": 5, "c02_level": 1378, "timestamp" :1475600528 }
16{"device_id": 16, "device_type": "sensor-igauge", "ip": "221.3.128.242", "cca3": "CHN", "cn": "China", "temp": 10, "signal": 24, "battery_level": 6, "c02_level": 1423, "timestamp" :1475600530 }
17{"device_id": 17, "device_type": "sensor-ipad", "ip": "64.124.180.215", "cca3": "USA", "cn": "United States", "temp": 38, "signal": 17, "battery_level": 9, "c02_level": 1304, "timestamp" :1475600532 }
18{"device_id": 18, "device_type": "sensor-igauge", "ip": "66.153.162.66", "cca3": "USA", "cn": "United States", "temp": 26, "signal": 10, "battery_level": 0, "c02_level": 902, "timestamp" :1475600534 }
19{"device_id": 19, "device_type": "sensor-ipad", "ip": "193.200.142.254", "cca3": "AUT", "cn": "Austria", "temp": 32, "signal": 27, "battery_level": 5, "c02_level": 1282, "timestamp" :1475600536 }

Another use of selectExpr() is its ability, as the function name suggests, to take expressions as arguments and convert them into respective columns. For instance, say I want to express C02 levels and temperature as a ratio.

display(devicesDF.selectExpr("c02_level", "round(c02_level/temp) as ratio_c02_temperature").orderBy($"ratio_c02_temperature".desc))
c02_level  ratio_c02_temperature
1372       76
1220       68
1249       59
1378       51
1413       47
1504       44
1282       40
917        37
1304       34
1574       31
1208       30
886        28
831        21
863        19
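For comparison, here is a minimal sketch of the same query using column functions instead of a SQL expression string; it should produce identical results:

display(devicesDF
  .select($"c02_level", round($"c02_level" / $"temp") as "ratio_c02_temperature")
  .orderBy($"ratio_c02_temperature".desc))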

The above query can be expressed just as easily in Spark SQL as in the DataFrame API. The power of selectExpr() lies in working with numerical values. Let's create a temporary view and express the same query, except this time in SQL.

devicesDF.createOrReplaceTempView("devicesDFT")

Notice the output below is no different from the DataFrame query above. Both go through the same Spark SQL engine's Catalyst optimizer and generate equivalent underlying compact code.

%sql select c02_level, 
        round(c02_level/temp) as ratio_c02_temperature 
        from devicesDFT
        order by ratio_c02_temperature desc
c02_level  ratio_c02_temperature
1372       76
1220       68
1249       59
1378       51
1413       47
1504       44
1282       40
917        37
1304       34
1574       31
1208       30
886        28
831        21
863        19

To verify that all your string conversions are preserved, let's save the above DataFrame stringJsonDF to blob storage as Parquet.

stringJsonDF
  .write
  .mode("overwrite")
  .format("parquet")
  .save("/tmp/iot")

Check if all files were written.

%fs ls /tmp/iot
dbfs:/tmp/iot/_SUCCESS  (size 0)
dbfs:/tmp/iot/_committed_4820070580060822340  (size 840)
dbfs:/tmp/iot/_committed_7472410303440732077  (size 1666)
dbfs:/tmp/iot/_committed_vacuum5255432591358387481  (size 96)
dbfs:/tmp/iot/_started_7472410303440732077  (size 0)
dbfs:/tmp/iot/part-00000-tid-7472410303440732077-1b5f39de-6616-4c21-b825-003bb8a2179f-78688-1-c000.snappy.parquet  (size 1701)
dbfs:/tmp/iot/part-00001-tid-7472410303440732077-1b5f39de-6616-4c21-b825-003bb8a2179f-78689-1-c000.snappy.parquet  (size 1764)
dbfs:/tmp/iot/part-00002-tid-7472410303440732077-1b5f39de-6616-4c21-b825-003bb8a2179f-78690-1-c000.snappy.parquet  (size 1714)
dbfs:/tmp/iot/part-00003-tid-7472410303440732077-1b5f39de-6616-4c21-b825-003bb8a2179f-78691-1-c000.snappy.parquet  (size 1772)
dbfs:/tmp/iot/part-00004-tid-7472410303440732077-1b5f39de-6616-4c21-b825-003bb8a2179f-78692-1-c000.snappy.parquet  (size 1709)
dbfs:/tmp/iot/part-00005-tid-7472410303440732077-1b5f39de-6616-4c21-b825-003bb8a2179f-78693-1-c000.snappy.parquet  (size 1774)
dbfs:/tmp/iot/part-00006-tid-7472410303440732077-1b5f39de-6616-4c21-b825-003bb8a2179f-78694-1-c000.snappy.parquet  (size 1720)
dbfs:/tmp/iot/part-00007-tid-7472410303440732077-1b5f39de-6616-4c21-b825-003bb8a2179f-78695-1-c000.snappy.parquet  (size 1791)

Now let's verify that what was saved (each device encoded as an individual JSON string above) consists of actual strings.

val parquetDF = spark.read.parquet("/tmp/iot")
parquetDF: org.apache.spark.sql.DataFrame = [devices: string]

Let's check the schema to ensure what was written is not different from what is read, namely the JSON string.

parquetDF.printSchema
root
 |-- devices: string (nullable = true)
display(parquetDF)
{"id":12,"device":"{\"device_id\": 12, \"device_type\": \"sensor-igauge\", \"ip\": \"193.156.90.200\", \"cca3\": \"NOR\", \"cn\": \"Norway\", \"temp\": 18, \"signal\": 26, \"battery_level\": 8, \"c02_level\": 1220, \"timestamp\" :1475600522 }"}
{"id":13,"device":"{\"device_id\": 13, \"device_type\": \"sensor-ipad\", \"ip\": \"67.185.72.1\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 34, \"signal\": 20, \"battery_level\": 8, \"c02_level\": 1504, \"timestamp\" :1475600524 }"}
{"id":14,"device":"{\"device_id\": 14, \"device_type\": \"sensor-inest\", \"ip\": \"68.85.85.106\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 39, \"signal\": 17, \"battery_level\": 8, \"c02_level\": 831, \"timestamp\" :1475600526 }"}
{"id":2,"device":"{\"device_id\": 2, \"device_type\": \"sensor-ipad\", \"ip\": \"88.36.5.1\", \"cca3\": \"ITA\", \"cn\": \"Italy\", \"temp\": 18, \"signal\": 25, \"battery_level\": 5, \"c02_level\": 1372, \"timestamp\" :1475600500 }"}
{"id":3,"device":"{\"device_id\": 3, \"device_type\": \"sensor-inest\", \"ip\": \"66.39.173.154\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 47, \"signal\": 12, \"battery_level\": 1, \"c02_level\": 1447, \"timestamp\" :1475600502 }"}
{"id":4,"device":"{\"device_id\": 4, \"device_type\": \"sensor-ipad\", \"ip\": \"203.82.41.9\", \"cca3\": \"PHL\", \"cn\": \"Philippines\", \"temp\": 29, \"signal\": 11, \"battery_level\": 0, \"c02_level\": 983, \"timestamp\" :1475600504 }"}
{"id":15,"device":"{\"device_id\": 15, \"device_type\": \"sensor-ipad\", \"ip\": \"161.188.212.254\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 27, \"signal\": 26, \"battery_level\": 5, \"c02_level\": 1378, \"timestamp\" :1475600528 }"}
{"id":16,"device":"{\"device_id\": 16, \"device_type\": \"sensor-igauge\", \"ip\": \"221.3.128.242\", \"cca3\": \"CHN\", \"cn\": \"China\", \"temp\": 10, \"signal\": 24, \"battery_level\": 6, \"c02_level\": 1423, \"timestamp\" :1475600530 }"}
{"id":0,"device":"{\"device_id\": 0, \"device_type\": \"sensor-ipad\", \"ip\": \"68.161.225.1\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 25, \"signal\": 23, \"battery_level\": 8, \"c02_level\": 917, \"timestamp\" :1475600496 }"}
{"id":1,"device":"{\"device_id\": 1, \"device_type\": \"sensor-igauge\", \"ip\": \"213.161.254.1\", \"cca3\": \"NOR\", \"cn\": \"Norway\", \"temp\": 30, \"signal\": 18, \"battery_level\": 6, \"c02_level\": 1413, \"timestamp\" :1475600498 }"}
{"id":17,"device":"{\"device_id\": 17, \"device_type\": \"sensor-ipad\", \"ip\": \"64.124.180.215\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 38, \"signal\": 17, \"battery_level\": 9, \"c02_level\": 1304, \"timestamp\" :1475600532 }"}
{"id":18,"device":"{\"device_id\": 18, \"device_type\": \"sensor-igauge\", \"ip\": \"66.153.162.66\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 26, \"signal\": 10, \"battery_level\": 0, \"c02_level\": 902, \"timestamp\" :1475600534 }"}
{"id":19,"device":"{\"device_id\": 19, \"device_type\": \"sensor-ipad\", \"ip\": \"193.200.142.254\", \"cca3\": \"AUT\", \"cn\": \"Austria\", \"temp\": 32, \"signal\": 27, \"battery_level\": 5, \"c02_level\": 1282, \"timestamp\" :1475600536 }"}
{"id":7,"device":"{\"device_id\": 7, \"device_type\": \"sensor-ipad\", \"ip\": \"118.23.68.227\", \"cca3\": \"JPN\", \"cn\": \"Japan\", \"temp\": 27, \"signal\": 15, \"battery_level\": 0, \"c02_level\": 1531, \"timestamp\" :1475600512 }"}
{"id":8,"device":" {\"device_id\": 8, \"device_type\": \"sensor-inest\", \"ip\": \"208.109.163.218\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 40, \"signal\": 16, \"battery_level\": 9, \"c02_level\": 1208, \"timestamp\" :1475600514 }"}
{"id":9,"device":"{\"device_id\": 9, \"device_type\": \"sensor-ipad\", \"ip\": \"88.213.191.34\", \"cca3\": \"ITA\", \"cn\": \"Italy\", \"temp\": 19, \"signal\": 11, \"battery_level\": 0, \"c02_level\": 1171, \"timestamp\" :1475600516 }"}
{"id":5,"device":"{\"device_id\": 5, \"device_type\": \"sensor-istick\", \"ip\": \"204.116.105.67\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 50, \"signal\": 16, \"battery_level\": 8, \"c02_level\": 1574, \"timestamp\" :1475600506 }"}
{"id":6,"device":"{\"device_id\": 6, \"device_type\": \"sensor-ipad\", \"ip\": \"220.173.179.1\", \"cca3\": \"CHN\", \"cn\": \"China\", \"temp\": 21, \"signal\": 18, \"battery_level\": 9, \"c02_level\": 1249, \"timestamp\" :1475600508 }"}
{"id":10,"device":"{\"device_id\": 10, \"device_type\": \"sensor-igauge\", \"ip\": \"68.28.91.22\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 32, \"signal\": 26, \"battery_level\": 7, \"c02_level\": 886, \"timestamp\" :1475600518 }"}
{"id":11,"device":"{\"device_id\": 11, \"device_type\": \"sensor-ipad\", \"ip\": \"59.144.114.250\", \"cca3\": \"IND\", \"cn\": \"India\", \"temp\": 46, \"signal\": 25, \"battery_level\": 4, \"c02_level\": 863, \"timestamp\" :1475600520 }"}

So far this tutorial has explored ways to use the get_json_object(), from_json(), to_json(), and selectExpr() helper functions to handle less complex JSON objects.

Let's turn our focus to more nested structures and examine how these same APIs apply to complex JSON just as they do to simple JSON.

Nested Structures

It's not unreasonable to assume that your JSON nested structures may have Maps as well as nested JSON. For illustration, let's use a single string comprised of complex and nested data types, including a Map. In a real-life scenario, this could be a reading from a device event, with dangerous levels of C02 emissions or high temperature readings, that needs a Network Operations Center (NOC) notification for immediate action.

import org.apache.spark.sql.types._

val schema = new StructType()
  .add("dc_id", StringType)                               // data center where data was posted to Kafka cluster
  .add("source",                                          // info about the source of alarm
    MapType(                                              // define this as a Map(Key->value)
      StringType,
      new StructType()
      .add("description", StringType)
      .add("ip", StringType)
      .add("id", LongType)
      .add("temp", LongType)
      .add("c02_level", LongType)
      .add("geo", 
         new StructType()
          .add("lat", DoubleType)
          .add("long", DoubleType)
        )
      )
    )
import org.apache.spark.sql.types._ schema: org.apache.spark.sql.types.StructType = StructType(StructField(dc_id,StringType,true), StructField(source,MapType(StringType,StructType(StructField(description,StringType,true), StructField(ip,StringType,true), StructField(id,LongType,true), StructField(temp,LongType,true), StructField(c02_level,LongType,true), StructField(geo,StructType(StructField(lat,DoubleType,true), StructField(long,DoubleType,true)),true)),true),true))

Let's create a single JSON entry with complex and nested data types.

//create a single entry with id and its complex and nested data types

val dataDS = Seq("""
{
"dc_id": "dc-101",
"source": {
    "sensor-igauge": {
      "id": 10,
      "ip": "68.28.91.22",
      "description": "Sensor attached to the container ceilings",
      "temp":35,
      "c02_level": 1475,
      "geo": {"lat":38.00, "long":97.00}                        
    },
    "sensor-ipad": {
      "id": 13,
      "ip": "67.185.72.1",
      "description": "Sensor ipad attached to carbon cylinders",
      "temp": 34,
      "c02_level": 1370,
      "geo": {"lat":47.41, "long":-122.00}
    },
    "sensor-inest": {
      "id": 8,
      "ip": "208.109.163.218",
      "description": "Sensor attached to the factory ceilings",
      "temp": 40,
      "c02_level": 1346,
      "geo": {"lat":33.61, "long":-111.89}
    },
    "sensor-istick": {
      "id": 5,
      "ip": "204.116.105.67",
      "description": "Sensor embedded in exhaust pipes in the ceilings",
      "temp": 40,
      "c02_level": 1574,
      "geo": {"lat":35.93, "long":-85.46}
    }
  }
}""").toDS()
// should only be one item
dataDS.count()
dataDS: org.apache.spark.sql.Dataset[String] = [value: string] res25: Long = 1
display(dataDS)
{ "dc_id": "dc-101", "source": { "sensor-igauge": { "id": 10, "ip": "68.28.91.22", "description": "Sensor attached to the container ceilings", "temp":35, "c02_level": 1475, "geo": {"lat":38.00, "long":97.00} }, "sensor-ipad": { "id": 13, "ip": "67.185.72.1", "description": "Sensor ipad attached to carbon cylinders", "temp": 34, "c02_level": 1370, "geo": {"lat":47.41, "long":-122.00} }, "sensor-inest": { "id": 8, "ip": "208.109.163.218", "description": "Sensor attached to the factory ceilings", "temp": 40, "c02_level": 1346, "geo": {"lat":33.61, "long":-111.89} }, "sensor-istick": { "id": 5, "ip": "204.116.105.67", "description": "Sensor embedded in exhaust pipes in the ceilings", "temp": 40, "c02_level": 1574, "geo": {"lat":35.93, "long":-85.46} } } }

Let's process it. Note that we have the nested structure geo.

val df = spark                  // spark session
  .read                         // get DataFrameReader
  .schema(schema)               // use the schema defined above and read the input as JSON
  .json(dataDS)                 // Dataset[String]; json(rdd) is deprecated since 2.2.0
df: org.apache.spark.sql.DataFrame = [dc_id: string, source: map<string,struct<description:string,ip:string,id:bigint,temp:bigint,c02_level:bigint,geo:struct<lat:double,long:double>>>]

Let's examine its nested and complex schema.

df.printSchema
root
 |-- dc_id: string (nullable = true)
 |-- source: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- ip: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- temp: long (nullable = true)
 |    |    |-- c02_level: long (nullable = true)
 |    |    |-- geo: struct (nullable = true)
 |    |    |    |-- lat: double (nullable = true)
 |    |    |    |-- long: double (nullable = true)
display(df)
dc-101{"sensor-igauge":{"description":"Sensor attached to the container ceilings","ip":"68.28.91.22","id":10,"temp":35,"c02_level":1475,"geo":{"lat":38,"long":97}},"sensor-ipad":{"description":"Sensor ipad attached to carbon cylinders","ip":"67.185.72.1","id":13,"temp":34,"c02_level":1370,"geo":{"lat":47.41,"long":-122}},"sensor-inest":{"description":"Sensor attached to the factory ceilings","ip":"208.109.163.218","id":8,"temp":40,"c02_level":1346,"geo":{"lat":33.61,"long":-111.89}},"sensor-istick":{"description":"Sensor embedded in exhaust pipes in the ceilings","ip":"204.116.105.67","id":5,"temp":40,"c02_level":1574,"geo":{"lat":35.93,"long":-85.46}}}

How to use explode()

The explode() function is used here to show how to extract nested structures. It also sheds more light when we see how it works alongside the to_json() and from_json() functions when extracting attributes and values from complex JSON structures. So on occasion you will want to use explode() alongside to_json() and from_json(), and here's one case where we do.

The explode() function creates a new row for each element in the given map column. In this case, the map column is source. Note that for each key-value pair in the map you get a respective Row, in this case four.

// select from the DataFrame with a single entry, and explode its column source, which is a Map with a nested structure
val explodedDF = df.select($"dc_id", explode($"source"))
display(explodedDF)
dc-101 | sensor-igauge | {"description":"Sensor attached to the container ceilings","ip":"68.28.91.22","id":10,"temp":35,"c02_level":1475,"geo":{"lat":38,"long":97}}
dc-101 | sensor-ipad | {"description":"Sensor ipad attached to carbon cylinders","ip":"67.185.72.1","id":13,"temp":34,"c02_level":1370,"geo":{"lat":47.41,"long":-122}}
dc-101 | sensor-inest | {"description":"Sensor attached to the factory ceilings","ip":"208.109.163.218","id":8,"temp":40,"c02_level":1346,"geo":{"lat":33.61,"long":-111.89}}
dc-101 | sensor-istick | {"description":"Sensor embedded in exhaust pipes in the ceilings","ip":"204.116.105.67","id":5,"temp":40,"c02_level":1574,"geo":{"lat":35.93,"long":-85.46}}

When you look at the schema, notice that source has now been expanded.

explodedDF.printSchema
root
 |-- dc_id: string (nullable = true)
 |-- key: string (nullable = false)
 |-- value: struct (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- ip: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- temp: long (nullable = true)
 |    |-- c02_level: long (nullable = true)
 |    |-- geo: struct (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- long: double (nullable = true)
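As an aside, you don't always need explode() to reach into a map: getItem() on the map column fetches a single key's struct directly. A minimal sketch using one of the keys from the data above (the alias names are ours):

display(df.select(
  $"dc_id",
  $"source".getItem("sensor-igauge").getField("temp") as "igauge_temp",          // one key's struct field
  $"source".getItem("sensor-igauge").getField("c02_level") as "igauge_c02"))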

Here we have a single string aggregated with complex data types, including a Map. This could be a reading that needs Network Operations Center (NOC) attention for action, since both the temperature and C02 levels are alarming.

Let's access the data from our exploded rows using the map's key and value columns.

//case class to denote our desired Scala object
case class DeviceAlert(dcId: String, deviceType:String, ip:String, deviceId:Long, temp:Long, c02_level: Long, lat: Double, lon: Double)
//access all values using the getItem() method on value, by providing the "key", which is an attribute in our JSON object
val notifydevicesDS = explodedDF.select( $"dc_id" as "dcId",
                        $"key" as "deviceType",
                        'value.getItem("ip") as 'ip,
                        'value.getItem("id") as 'deviceId,
                        'value.getItem("c02_level") as 'c02_level,
                        'value.getItem("temp") as 'temp,
                        'value.getItem("geo").getItem("lat") as 'lat,                //note embedded level requires yet another level of fetching.
                        'value.getItem("geo").getItem("long") as 'lon)
                        .as[DeviceAlert]  // return as a Dataset
defined class DeviceAlert notifydevicesDS: org.apache.spark.sql.Dataset[DeviceAlert] = [dcId: string, deviceType: string ... 6 more fields]
notifydevicesDS.printSchema
root
 |-- dcId: string (nullable = true)
 |-- deviceType: string (nullable = false)
 |-- ip: string (nullable = true)
 |-- deviceId: long (nullable = true)
 |-- c02_level: long (nullable = true)
 |-- temp: long (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
display(notifydevicesDS)
(Chart output: c02_level and temp per device type: sensor-igauge 1475/35, sensor-ipad 1370/34, sensor-inest 1346/40, sensor-istick 1574/40.)

Suppose as part of your ETL you need to notify or send alerts based on certain alarming conditions. One way is to write a user function that iterates over your filtered Dataset and sends individual notifications, as sketched below. In other cases, you can send the message to a Kafka topic as an additional option.
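For instance, a minimal filter sketch, where the 1400 ppm C02 cutoff is an illustrative threshold of our own choosing:

// keep only readings above an assumed (illustrative) alarm threshold
val alarmingDS = notifydevicesDS.filter(d => d.c02_level >= 1400)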

Once you have exploded your nested JSON into a simple case class, we can send alerts to a NOC for action. One way to do this is with the foreach() Dataset method. But to do that we need a higher-order function: given a case class, it can extract its alarming attributes and dispatch an alert. Although this simple example writes to stdout, in a real scenario you would send alerts via SNMP, HTTP POST, or an API call to a paging service.

Function for alert notifications

// define a Scala Notification Object
object DeviceNOCAlerts {

  def sendTwilio(message: String): Unit = {
    //TODO: fill as necessary
    println("Twilio:" + message)
  }

  def sendSNMP(message: String): Unit = {
    //TODO: fill as necessary
    println("SNMP:" + message)
  }
  
  def sendKafka(message: String): Unit = {
    //TODO: fill as necessary
     println("KAFKA:" + message)
  }
}
def logAlerts(log: java.io.PrintStream = Console.out, deviceAlert: DeviceAlert, alert: String, notify: String = "twilio"): Unit = {
  val message = "[***ALERT***: %s; data_center: %s, device_name: %s, temperature: %d; device_id: %d ; ip: %s ; c02: %d]"
    .format(alert, deviceAlert.dcId, deviceAlert.deviceType, deviceAlert.temp, deviceAlert.deviceId, deviceAlert.ip, deviceAlert.c02_level)
  //default log to Stderr/Stdout
  log.println(message)
  // use an appropriate notification method; fall back to logging for unknown notifiers
  val notifyFunc = notify match {
      case "twilio" => DeviceNOCAlerts.sendTwilio _
      case "snmp" => DeviceNOCAlerts.sendSNMP _
      case "kafka" => DeviceNOCAlerts.sendKafka _
      case _ => (m: String) => log.println("UNKNOWN NOTIFIER:" + m)
  }
  //send the appropriate alert
  notifyFunc(message)
}
defined object DeviceNOCAlerts logAlerts: (log: java.io.PrintStream, deviceAlert: DeviceAlert, alert: String, notify: String)Unit

Iterate over alert devices and take action

notifydevicesDS.foreach(d => logAlerts(Console.err, d, "ACTION NEEDED! HIGH TEMPERATURE AND C02 LEVELS", "kafka"))

To view where the messages are logged, go to Clusters --> Spark --> Logs.
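As a hedged aside, if each notification required a network connection, foreachPartition() would let you set up one client per partition rather than per record; a minimal sketch:

notifydevicesDS.foreachPartition { alerts: Iterator[DeviceAlert] =>
  // a real implementation would open one notifier client here and reuse it for the whole partition
  alerts.foreach(d => logAlerts(Console.err, d, "ACTION NEEDED! HIGH TEMPERATURE AND C02 LEVELS", "kafka"))
}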

Send alerts as JSON to Apache Kafka topic

What if you wanted to write these device alerts to a Kafka topic on which a monitoring subscriber awaits events in order to take action?

Here's a simple way to notify listeners on your Kafka topic "device_alerts." To read further on how to use Structured Streaming with Apache Kafka in detail, read part 3 of our blog series on Structured Streaming.

Note: This is yet another use of the selectExpr() function we explored above.

// val deviceAlertQuery = notifydevicesDS
//                       .selectExpr("CAST(dcId AS STRING) AS key", "to_json(struct(*)) AS value")
//                       .writeStream
//                       .format("kafka")
//                       .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
//                       .option("topic", "device_alerts")
//                       .start()
                       

Nest Device Data

Let's look at another set of complex real-life data: Nest readings. A Nest device emits many JSON events to its collector. That collector could sit at a nearby data center, a neighborhood-central data collector or aggregator, or it could be a device installed at home that sends device readings at regular intervals to a central data center over a secured internet connection. For illustration, I have trimmed some of the attributes, but the example still shows how complex data can be processed and relevant attributes extracted.

Let's define its complicated schema first. On close observation, you will notice it's not dissimilar to the schema we defined above, except it has not one map but three: thermostats, smoke_co_alarms, and cameras.

import org.apache.spark.sql.types._

// a bit longish, nested, and convoluted JSON schema :)
val nestSchema2 = new StructType()
      .add("devices", 
        new StructType()
          .add("thermostats", MapType(StringType,
            new StructType()
              .add("device_id", StringType)
              .add("locale", StringType)
              .add("software_version", StringType)
              .add("structure_id", StringType)
              .add("where_name", StringType)
              .add("last_connection", StringType)
              .add("is_online", BooleanType)
              .add("can_cool", BooleanType)
              .add("can_heat", BooleanType)
              .add("is_using_emergency_heat", BooleanType)
              .add("has_fan", BooleanType)
              .add("fan_timer_active", BooleanType)
              .add("fan_timer_timeout", StringType)
              .add("temperature_scale", StringType)
              .add("target_temperature_f", DoubleType)
              .add("target_temperature_high_f", DoubleType)
              .add("target_temperature_low_f", DoubleType)
              .add("eco_temperature_high_f", DoubleType)
              .add("eco_temperature_low_f", DoubleType)
              .add("away_temperature_high_f", DoubleType)
              .add("away_temperature_low_f", DoubleType)
              .add("hvac_mode", StringType)
              .add("humidity", LongType)
              .add("hvac_state", StringType)
              .add("is_locked", StringType)
              .add("locked_temp_min_f", DoubleType)
              .add("locked_temp_max_f", DoubleType)))
           .add("smoke_co_alarms", MapType(StringType,
             new StructType()
             .add("device_id", StringType)
             .add("locale", StringType)
             .add("software_version", StringType)
             .add("structure_id", StringType)
             .add("where_name", StringType)
             .add("last_connection", StringType)
             .add("is_online", BooleanType)
             .add("battery_health", StringType)
             .add("co_alarm_state", StringType)
             .add("smoke_alarm_state", StringType)
             .add("is_manual_test_active", BooleanType)
             .add("last_manual_test_time", StringType)
             .add("ui_color_state", StringType)))
           .add("cameras", MapType(StringType, 
               new StructType()
                .add("device_id", StringType)
                .add("software_version", StringType)
                .add("structure_id", StringType)
                .add("where_name", StringType)
                .add("is_online", BooleanType)
                .add("is_streaming", BooleanType)
                .add("is_audio_input_enabled", BooleanType)
                .add("last_is_online_change", StringType)
                .add("is_video_history_enabled", BooleanType)
                .add("web_url", StringType)
                .add("app_url", StringType)
                .add("is_public_share_enabled", BooleanType)
                .add("activity_zones",
                  new StructType()
                    .add("name", StringType)
                    .add("id", LongType))
                .add("last_event", StringType))))
import org.apache.spark.sql.types._ nestSchema2: org.apache.spark.sql.types.StructType = StructType(StructField(devices,StructType(StructField(thermostats,MapType(StringType,StructType(StructField(device_id,StringType,true), StructField(locale,StringType,true), StructField(software_version,StringType,true), StructField(structure_id,StringType,true), StructField(where_name,StringType,true), StructField(last_connection,StringType,true), StructField(is_online,BooleanType,true), StructField(can_cool,BooleanType,true), StructField(can_heat,BooleanType,true), StructField(is_using_emergency_heat,BooleanType,true), StructField(has_fan,BooleanType,true), StructField(fan_timer_active,BooleanType,true), StructField(fan_timer_timeout,StringType,true), StructField(temperature_scale,StringType,true), StructField(target_temperature_f,DoubleType,true), StructField(target_temperature_high_f,DoubleType,true), StructField(target_temperature_low_f,DoubleType,true), StructField(eco_temperature_high_f,DoubleType,true), StructField(eco_temperature_low_f,DoubleType,true), StructField(away_temperature_high_f,DoubleType,true), StructField(away_temperature_low_f,DoubleType,true), StructField(hvac_mode,StringType,true), StructField(humidity,LongType,true), StructField(hvac_state,StringType,true), StructField(is_locked,StringType,true), StructField(locked_temp_min_f,DoubleType,true), StructField(locked_temp_max_f,DoubleType,true)),true),true), StructField(smoke_co_alarms,MapType(StringType,StructType(StructField(device_id,StringType,true), StructField(locale,StringType,true), StructField(software_version,StringType,true), StructField(structure_id,StringType,true), StructField(where_name,StringType,true), StructField(last_connection,StringType,true), StructField(is_online,BooleanType,true), StructField(battery_health,StringType,true), StructField(co_alarm_state,StringType,true), StructField(smoke_alarm_state,StringType,true), StructField(is_manual_test_active,BooleanType,true), StructField(last_manual_test_time,StringType,true), StructField(ui_color_state,StringType,true)),true),true), StructField(cameras,MapType(StringType,StructType(StructField(device_id,StringType,true), StructField(software_version,StringType,true), StructField(structure_id,StringType,true), StructField(where_name,StringType,true), StructField(is_online,BooleanType,true), StructField(is_streaming,BooleanType,true), StructField(is_audio_input_enabled,BooleanType,true), StructField(last_is_online_change,StringType,true), StructField(is_video_history_enabled,BooleanType,true), StructField(web_url,StringType,true), StructField(app_url,StringType,true), StructField(is_public_share_enabled,BooleanType,true), StructField(activity_zones,StructType(StructField(name,StringType,true), StructField(id,LongType,true)),true), StructField(last_event,StringType,true)),true),true)),true))

By creating a simple Dataset from this JSON, you can then use all the Dataset methods to do ETL with the utility functions from above: from_json(), to_json(), explode(), and selectExpr().

val nestDataDS2 = Seq("""{
    "devices": {
       "thermostats": {
          "peyiJNo0IldT2YlIVtYaGQ": {
            "device_id": "peyiJNo0IldT2YlIVtYaGQ",
            "locale": "en-US",
            "software_version": "4.0",
            "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
            "where_name": "Hallway Upstairs",
            "last_connection": "2016-10-31T23:59:59.000Z",
            "is_online": true,
            "can_cool": true,
            "can_heat": true,
            "is_using_emergency_heat": true,
            "has_fan": true,
            "fan_timer_active": true,
            "fan_timer_timeout": "2016-10-31T23:59:59.000Z",
            "temperature_scale": "F",
            "target_temperature_f": 72,
            "target_temperature_high_f": 80,
            "target_temperature_low_f": 65,
            "eco_temperature_high_f": 80,
            "eco_temperature_low_f": 65,
            "away_temperature_high_f": 80,
            "away_temperature_low_f": 65,
            "hvac_mode": "heat",
            "humidity": 40,
            "hvac_state": "heating",
            "is_locked": true,
            "locked_temp_min_f": 65,
            "locked_temp_max_f": 80
            }
          },
          "smoke_co_alarms": {
            "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs": {
              "device_id": "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs",
              "locale": "en-US",
              "software_version": "1.01",
              "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
              "where_name": "Jane's Room",
              "last_connection": "2016-10-31T23:59:59.000Z",
              "is_online": true,
              "battery_health": "ok",
              "co_alarm_state": "ok",
              "smoke_alarm_state": "ok",
              "is_manual_test_active": true,
              "last_manual_test_time": "2016-10-31T23:59:59.000Z",
              "ui_color_state": "gray"
              }
            },
         "cameras": {
          "awJo6rH0IldT2YlIVtYaGQ": {
            "device_id": "awJo6rH",
            "software_version": "4.0",
            "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
            "where_name": "Foyer",
            "is_online": true,
            "is_streaming": true,
            "is_audio_input_enabled": true,
            "last_is_online_change": "2016-12-29T18:42:00.000Z",
            "is_video_history_enabled": true,
            "web_url": "https://home.nest.com/cameras/device_id?auth=access_token",
            "app_url": "nestmobile://cameras/device_id?auth=access_token",
            "is_public_share_enabled": true,
            "activity_zones": { "name": "Walkway", "id": 244083 },
            "last_event": "2016-10-31T23:59:59.000Z"
            }
          }
        }
       }""").toDS
nestDataDS2: org.apache.spark.sql.Dataset[String] = [value: string]
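Before applying the full schema, note that you can also probe a single attribute straight out of the raw JSON string with get_json_object(), the first helper function listed at the top. A minimal sketch of ours against nestDataDS2, using a JsonPath expression keyed on the sample thermostat above:

// A minimal sketch: pull one attribute from the raw JSON string with get_json_object().
// The path is a JsonPath expression; the device key comes from the sample record above.
val hvacModeDF = nestDataDS2.toDF("value")
  .select(get_json_object($"value",
    "$.devices.thermostats.peyiJNo0IldT2YlIVtYaGQ.hvac_mode").alias("hvac_mode"))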

Let's create a DataFrame from this single nested structure and use all the above utility functions to process and extract the relevant attributes.

val nestDF2 = spark                           // spark session
            .read                             // get DataFrameReader
            .schema(nestSchema2)              // use the schema defined above
            .json(nestDataDS2)                // read the Dataset[String] as JSON
nestDF2: org.apache.spark.sql.DataFrame = [devices: struct<thermostats: map<string,struct<device_id:string,locale:string,software_version:string,structure_id:string,where_name:string,last_connection:string,is_online:boolean,can_cool:boolean,can_heat:boolean,is_using_emergency_heat:boolean,has_fan:boolean,fan_timer_active:boolean,fan_timer_timeout:string,temperature_scale:string,target_temperature_f:double,target_temperature_high_f:double,target_temperature_low_f:double,eco_temperature_high_f:double,eco_temperature_low_f:double,away_temperature_high_f:double,away_temperature_low_f:double,hvac_mode:string,humidity:bigint,hvac_state:string,... 3 more fields>>, smoke_co_alarms: map<string,struct<device_id:string,locale:string,software_version:string,structure_id:string,where_name:string,last_connection:string,is_online:boolean,battery_health:string,co_alarm_state:string,smoke_alarm_state:string,is_manual_test_active:boolean,last_manual_test_time:string,ui_color_state:string>> ... 1 more field>]
display(nestDF2)
{"thermostats":{"peyiJNo0IldT2YlIVtYaGQ":{"device_id":"peyiJNo0IldT2YlIVtYaGQ","locale":"en-US","software_version":"4.0","structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw","where_name":"Hallway Upstairs","last_connection":"2016-10-31T23:59:59.000Z","is_online":true,"can_cool":true,"can_heat":true,"is_using_emergency_heat":true,"has_fan":true,"fan_timer_active":true,"fan_timer_timeout":"2016-10-31T23:59:59.000Z","temperature_scale":"F","target_temperature_f":72,"target_temperature_high_f":80,"target_temperature_low_f":65,"eco_temperature_high_f":80,"eco_temperature_low_f":65,"away_temperature_high_f":80,"away_temperature_low_f":65,"hvac_mode":"heat","humidity":40,"hvac_state":"heating","is_locked":"true","locked_temp_min_f":65,"locked_temp_max_f":80}},"smoke_co_alarms":{"RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs":{"device_id":"RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs","locale":"en-US","software_version":"1.01","structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw","where_name":"Jane's Room","last_connection":"2016-10-31T23:59:59.000Z","is_online":true,"battery_health":"ok","co_alarm_state":"ok","smoke_alarm_state":"ok","is_manual_test_active":true,"last_manual_test_time":"2016-10-31T23:59:59.000Z","ui_color_state":"gray"}},"cameras":{"awJo6rH0IldT2YlIVtYaGQ":{"device_id":"awJo6rH","software_version":"4.0","structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw","where_name":"Foyer","is_online":true,"is_streaming":true,"is_audio_input_enabled":true,"last_is_online_change":"2016-12-29T18:42:00.000Z","is_video_history_enabled":true,"web_url":"https://home.nest.com/cameras/device_id?auth=access_token","app_url":"nestmobile://cameras/device_id?auth=access_token","is_public_share_enabled":true,"activity_zones":{"name":"Walkway","id":244083},"last_event":"2016-10-31T23:59:59.000Z"}}}

Next, convert the entire nested JSON object above into a single JSON string, using to_json() with struct().

val stringJsonDF = nestDF2.select(to_json(struct($"*"))).toDF("nestDevice")
stringJsonDF: org.apache.spark.sql.DataFrame = [nestDevice: string]
stringJsonDF.printSchema
root |-- nestDevice: string (nullable = true)
display(stringJsonDF)
{"devices":{"thermostats":{"peyiJNo0IldT2YlIVtYaGQ":{"device_id":"peyiJNo0IldT2YlIVtYaGQ","locale":"en-US","software_version":"4.0","structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw","where_name":"Hallway Upstairs","last_connection":"2016-10-31T23:59:59.000Z","is_online":true,"can_cool":true,"can_heat":true,"is_using_emergency_heat":true,"has_fan":true,"fan_timer_active":true,"fan_timer_timeout":"2016-10-31T23:59:59.000Z","temperature_scale":"F","target_temperature_f":72.0,"target_temperature_high_f":80.0,"target_temperature_low_f":65.0,"eco_temperature_high_f":80.0,"eco_temperature_low_f":65.0,"away_temperature_high_f":80.0,"away_temperature_low_f":65.0,"hvac_mode":"heat","humidity":40,"hvac_state":"heating","is_locked":"true","locked_temp_min_f":65.0,"locked_temp_max_f":80.0}},"smoke_co_alarms":{"RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs":{"device_id":"RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs","locale":"en-US","software_version":"1.01","structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7...

Given the nested JSON object with three maps, you can fetch each individual map as a column, and then access its attributes using explode().

val mapColumnsDF = nestDF2.select($"devices".getItem("smoke_co_alarms").alias("smoke_alarms"),
                                  $"devices".getItem("cameras").alias("cameras"),
                                  $"devices".getItem("thermostats").alias("thermostats"))
mapColumnsDF: org.apache.spark.sql.DataFrame = [smoke_alarms: map<string,struct<device_id:string,locale:string,software_version:string,structure_id:string,where_name:string,last_connection:string,is_online:boolean,battery_health:string,co_alarm_state:string,smoke_alarm_state:string,is_manual_test_active:boolean,last_manual_test_time:string,ui_color_state:string>>, cameras: map<string,struct<device_id:string,software_version:string,structure_id:string,where_name:string,is_online:boolean,is_streaming:boolean,is_audio_input_enabled:boolean,last_is_online_change:string,is_video_history_enabled:boolean,web_url:string,app_url:string,is_public_share_enabled:boolean,activity_zones:struct<name:string,id:bigint>,last_event:string>> ... 1 more field]
display(mapColumnsDF)
{"RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs":{"device_id":"RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs","locale":"en-US","software_version":"1.01","structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw","where_name":"Jane's Room","last_connection":"2016-10-31T23:59:59.000Z","is_online":true,"battery_health":"ok","co_alarm_state":"ok","smoke_alarm_state":"ok","is_manual_test_active":true,"last_manual_test_time":"2016-10-31T23:59:59.000Z","ui_color_state":"gray"}}{"awJo6rH0IldT2YlIVtYaGQ":{"device_id":"awJo6rH","software_version":"4.0","structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw","where_name":"Foyer","is_online":true,"is_streaming":true,"is_audio_input_enabled":true,"last_is_online_change":"2016-12-29T18:42:00.000Z","is_video_history_enabled":true,"web_url":"https://home.nest.com/cameras/device_id?auth=access_token","app_url":"nestmobile://cameras/device_id?auth=access_token","is_public_share_enabled":true,"activity_zones":{"name":"Walkway","id":244083},"last_event":"2016-10-31T23:59:59.000Z"}}{"peyiJNo0IldT2YlIVtYaGQ":{"device_id":"peyiJNo0IldT2YlIVtYaGQ","locale":"en-US","software_version":"4.0","structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw","where_name":"Hallway Upstairs","last_connection":"2016-10-31T23:59:59.000Z","is_online":true,"can_cool":true,"can_heat":true,"is_using_emergency_heat":true,"has_fan":true,"fan_timer_active":true,"fan_timer_timeout":"2016-10-31T23:59:59.000Z","temperature_scale":"F","target_temperature_f":72,"target_temperature_high_f":80,"target_temperature_low_f":65,"eco_temperature_high_f":80,"eco_temperature_low_f":65,"away_temperature_high_f":80,"away_temperature_low_f":65,"hvac_mode":"heat","humidity":40,"hvac_state":"heating","is_locked":"true","locked_temp_min_f":65,"locked_temp_max_f":80}}
val explodedThermostatsDF = mapColumnsDF.select(explode($"thermostats"))
val explodedCamerasDF = mapColumnsDF.select(explode($"cameras"))
// or you could use the original nestDF2 with the devices.X notation
val explodedSmokeAlarmsDF = nestDF2.select(explode($"devices.smoke_co_alarms"))
explodedThermostatsDF: org.apache.spark.sql.DataFrame = [key: string, value: struct<device_id: string, locale: string ... 25 more fields>] explodedCamerasDF: org.apache.spark.sql.DataFrame = [key: string, value: struct<device_id: string, software_version: string ... 12 more fields>] explodedSmokeAlarmsDF: org.apache.spark.sql.DataFrame = [key: string, value: struct<device_id: string, locale: string ... 11 more fields>]
display(explodedThermostatsDF)
key: peyiJNo0IldT2YlIVtYaGQ | value: {"device_id":"peyiJNo0IldT2YlIVtYaGQ","locale":"en-US","software_version":"4.0","structure_id":"VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw","where_name":"Hallway Upstairs","last_connection":"2016-10-31T23:59:59.000Z","is_online":true,"can_cool":true,"can_heat":true,"is_using_emergency_heat":true,"has_fan":true,"fan_timer_active":true,"fan_timer_timeout":"2016-10-31T23:59:59.000Z","temperature_scale":"F","target_temperature_f":72,"target_temperature_high_f":80,"target_temperature_low_f":65,"eco_temperature_high_f":80,"eco_temperature_low_f":65,"away_temperature_high_f":80,"away_temperature_low_f":65,"hvac_mode":"heat","humidity":40,"hvac_state":"heating","is_locked":"true","locked_temp_min_f":65,"locked_temp_max_f":80}
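Each exploded row carries a key column and a value struct. You can pull fields out of the struct either with getItem(), as shown next, or with selectExpr() and dot notation, the last of the helper functions listed at the top. A minimal sketch of ours for the latter:

// A minimal sketch: selectExpr() with dot notation instead of getItem() chains.
val thermostatsViaExprDF = explodedThermostatsDF.selectExpr(
  "key AS device_key",
  "value.where_name AS location",
  "value.hvac_mode AS mode",
  "value.target_temperature_f AS target_temperature_f")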

To extract specific individual fields from the map's value struct, you can use the getItem() method.

val thermostatDF = explodedThermostatsDF.select($"value".getItem("device_id").alias("device_id"),
                                                $"value".getItem("locale").alias("locale"),
                                                $"value".getItem("where_name").alias("location"),
                                                $"value".getItem("last_connection").alias("last_connected"),
                                                $"value".getItem("humidity").alias("humidity"),
                                                $"value".getItem("target_temperature_f").alias("target_temperature_f"),
                                                $"value".getItem("hvac_mode").alias("mode"),
                                                $"value".getItem("software_version").alias("version"))

val cameraDF = explodedCamerasDF.select($"value".getItem("device_id").alias("device_id"),
                                        $"value".getItem("where_name").alias("location"),
                                        $"value".getItem("software_version").alias("version"),
                                        $"value".getItem("activity_zones").getItem("name").alias("name"),
                                        $"value".getItem("activity_zones").getItem("id").alias("id"))
                                         
val smokeAlarmsDF = explodedSmokeAlarmsDF.select($"value".getItem("device_id").alias("device_id"),
                                                 $"value".getItem("where_name").alias("location"),
                                                 $"value".getItem("software_version").alias("version"),
                                                 $"value".getItem("last_connection").alias("last_connected"),
                                                 $"value".getItem("battery_health").alias("battery_health"))
                                                  
                                        
thermostatDF: org.apache.spark.sql.DataFrame = [device_id: string, locale: string ... 6 more fields] cameraDF: org.apache.spark.sql.DataFrame = [device_id: string, location: string ... 3 more fields] smokeAlarmsDF: org.apache.spark.sql.DataFrame = [device_id: string, location: string ... 3 more fields]
display(thermostatDF)
device_id: peyiJNo0IldT2YlIVtYaGQ | locale: en-US | location: Hallway Upstairs | last_connected: 2016-10-31T23:59:59.000Z | humidity: 40 | target_temperature_f: 72 | mode: heat | version: 4.0
display(cameraDF)
device_id: awJo6rH | location: Foyer | version: 4.0 | name: Walkway | id: 244083
display(smokeAlarmsDF)
device_id: RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs | location: Jane's Room | version: 1.01 | last_connected: 2016-10-31T23:59:59.000Z | battery_health: ok
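If you prefer plain SQL, you can also register a temporary view over an exploded DataFrame and run the same kind of extraction there. A minimal sketch of ours (the view name is arbitrary):

// A minimal sketch: the same extraction via Spark SQL over a temporary view.
explodedThermostatsDF.createOrReplaceTempView("exploded_thermostats")
display(spark.sql("""
  SELECT key AS device_key, value.where_name AS location, value.humidity AS humidity
  FROM exploded_thermostats
  WHERE value.hvac_state = 'heating'"""))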

Let's join the thermostat and camera DataFrames on their common column, version.

val joinedDF = thermostatDF.join(cameraDF, "version")

joinedDF: org.apache.spark.sql.DataFrame = [version: string, device_id: string ... 10 more fields]
display(joinedDF)
version: 4.0 | device_id: peyiJNo0IldT2YlIVtYaGQ | locale: en-US | location: Hallway Upstairs | last_connected: 2016-10-31T23:59:59.000Z | humidity: 40 | target_temperature_f: 72 | mode: heat | device_id: awJo6rH | location: Foyer | name: Walkway | id: 244083
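As with any DataFrame, the joined result can be filtered and projected further. A minimal sketch, with arbitrary predicate values of ours:

// A minimal sketch: query the joined DataFrame like any other.
display(joinedDF
  .where($"mode" === "heat" && $"humidity" >= 40)
  .select($"version", $"target_temperature_f", $"mode"))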

Summary

The point of this short tutorial has been to demonstrate how easily you can use utility functions to extract JSON attributes from complex and nested structures. Once you have exploded, flattened, or parsed the desired values into DataFrames or Datasets, you can extract and query them as easily as you would any DataFrame or Dataset, using the respective APIs. Check out part 3 of our Structured Streaming series, where we show how you can read Nest device logs from Apache Kafka and do ETL on them.