complex-nested-structured(Scala)

Five Spark SQL Helper Utility Functions to Extract and Explore Complex Data Types

While the accompanying in-depth blog explains the concepts and motivations for processing complex data types and formats, this notebook shows how to apply them, with a few concrete examples, to data types you might encounter in your own use cases. This short tutorial explores a number of new Spark SQL helper functions and APIs, most of which live in the org.apache.spark.sql.functions package. They come in handy for streaming ETL, where the data are JSON objects with complex, nested structures such as maps and structs embedded as JSON:

  • get_json_object()
  • from_json()
  • to_json()
  • explode()
  • selectExpr()

The takeaway from this short tutorial is the myriad ways you can slice and dice nested JSON structures with Spark SQL utility functions.

Let's create a simple JSON schema with attributes and values, without any nested structures.

import org.apache.spark.sql.types._                         // include the Spark Types to define our schema
import org.apache.spark.sql.functions._                     // include the Spark helper functions

val jsonSchema = new StructType()
        .add("battery_level", LongType)
        .add("c02_level", LongType)
        .add("cca3",StringType)
        .add("cn", StringType)
        .add("device_id", LongType)
        .add("device_type", StringType)
        .add("signal", LongType)
        .add("ip", StringType)
        .add("temp", LongType)
        .add("timestamp", TimestampType)
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
jsonSchema: org.apache.spark.sql.types.StructType = StructType(StructField(battery_level,LongType,true), StructField(c02_level,LongType,true), StructField(cca3,StringType,true), StructField(cn,StringType,true), StructField(device_id,LongType,true), StructField(device_type,StringType,true), StructField(signal,LongType,true), StructField(ip,StringType,true), StructField(temp,LongType,true), StructField(timestamp,TimestampType,true))

Next, create a Dataset, represented as a Scala case class, and generate some JSON data that conforms to the schema above. In practice, this JSON could just as well be a stream of device events read off a Kafka topic. Note that the case class has two fields: an integer (the device id) and a string (a JSON string representing a device event).

Create a Dataset from the above schema
// define a case class
case class DeviceData (id: Int, device: String)
// create some sample data
val eventsDS = Seq (
 (0, """{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }"""),
 (1, """{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 }"""),
 (2, """{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 }"""),
 (3, """{"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp" :1475600502 }"""),
(4, """{"device_id": 4, "device_type": "sensor-ipad", "ip": "203.82.41.9", "cca3": "PHL", "cn": "Philippines", "temp": 29, "signal": 11, "battery_level": 0, "c02_level": 983, "timestamp" :1475600504 }"""),
(5, """{"device_id": 5, "device_type": "sensor-istick", "ip": "204.116.105.67", "cca3": "USA", "cn": "United States", "temp": 50, "signal": 16, "battery_level": 8, "c02_level": 1574, "timestamp" :1475600506 }"""),
(6, """{"device_id": 6, "device_type": "sensor-ipad", "ip": "220.173.179.1", "cca3": "CHN", "cn": "China", "temp": 21, "signal": 18, "battery_level": 9, "c02_level": 1249, "timestamp" :1475600508 }"""),
(7, """{"device_id": 7, "device_type": "sensor-ipad", "ip": "118.23.68.227", "cca3": "JPN", "cn": "Japan", "temp": 27, "signal": 15, "battery_level": 0, "c02_level": 1531, "timestamp" :1475600512 }"""),
(8 ,""" {"device_id": 8, "device_type": "sensor-inest", "ip": "208.109.163.218", "cca3": "USA", "cn": "United States", "temp": 40, "signal": 16, "battery_level": 9, "c02_level": 1208, "timestamp" :1475600514 }"""),
(9,"""{"device_id": 9, "device_type": "sensor-ipad", "ip": "88.213.191.34", "cca3": "ITA", "cn": "Italy", "temp": 19, "signal": 11, "battery_level": 0, "c02_level": 1171, "timestamp" :1475600516 }"""),
(10,"""{"device_id": 10, "device_type": "sensor-igauge", "ip": "68.28.91.22", "cca3": "USA", "cn": "United States", "temp": 32, "signal": 26, "battery_level": 7, "c02_level": 886, "timestamp" :1475600518 }"""),
(11,"""{"device_id": 11, "device_type": "sensor-ipad", "ip": "59.144.114.250", "cca3": "IND", "cn": "India", "temp": 46, "signal": 25, "battery_level": 4, "c02_level": 863, "timestamp" :1475600520 }"""),
(12, """{"device_id": 12, "device_type": "sensor-igauge", "ip": "193.156.90.200", "cca3": "NOR", "cn": "Norway", "temp": 18, "signal": 26, "battery_level": 8, "c02_level": 1220, "timestamp" :1475600522 }"""),
(13, """{"device_id": 13, "device_type": "sensor-ipad", "ip": "67.185.72.1", "cca3": "USA", "cn": "United States", "temp": 34, "signal": 20, "battery_level": 8, "c02_level": 1504, "timestamp" :1475600524 }"""),
(14, """{"device_id": 14, "device_type": "sensor-inest", "ip": "68.85.85.106", "cca3": "USA", "cn": "United States", "temp": 39, "signal": 17, "battery_level": 8, "c02_level": 831, "timestamp" :1475600526 }"""),
(15, """{"device_id": 15, "device_type": "sensor-ipad", "ip": "161.188.212.254", "cca3": "USA", "cn": "United States", "temp": 27, "signal": 26, "battery_level": 5, "c02_level": 1378, "timestamp" :1475600528 }"""),
(16, """{"device_id": 16, "device_type": "sensor-igauge", "ip": "221.3.128.242", "cca3": "CHN", "cn": "China", "temp": 10, "signal": 24, "battery_level": 6, "c02_level": 1423, "timestamp" :1475600530 }"""),
(17, """{"device_id": 17, "device_type": "sensor-ipad", "ip": "64.124.180.215", "cca3": "USA", "cn": "United States", "temp": 38, "signal": 17, "battery_level": 9, "c02_level": 1304, "timestamp" :1475600532 }"""),
(18, """{"device_id": 18, "device_type": "sensor-igauge", "ip": "66.153.162.66", "cca3": "USA", "cn": "United States", "temp": 26, "signal": 10, "battery_level": 0, "c02_level": 902, "timestamp" :1475600534 }"""),
(19, """{"device_id": 19, "device_type": "sensor-ipad", "ip": "193.200.142.254", "cca3": "AUT", "cn": "Austria", "temp": 32, "signal": 27, "battery_level": 5, "c02_level": 1282, "timestamp" :1475600536 }""")).toDF("id", "device").as[DeviceData]
defined class DeviceData
eventsDS: org.apache.spark.sql.Dataset[DeviceData] = [id: int, device: string]

Our Dataset is a collection of Scala case class instances; when displayed as a DataFrame, it has two columns (id, device).

display(eventsDS)
id | device
0 | {"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }
1 | {"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 }
2 | {"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 }
3 | {"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp" :1475600502 }
4 | {"device_id": 4, "device_type": "sensor-ipad", "ip": "203.82.41.9", "cca3": "PHL", "cn": "Philippines", "temp": 29, "signal": 11, "battery_level": 0, "c02_level": 983, "timestamp" :1475600504 }
5 | {"device_id": 5, "device_type": "sensor-istick", "ip": "204.116.105.67", "cca3": "USA", "cn": "United States", "temp": 50, "signal": 16, "battery_level": 8, "c02_level": 1574, "timestamp" :1475600506 }
6 | {"device_id": 6, "device_type": "sensor-ipad", "ip": "220.173.179.1", "cca3": "CHN", "cn": "China", "temp": 21, "signal": 18, "battery_level": 9, "c02_level": 1249, "timestamp" :1475600508 }
7 | {"device_id": 7, "device_type": "sensor-ipad", "ip": "118.23.68.227", "cca3": "JPN", "cn": "Japan", "temp": 27, "signal": 15, "battery_level": 0, "c02_level": 1531, "timestamp" :1475600512 }
8 | {"device_id": 8, "device_type": "sensor-inest", "ip": "208.109.163.218", "cca3": "USA", "cn": "United States", "temp": 40, "signal": 16, "battery_level": 9, "c02_level": 1208, "timestamp" :1475600514 }
9 | {"device_id": 9, "device_type": "sensor-ipad", "ip": "88.213.191.34", "cca3": "ITA", "cn": "Italy", "temp": 19, "signal": 11, "battery_level": 0, "c02_level": 1171, "timestamp" :1475600516 }
10 | {"device_id": 10, "device_type": "sensor-igauge", "ip": "68.28.91.22", "cca3": "USA", "cn": "United States", "temp": 32, "signal": 26, "battery_level": 7, "c02_level": 886, "timestamp" :1475600518 }
11 | {"device_id": 11, "device_type": "sensor-ipad", "ip": "59.144.114.250", "cca3": "IND", "cn": "India", "temp": 46, "signal": 25, "battery_level": 4, "c02_level": 863, "timestamp" :1475600520 }
12 | {"device_id": 12, "device_type": "sensor-igauge", "ip": "193.156.90.200", "cca3": "NOR", "cn": "Norway", "temp": 18, "signal": 26, "battery_level": 8, "c02_level": 1220, "timestamp" :1475600522 }
13 | {"device_id": 13, "device_type": "sensor-ipad", "ip": "67.185.72.1", "cca3": "USA", "cn": "United States", "temp": 34, "signal": 20, "battery_level": 8, "c02_level": 1504, "timestamp" :1475600524 }
14 | {"device_id": 14, "device_type": "sensor-inest", "ip": "68.85.85.106", "cca3": "USA", "cn": "United States", "temp": 39, "signal": 17, "battery_level": 8, "c02_level": 831, "timestamp" :1475600526 }
15 | {"device_id": 15, "device_type": "sensor-ipad", "ip": "161.188.212.254", "cca3": "USA", "cn": "United States", "temp": 27, "signal": 26, "battery_level": 5, "c02_level": 1378, "timestamp" :1475600528 }
16 | {"device_id": 16, "device_type": "sensor-igauge", "ip": "221.3.128.242", "cca3": "CHN", "cn": "China", "temp": 10, "signal": 24, "battery_level": 6, "c02_level": 1423, "timestamp" :1475600530 }
17 | {"device_id": 17, "device_type": "sensor-ipad", "ip": "64.124.180.215", "cca3": "USA", "cn": "United States", "temp": 38, "signal": 17, "battery_level": 9, "c02_level": 1304, "timestamp" :1475600532 }
18 | {"device_id": 18, "device_type": "sensor-igauge", "ip": "66.153.162.66", "cca3": "USA", "cn": "United States", "temp": 26, "signal": 10, "battery_level": 0, "c02_level": 902, "timestamp" :1475600534 }
19 | {"device_id": 19, "device_type": "sensor-ipad", "ip": "193.200.142.254", "cca3": "AUT", "cn": "Austria", "temp": 32, "signal": 27, "battery_level": 5, "c02_level": 1282, "timestamp" :1475600536 }

Printing the schema attests to two columns, of type integer and string, reflecting the Scala case class.

eventsDS.printSchema
root
 |-- id: integer (nullable = false)
 |-- device: string (nullable = true)

How to use get_json_object()

This function extracts a JSON object from a JSON string based on the JSON path specified, and returns the extracted JSON object as a JSON string. Take a small subset of the above dataset, from which we wish to extract some of the values that make up the JSON string. Say you want to extract only id, device_type, ip, and the cca3 code. Here's a quick way to do it using get_json_object().

val eventsFromJSONDF = Seq (
 (0, """{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }"""),
 (1, """{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 }"""),
 (2, """{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 }"""),
 (3, """{"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp" :1475600502 }"""),
(4, """{"device_id": 4, "device_type": "sensor-ipad", "ip": "203.82.41.9", "cca3": "PHL", "cn": "Philippines", "temp": 29, "signal": 11, "battery_level": 0, "c02_level": 983, "timestamp" :1475600504 }"""),
(5, """{"device_id": 5, "device_type": "sensor-istick", "ip": "204.116.105.67", "cca3": "USA", "cn": "United States", "temp": 50, "signal": 16, "battery_level": 8, "c02_level": 1574, "timestamp" :1475600506 }"""),
(6, """{"device_id": 6, "device_type": "sensor-ipad", "ip": "220.173.179.1", "cca3": "CHN", "cn": "China", "temp": 21, "signal": 18, "battery_level": 9, "c02_level": 1249, "timestamp" :1475600508 }"""),
(7, """{"device_id": 7, "device_type": "sensor-ipad", "ip": "118.23.68.227", "cca3": "JPN", "cn": "Japan", "temp": 27, "signal": 15, "battery_level": 0, "c02_level": 1531, "timestamp" :1475600512 }"""),
(8 ,""" {"device_id": 8, "device_type": "sensor-inest", "ip": "208.109.163.218", "cca3": "USA", "cn": "United States", "temp": 40, "signal": 16, "battery_level": 9, "c02_level": 1208, "timestamp" :1475600514 }"""),
(9,"""{"device_id": 9, "device_type": "sensor-ipad", "ip": "88.213.191.34", "cca3": "ITA", "cn": "Italy", "temp": 19, "signal": 11, "battery_level": 0, "c02_level": 1171, "timestamp" :1475600516 }""")).toDF("id", "json")
eventsFromJSONDF: org.apache.spark.sql.DataFrame = [id: int, json: string]
display(eventsFromJSONDF)
id | json
0 | {"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }
1 | {"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 }
2 | {"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 }
3 | {"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp" :1475600502 }
4 | {"device_id": 4, "device_type": "sensor-ipad", "ip": "203.82.41.9", "cca3": "PHL", "cn": "Philippines", "temp": 29, "signal": 11, "battery_level": 0, "c02_level": 983, "timestamp" :1475600504 }
5 | {"device_id": 5, "device_type": "sensor-istick", "ip": "204.116.105.67", "cca3": "USA", "cn": "United States", "temp": 50, "signal": 16, "battery_level": 8, "c02_level": 1574, "timestamp" :1475600506 }
6 | {"device_id": 6, "device_type": "sensor-ipad", "ip": "220.173.179.1", "cca3": "CHN", "cn": "China", "temp": 21, "signal": 18, "battery_level": 9, "c02_level": 1249, "timestamp" :1475600508 }
7 | {"device_id": 7, "device_type": "sensor-ipad", "ip": "118.23.68.227", "cca3": "JPN", "cn": "Japan", "temp": 27, "signal": 15, "battery_level": 0, "c02_level": 1531, "timestamp" :1475600512 }
8 | {"device_id": 8, "device_type": "sensor-inest", "ip": "208.109.163.218", "cca3": "USA", "cn": "United States", "temp": 40, "signal": 16, "battery_level": 9, "c02_level": 1208, "timestamp" :1475600514 }
9 | {"device_id": 9, "device_type": "sensor-ipad", "ip": "88.213.191.34", "cca3": "ITA", "cn": "Italy", "temp": 19, "signal": 11, "battery_level": 0, "c02_level": 1171, "timestamp" :1475600516 }
val jsDF = eventsFromJSONDF.select(
  $"id",
  get_json_object($"json", "$.device_type").alias("device_type"),
  get_json_object($"json", "$.ip").alias("ip"),
  get_json_object($"json", "$.cca3").alias("cca3"))
jsDF: org.apache.spark.sql.DataFrame = [id: int, device_type: string ... 2 more fields]
display(jsDF)
id | device_type | ip | cca3
0 | sensor-ipad | 68.161.225.1 | USA
1 | sensor-igauge | 213.161.254.1 | NOR
2 | sensor-ipad | 88.36.5.1 | ITA
3 | sensor-inest | 66.39.173.154 | USA
4 | sensor-ipad | 203.82.41.9 | PHL
5 | sensor-istick | 204.116.105.67 | USA
6 | sensor-ipad | 220.173.179.1 | CHN
7 | sensor-ipad | 118.23.68.227 | JPN
8 | sensor-inest | 208.109.163.218 | USA
9 | sensor-ipad | 88.213.191.34 | ITA
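
get_json_object() also accepts paths that reach into nested objects. The sketch below is a minimal illustration, assuming a local SparkSession and a hypothetical event with a nested geo object (neither is part of the tutorial data). A path that does not exist in the JSON evaluates to null.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.get_json_object

// Assumption: a local session for illustration; in a notebook, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").appName("get-json-object-sketch").getOrCreate()
import spark.implicits._

// Hypothetical event with a nested "geo" object (not in the tutorial data).
val nestedEventsDF = Seq(
  (0, """{"device_type": "sensor-ipad", "geo": {"cca3": "USA", "cn": "United States"}}""")
).toDF("id", "json")

val nestedFieldsDF = nestedEventsDF.select(
  $"id",
  get_json_object($"json", "$.geo.cca3").alias("cca3"), // path into the nested object
  get_json_object($"json", "$.geo.zip").alias("zip")    // absent path, evaluates to null
)

nestedFieldsDF.show(false)
```

Because every extracted value comes back as a string, numeric fields would still need a cast before arithmetic or numeric comparisons.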

How to use from_json()

A variation of get_json_object(), this function uses a schema to extract individual columns. Using the from_json() helper function within the select() Dataset API call, I can extract or decode a JSON string's attributes and values into DataFrame columns, as dictated by a schema. Using the schema, I also group all attributes and values within this JSON under a single entity, devices. As such, you can retrieve an individual value with the devices.attribute notation, or all values with the * notation.

The example below:

  • Uses the schema above to extract attributes and values from the JSON string and represent them as individual columns under devices
  • Selects all of its columns with select()
  • Filters on desired attributes using the . notation

Once you have extracted data from a JSON string into its respective DataFrame columns, you can apply DataFrame/Dataset API calls to select, filter, and subsequently display the data to your satisfaction.

val devicesDF = eventsDS.select(from_json($"device", jsonSchema) as "devices")
  .select($"devices.*")
  .filter($"devices.temp" > 10 and $"devices.signal" > 15)
devicesDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [battery_level: bigint, c02_level: bigint ... 8 more fields]
display(devicesDF)
[Chart: signal and temp plotted by cca3 (USA, NOR, ITA, CHN, IND, AUT)]
val devicesUSDF = devicesDF.select($"*").where($"cca3" === "USA").orderBy($"signal".desc, $"temp".desc)
devicesUSDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [battery_level: bigint, c02_level: bigint ... 8 more fields]
display(devicesUSDF)
[Chart: signal broken down by sensor-igauge, sensor-ipad, sensor-inest, sensor-istick]
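
The same dot notation extends to structs nested inside the parsed JSON. The following is a minimal sketch, assuming a local SparkSession and a hypothetical schema in which the country fields sit inside a nested geo struct; the names nestedSchema, rawDF, and geoDF are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// Assumption: a local session for illustration; in a notebook, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").appName("from-json-sketch").getOrCreate()
import spark.implicits._

// Hypothetical nested schema: "geo" is a struct inside the device struct.
val nestedSchema = new StructType()
  .add("device_id", LongType)
  .add("geo", new StructType()
    .add("cca3", StringType)
    .add("cn", StringType))

val rawDF = Seq(
  """{"device_id": 0, "geo": {"cca3": "USA", "cn": "United States"}}""",
  """{"device_id": 1, "geo": {"cca3": "NOR", "cn": "Norway"}}"""
).toDF("json")

// Parse into a struct column, then flatten the nested fields with dot notation.
val geoDF = rawDF
  .select(from_json($"json", nestedSchema).as("device"))
  .select($"device.device_id", $"device.geo.cca3", $"device.geo.cn")
  .filter($"cca3" === "USA")

geoDF.show(false)
```

Unlike get_json_object(), the extracted columns keep the types declared in the schema, so device_id here is a bigint rather than a string.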

How to use to_json()

Now, let's do the reverse: you can convert or encode columns into a JSON string using to_json(). That is, convert a struct into a JSON string. The example below wraps all columns of eventsDS in a struct and encodes each row. The result can be republished, for instance, to Kafka or saved on disk as Parquet files. To learn how to write to Kafka and other sinks, read the accompanying blog and our series of blogs on Structured Streaming.

val stringJsonDF = eventsDS.select(to_json(struct($"*"))).toDF("devices")
stringJsonDF: org.apache.spark.sql.DataFrame = [devices: string]
display(stringJsonDF)
{"id":0,"device":"{\"device_id\": 0, \"device_type\": \"sensor-ipad\", \"ip\": \"68.161.225.1\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 25, \"signal\": 23, \"battery_level\": 8, \"c02_level\": 917, \"timestamp\" :1475600496 }"}
{"id":1,"device":"{\"device_id\": 1, \"device_type\": \"sensor-igauge\", \"ip\": \"213.161.254.1\", \"cca3\": \"NOR\", \"cn\": \"Norway\", \"temp\": 30, \"signal\": 18, \"battery_level\": 6, \"c02_level\": 1413, \"timestamp\" :1475600498 }"}
{"id":2,"device":"{\"device_id\": 2, \"device_type\": \"sensor-ipad\", \"ip\": \"88.36.5.1\", \"cca3\": \"ITA\", \"cn\": \"Italy\", \"temp\": 18, \"signal\": 25, \"battery_level\": 5, \"c02_level\": 1372, \"timestamp\" :1475600500 }"}
{"id":3,"device":"{\"device_id\": 3, \"device_type\": \"sensor-inest\", \"ip\": \"66.39.173.154\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 47, \"signal\": 12, \"battery_level\": 1, \"c02_level\": 1447, \"timestamp\" :1475600502 }"}
{"id":4,"device":"{\"device_id\": 4, \"device_type\": \"sensor-ipad\", \"ip\": \"203.82.41.9\", \"cca3\": \"PHL\", \"cn\": \"Philippines\", \"temp\": 29, \"signal\": 11, \"battery_level\": 0, \"c02_level\": 983, \"timestamp\" :1475600504 }"}
{"id":5,"device":"{\"device_id\": 5, \"device_type\": \"sensor-istick\", \"ip\": \"204.116.105.67\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 50, \"signal\": 16, \"battery_level\": 8, \"c02_level\": 1574, \"timestamp\" :1475600506 }"}
{"id":6,"device":"{\"device_id\": 6, \"device_type\": \"sensor-ipad\", \"ip\": \"220.173.179.1\", \"cca3\": \"CHN\", \"cn\": \"China\", \"temp\": 21, \"signal\": 18, \"battery_level\": 9, \"c02_level\": 1249, \"timestamp\" :1475600508 }"}
{"id":7,"device":"{\"device_id\": 7, \"device_type\": \"sensor-ipad\", \"ip\": \"118.23.68.227\", \"cca3\": \"JPN\", \"cn\": \"Japan\", \"temp\": 27, \"signal\": 15, \"battery_level\": 0, \"c02_level\": 1531, \"timestamp\" :1475600512 }"}
{"id":8,"device":" {\"device_id\": 8, \"device_type\": \"sensor-inest\", \"ip\": \"208.109.163.218\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 40, \"signal\": 16, \"battery_level\": 9, \"c02_level\": 1208, \"timestamp\" :1475600514 }"}
{"id":9,"device":"{\"device_id\": 9, \"device_type\": \"sensor-ipad\", \"ip\": \"88.213.191.34\", \"cca3\": \"ITA\", \"cn\": \"Italy\", \"temp\": 19, \"signal\": 11, \"battery_level\": 0, \"c02_level\": 1171, \"timestamp\" :1475600516 }"}
{"id":10,"device":"{\"device_id\": 10, \"device_type\": \"sensor-igauge\", \"ip\": \"68.28.91.22\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 32, \"signal\": 26, \"battery_level\": 7, \"c02_level\": 886, \"timestamp\" :1475600518 }"}
{"id":11,"device":"{\"device_id\": 11, \"device_type\": \"sensor-ipad\", \"ip\": \"59.144.114.250\", \"cca3\": \"IND\", \"cn\": \"India\", \"temp\": 46, \"signal\": 25, \"battery_level\": 4, \"c02_level\": 863, \"timestamp\" :1475600520 }"}
{"id":12,"device":"{\"device_id\": 12, \"device_type\": \"sensor-igauge\", \"ip\": \"193.156.90.200\", \"cca3\": \"NOR\", \"cn\": \"Norway\", \"temp\": 18, \"signal\": 26, \"battery_level\": 8, \"c02_level\": 1220, \"timestamp\" :1475600522 }"}
{"id":13,"device":"{\"device_id\": 13, \"device_type\": \"sensor-ipad\", \"ip\": \"67.185.72.1\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 34, \"signal\": 20, \"battery_level\": 8, \"c02_level\": 1504, \"timestamp\" :1475600524 }"}
{"id":14,"device":"{\"device_id\": 14, \"device_type\": \"sensor-inest\", \"ip\": \"68.85.85.106\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 39, \"signal\": 17, \"battery_level\": 8, \"c02_level\": 831, \"timestamp\" :1475600526 }"}
{"id":15,"device":"{\"device_id\": 15, \"device_type\": \"sensor-ipad\", \"ip\": \"161.188.212.254\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 27, \"signal\": 26, \"battery_level\": 5, \"c02_level\": 1378, \"timestamp\" :1475600528 }"}
{"id":16,"device":"{\"device_id\": 16, \"device_type\": \"sensor-igauge\", \"ip\": \"221.3.128.242\", \"cca3\": \"CHN\", \"cn\": \"China\", \"temp\": 10, \"signal\": 24, \"battery_level\": 6, \"c02_level\": 1423, \"timestamp\" :1475600530 }"}
{"id":17,"device":"{\"device_id\": 17, \"device_type\": \"sensor-ipad\", \"ip\": \"64.124.180.215\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 38, \"signal\": 17, \"battery_level\": 9, \"c02_level\": 1304, \"timestamp\" :1475600532 }"}
{"id":18,"device":"{\"device_id\": 18, \"device_type\": \"sensor-igauge\", \"ip\": \"66.153.162.66\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 26, \"signal\": 10, \"battery_level\": 0, \"c02_level\": 902, \"timestamp\" :1475600534 }"}
{"id":19,"device":"{\"device_id\": 19, \"device_type\": \"sensor-ipad\", \"ip\": \"193.200.142.254\", \"cca3\": \"AUT\", \"cn\": \"Austria\", \"temp\": 32, \"signal\": 27, \"battery_level\": 5, \"c02_level\": 1282, \"timestamp\" :1475600536 }"}
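
To encode parsed and filtered devices rather than the raw (id, device) pairs, to_json() can be applied to the columns produced by from_json(). A minimal sketch of that round trip, assuming a local SparkSession and a trimmed-down, hypothetical two-field schema:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{from_json, struct, to_json}
import org.apache.spark.sql.types._

// Assumption: a local session for illustration; in a notebook, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").appName("to-json-sketch").getOrCreate()
import spark.implicits._

// Hypothetical, trimmed-down schema with only two of the device fields.
val smallSchema = new StructType()
  .add("device_id", LongType)
  .add("temp", LongType)

// Decode the JSON strings, then keep only devices warmer than 10.
val parsedDF = Seq(
  """{"device_id": 0, "temp": 25}""",
  """{"device_id": 1, "temp": 8}"""
).toDF("device")
  .select(from_json($"device", smallSchema).as("devices"))
  .select($"devices.*")
  .filter($"temp" > 10)

// Re-encode the surviving rows as one JSON string per row.
val reencodedDF = parsedDF.select(to_json(struct($"*")).as("devices"))

reencodedDF.show(false)
```

The re-encoded strings contain only the fields named in the schema, which makes this a simple way to drop unwanted attributes before republishing.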

Assuming you have a Kafka cluster running at the specified host and port, with the respective topic created, let's write to the Kafka topic.

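// The Kafka sink expects the payload in a column named "value".
// stringJsonDF.selectExpr("devices AS value")
//            .write
//            .format("kafka")
//            .option("kafka.bootstrap.servers", "your_host_name:9092")
//            .option("topic", "iot-devices")
//            .save()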

How to use selectExpr()

Another way to convert a column, for example one that holds a JSON object as a string, is the selectExpr() utility function, which accepts SQL expressions. For instance, I can cast the "device" column of our Dataset from above to a JSON string:

val stringsDF = eventsDS.selectExpr("CAST(id AS INT)", "CAST(device AS STRING)")
stringsDF: org.apache.spark.sql.DataFrame = [id: int, device: string]
stringsDF.printSchema
root
 |-- id: integer (nullable = false)
 |-- device: string (nullable = true)
display(stringsDF)
id | device
0 | {"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }
1 | {"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 }
2 | {"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 }
3 | {"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp" :1475600502 }
4 | {"device_id": 4, "device_type": "sensor-ipad", "ip": "203.82.41.9", "cca3": "PHL", "cn": "Philippines", "temp": 29, "signal": 11, "battery_level": 0, "c02_level": 983, "timestamp" :1475600504 }
5 | {"device_id": 5, "device_type": "sensor-istick", "ip": "204.116.105.67", "cca3": "USA", "cn": "United States", "temp": 50, "signal": 16, "battery_level": 8, "c02_level": 1574, "timestamp" :1475600506 }
6 | {"device_id": 6, "device_type": "sensor-ipad", "ip": "220.173.179.1", "cca3": "CHN", "cn": "China", "temp": 21, "signal": 18, "battery_level": 9, "c02_level": 1249, "timestamp" :1475600508 }
7 | {"device_id": 7, "device_type": "sensor-ipad", "ip": "118.23.68.227", "cca3": "JPN", "cn": "Japan", "temp": 27, "signal": 15, "battery_level": 0, "c02_level": 1531, "timestamp" :1475600512 }
8 | {"device_id": 8, "device_type": "sensor-inest", "ip": "208.109.163.218", "cca3": "USA", "cn": "United States", "temp": 40, "signal": 16, "battery_level": 9, "c02_level": 1208, "timestamp" :1475600514 }
9 | {"device_id": 9, "device_type": "sensor-ipad", "ip": "88.213.191.34", "cca3": "ITA", "cn": "Italy", "temp": 19, "signal": 11, "battery_level": 0, "c02_level": 1171, "timestamp" :1475600516 }
10 | {"device_id": 10, "device_type": "sensor-igauge", "ip": "68.28.91.22", "cca3": "USA", "cn": "United States", "temp": 32, "signal": 26, "battery_level": 7, "c02_level": 886, "timestamp" :1475600518 }
11 | {"device_id": 11, "device_type": "sensor-ipad", "ip": "59.144.114.250", "cca3": "IND", "cn": "India", "temp": 46, "signal": 25, "battery_level": 4, "c02_level": 863, "timestamp" :1475600520 }
12 | {"device_id": 12, "device_type": "sensor-igauge", "ip": "193.156.90.200", "cca3": "NOR", "cn": "Norway", "temp": 18, "signal": 26, "battery_level": 8, "c02_level": 1220, "timestamp" :1475600522 }
13 | {"device_id": 13, "device_type": "sensor-ipad", "ip": "67.185.72.1", "cca3": "USA", "cn": "United States", "temp": 34, "signal": 20, "battery_level": 8, "c02_level": 1504, "timestamp" :1475600524 }
14 | {"device_id": 14, "device_type": "sensor-inest", "ip": "68.85.85.106", "cca3": "USA", "cn": "United States", "temp": 39, "signal": 17, "battery_level": 8, "c02_level": 831, "timestamp" :1475600526 }
15 | {"device_id": 15, "device_type": "sensor-ipad", "ip": "161.188.212.254", "cca3": "USA", "cn": "United States", "temp": 27, "signal": 26, "battery_level": 5, "c02_level": 1378, "timestamp" :1475600528 }
16 | {"device_id": 16, "device_type": "sensor-igauge", "ip": "221.3.128.242", "cca3": "CHN", "cn": "China", "temp": 10, "signal": 24, "battery_level": 6, "c02_level": 1423, "timestamp" :1475600530 }
17 | {"device_id": 17, "device_type": "sensor-ipad", "ip": "64.124.180.215", "cca3": "USA", "cn": "United States", "temp": 38, "signal": 17, "battery_level": 9, "c02_level": 1304, "timestamp" :1475600532 }
18 | {"device_id": 18, "device_type": "sensor-igauge", "ip": "66.153.162.66", "cca3": "USA", "cn": "United States", "temp": 26, "signal": 10, "battery_level": 0, "c02_level": 902, "timestamp" :1475600534 }
19 | {"device_id": 19, "device_type": "sensor-ipad", "ip": "193.200.142.254", "cca3": "AUT", "cn": "Austria", "temp": 32, "signal": 27, "battery_level": 5, "c02_level": 1282, "timestamp" :1475600536 }
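
Because selectExpr() takes SQL expressions, it can also call SQL functions such as get_json_object directly, combining extraction and casting in one step. A minimal sketch, assuming a local SparkSession and a small hypothetical sample with the same (id, device) shape as the Dataset above:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; in a notebook, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").appName("select-expr-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample rows shaped like the (id, device) pairs used in this tutorial.
val sampleDF = Seq(
  (0, """{"device_type": "sensor-ipad", "cca3": "USA", "temp": 25}"""),
  (1, """{"device_type": "sensor-igauge", "cca3": "NOR", "temp": 30}""")
).toDF("id", "device")

// Each argument is a SQL expression: pull fields out of the JSON string and cast where needed.
val exprDF = sampleDF.selectExpr(
  "id",
  "get_json_object(device, '$.cca3') AS cca3",
  "CAST(get_json_object(device, '$.temp') AS INT) AS temp"
)

exprDF.printSchema()
exprDF.show(false)
```

This form is handy when the expressions are assembled as strings, for example from configuration, rather than written as Column expressions.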