import org.apache.spark.sql.types._     // include the Spark types to define our schema
import org.apache.spark.sql.functions._ // include the Spark helper functions

val jsonSchema = new StructType()
  .add("battery_level", LongType)
  .add("c02_level", LongType)
  .add("cca3", StringType)
  .add("cn", StringType)
  .add("device_id", LongType)
  .add("device_type", StringType)
  .add("signal", LongType)
  .add("ip", StringType)
  .add("temp", LongType)
  .add("timestamp", TimestampType)
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
jsonSchema: org.apache.spark.sql.types.StructType = StructType(StructField(battery_level,LongType,true), StructField(c02_level,LongType,true), StructField(cca3,StringType,true), StructField(cn,StringType,true), StructField(device_id,LongType,true), StructField(device_type,StringType,true), StructField(signal,LongType,true), StructField(ip,StringType,true), StructField(temp,LongType,true), StructField(timestamp,TimestampType,true))
// define a case class
case class DeviceData(id: Int, device: String)

// create some sample data
val eventsDS = Seq(
  (0, """{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp": 1475600496 }"""),
  (1, """{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp": 1475600498 }"""),
  (2, """{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp": 1475600500 }"""),
  (3, """{"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp": 1475600502 }"""),
  (4, """{"device_id": 4, "device_type": "sensor-ipad", "ip": "203.82.41.9", "cca3": "PHL", "cn": "Philippines", "temp": 29, "signal": 11, "battery_level": 0, "c02_level": 983, "timestamp": 1475600504 }"""),
  (5, """{"device_id": 5, "device_type": "sensor-istick", "ip": "204.116.105.67", "cca3": "USA", "cn": "United States", "temp": 50, "signal": 16, "battery_level": 8, "c02_level": 1574, "timestamp": 1475600506 }"""),
  (6, """{"device_id": 6, "device_type": "sensor-ipad", "ip": "220.173.179.1", "cca3": "CHN", "cn": "China", "temp": 21, "signal": 18, "battery_level": 9, "c02_level": 1249, "timestamp": 1475600508 }"""),
  (7, """{"device_id": 7, "device_type": "sensor-ipad", "ip": "118.23.68.227", "cca3": "JPN", "cn": "Japan", "temp": 27, "signal": 15, "battery_level": 0, "c02_level": 1531, "timestamp": 1475600512 }"""),
  (8, """{"device_id": 8, "device_type": "sensor-inest", "ip": "208.109.163.218", "cca3": "USA", "cn": "United States", "temp": 40, "signal": 16, "battery_level": 9, "c02_level": 1208, "timestamp": 1475600514 }"""),
  (9, """{"device_id": 9, "device_type": "sensor-ipad", "ip": "88.213.191.34", "cca3": "ITA", "cn": "Italy", "temp": 19, "signal": 11, "battery_level": 0, "c02_level": 1171, "timestamp": 1475600516 }"""),
  (10, """{"device_id": 10, "device_type": "sensor-igauge", "ip": "68.28.91.22", "cca3": "USA", "cn": "United States", "temp": 32, "signal": 26, "battery_level": 7, "c02_level": 886, "timestamp": 1475600518 }"""),
  (11, """{"device_id": 11, "device_type": "sensor-ipad", "ip": "59.144.114.250", "cca3": "IND", "cn": "India", "temp": 46, "signal": 25, "battery_level": 4, "c02_level": 863, "timestamp": 1475600520 }"""),
  (12, """{"device_id": 12, "device_type": "sensor-igauge", "ip": "193.156.90.200", "cca3": "NOR", "cn": "Norway", "temp": 18, "signal": 26, "battery_level": 8, "c02_level": 1220, "timestamp": 1475600522 }"""),
  (13, """{"device_id": 13, "device_type": "sensor-ipad", "ip": "67.185.72.1", "cca3": "USA", "cn": "United States", "temp": 34, "signal": 20, "battery_level": 8, "c02_level": 1504, "timestamp": 1475600524 }"""),
  (14, """{"device_id": 14, "device_type": "sensor-inest", "ip": "68.85.85.106", "cca3": "USA", "cn": "United States", "temp": 39, "signal": 17, "battery_level": 8, "c02_level": 831, "timestamp": 1475600526 }"""),
  (15, """{"device_id": 15, "device_type": "sensor-ipad", "ip": "161.188.212.254", "cca3": "USA", "cn": "United States", "temp": 27, "signal": 26, "battery_level": 5, "c02_level": 1378, "timestamp": 1475600528 }"""),
  (16, """{"device_id": 16, "device_type": "sensor-igauge", "ip": "221.3.128.242", "cca3": "CHN", "cn": "China", "temp": 10, "signal": 24, "battery_level": 6, "c02_level": 1423, "timestamp": 1475600530 }"""),
  (17, """{"device_id": 17, "device_type": "sensor-ipad", "ip": "64.124.180.215", "cca3": "USA", "cn": "United States", "temp": 38, "signal": 17, "battery_level": 9, "c02_level": 1304, "timestamp": 1475600532 }"""),
  (18, """{"device_id": 18, "device_type": "sensor-igauge", "ip": "66.153.162.66", "cca3": "USA", "cn": "United States", "temp": 26, "signal": 10, "battery_level": 0, "c02_level": 902, "timestamp": 1475600534 }"""),
  (19, """{"device_id": 19, "device_type": "sensor-ipad", "ip": "193.200.142.254", "cca3": "AUT", "cn": "Austria", "temp": 32, "signal": 27, "battery_level": 5, "c02_level": 1282, "timestamp": 1475600536 }"""))
  .toDF("id", "device").as[DeviceData]
defined class DeviceData
eventsDS: org.apache.spark.sql.Dataset[DeviceData] = [id: int, device: string]
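Nothing above parses the JSON payloads yet. A minimal sketch of the next step, applying from_json with the jsonSchema defined earlier (devicesDF is a name chosen here for illustration, not defined elsewhere in the notebook):

val devicesDF = eventsDS
  .select($"id", from_json($"device", jsonSchema) as "parsed")
  .select("id", "parsed.*") // flatten the parsed struct into top-level columns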
val eventsFromJSONDF = Seq(
  (0, """{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp": 1475600496 }"""),
  (1, """{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp": 1475600498 }"""),
  (2, """{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp": 1475600500 }"""),
  (3, """{"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp": 1475600502 }"""),
  (4, """{"device_id": 4, "device_type": "sensor-ipad", "ip": "203.82.41.9", "cca3": "PHL", "cn": "Philippines", "temp": 29, "signal": 11, "battery_level": 0, "c02_level": 983, "timestamp": 1475600504 }"""),
  (5, """{"device_id": 5, "device_type": "sensor-istick", "ip": "204.116.105.67", "cca3": "USA", "cn": "United States", "temp": 50, "signal": 16, "battery_level": 8, "c02_level": 1574, "timestamp": 1475600506 }"""),
  (6, """{"device_id": 6, "device_type": "sensor-ipad", "ip": "220.173.179.1", "cca3": "CHN", "cn": "China", "temp": 21, "signal": 18, "battery_level": 9, "c02_level": 1249, "timestamp": 1475600508 }"""),
  (7, """{"device_id": 7, "device_type": "sensor-ipad", "ip": "118.23.68.227", "cca3": "JPN", "cn": "Japan", "temp": 27, "signal": 15, "battery_level": 0, "c02_level": 1531, "timestamp": 1475600512 }"""),
  (8, """{"device_id": 8, "device_type": "sensor-inest", "ip": "208.109.163.218", "cca3": "USA", "cn": "United States", "temp": 40, "signal": 16, "battery_level": 9, "c02_level": 1208, "timestamp": 1475600514 }"""),
  (9, """{"device_id": 9, "device_type": "sensor-ipad", "ip": "88.213.191.34", "cca3": "ITA", "cn": "Italy", "temp": 19, "signal": 11, "battery_level": 0, "c02_level": 1171, "timestamp": 1475600516 }"""))
  .toDF("id", "json")
eventsFromJSONDF: org.apache.spark.sql.DataFrame = [id: int, json: string]
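Since eventsFromJSONDF keeps each payload as a plain string, get_json_object can extract individual fields by JSONPath without declaring a schema; a sketch (jsDF is illustrative):

val jsDF = eventsFromJSONDF.select(
  $"id",
  get_json_object($"json", "$.device_type").alias("device_type"),
  get_json_object($"json", "$.cca3").alias("cca3"),
  get_json_object($"json", "$.temp").alias("temp"))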
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("dc_id", StringType) // data center where data was posted to a Kafka cluster
  .add("source",            // info about the source of the alarm
    MapType(                // define this as a Map(key -> value)
      StringType,
      new StructType()
        .add("description", StringType)
        .add("ip", StringType)
        .add("id", LongType)
        .add("temp", LongType)
        .add("c02_level", LongType)
        .add("geo", new StructType()
          .add("lat", DoubleType)
          .add("long", DoubleType))))
import org.apache.spark.sql.types._
schema: org.apache.spark.sql.types.StructType = StructType(StructField(dc_id,StringType,true), StructField(source,MapType(StringType,StructType(StructField(description,StringType,true), StructField(ip,StringType,true), StructField(id,LongType,true), StructField(temp,LongType,true), StructField(c02_level,LongType,true), StructField(geo,StructType(StructField(lat,DoubleType,true), StructField(long,DoubleType,true)),true)),true),true))
// create a single entry with id and its complex and nested data types
val dataDS = Seq("""
{
  "dc_id": "dc-101",
  "source": {
    "sensor-igauge": {
      "id": 10,
      "ip": "68.28.91.22",
      "description": "Sensor attached to the container ceilings",
      "temp": 35,
      "c02_level": 1475,
      "geo": { "lat": 38.00, "long": 97.00 }
    },
    "sensor-ipad": {
      "id": 13,
      "ip": "67.185.72.1",
      "description": "Sensor ipad attached to carbon cylinders",
      "temp": 34,
      "c02_level": 1370,
      "geo": { "lat": 47.41, "long": -122.00 }
    },
    "sensor-inest": {
      "id": 8,
      "ip": "208.109.163.218",
      "description": "Sensor attached to the factory ceilings",
      "temp": 40,
      "c02_level": 1346,
      "geo": { "lat": 33.61, "long": -111.89 }
    },
    "sensor-istick": {
      "id": 5,
      "ip": "204.116.105.67",
      "description": "Sensor embedded in exhaust pipes in the ceilings",
      "temp": 40,
      "c02_level": 1574,
      "geo": { "lat": 35.93, "long": -85.46 }
    }
  }
}""").toDS()

// should only be one item
dataDS.count()
dataDS: org.apache.spark.sql.Dataset[String] = [value: string]
res25: Long = 1
val df = spark      // spark session
  .read             // get DataFrameReader
  .schema(schema)   // use the schema defined above and read the format as JSON
  .json(dataDS)     // Dataset[String]; the RDD[String] variant is deprecated since 2.2.0
df: org.apache.spark.sql.DataFrame = [dc_id: string, source: map<string,struct<description:string,ip:string,id:bigint,temp:bigint,c02_level:bigint,geo:struct<lat:double,long:double>>>]
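Going the other direction, to_json can serialize the parsed columns back into a JSON string, e.g. for publishing to Kafka. A sketch, assuming a Spark version recent enough to serialize map fields (stringJsonDF is illustrative):

val stringJsonDF = df.select(to_json(struct($"dc_id", $"source")) as "devices")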
df.printSchema
root
|-- dc_id: string (nullable = true)
|-- source: map (nullable = true)
| |-- key: string
| |-- value: struct (valueContainsNull = true)
| | |-- description: string (nullable = true)
| | |-- ip: string (nullable = true)
| | |-- id: long (nullable = true)
| | |-- temp: long (nullable = true)
| | |-- c02_level: long (nullable = true)
| | |-- geo: struct (nullable = true)
| | | |-- lat: double (nullable = true)
| | | |-- long: double (nullable = true)
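The cell defining explodedDF is missing from this transcript; judging from the schema printed below, it presumably pairs dc_id with the exploded source map, along these lines:

val explodedDF = df.select($"dc_id", explode($"source"))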
explodedDF.printSchema
root
|-- dc_id: string (nullable = true)
|-- key: string (nullable = false)
|-- value: struct (nullable = true)
| |-- description: string (nullable = true)
| |-- ip: string (nullable = true)
| |-- id: long (nullable = true)
| |-- temp: long (nullable = true)
| |-- c02_level: long (nullable = true)
| |-- geo: struct (nullable = true)
| | |-- lat: double (nullable = true)
| | |-- long: double (nullable = true)
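An equivalent and arguably terser route uses selectExpr with dot notation instead of the chained getItem() calls used below; a sketch (notifydevicesDF is illustrative):

val notifydevicesDF = explodedDF.selectExpr(
  "dc_id as dcId",
  "key as deviceType",
  "value.ip as ip",
  "value.id as deviceId",
  "value.c02_level as c02_level",
  "value.temp as temp",
  "value.geo.lat as lat",
  "value.geo.`long` as lon")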
// case class to denote our desired Scala object
case class DeviceAlert(dcId: String, deviceType: String, ip: String, deviceId: Long, temp: Long, c02_level: Long, lat: Double, lon: Double)

// access each value with the getItem() method on "value", providing the "key", which is an attribute in our JSON object
val notifydevicesDS = explodedDF.select(
    $"dc_id" as "dcId",
    $"key" as "deviceType",
    'value.getItem("ip") as 'ip,
    'value.getItem("id") as 'deviceId,
    'value.getItem("c02_level") as 'c02_level,
    'value.getItem("temp") as 'temp,
    'value.getItem("geo").getItem("lat") as 'lat, // note: an embedded field requires yet another level of fetching
    'value.getItem("geo").getItem("long") as 'lon)
  .as[DeviceAlert] // return as a Dataset
defined class DeviceAlert
notifydevicesDS: org.apache.spark.sql.Dataset[DeviceAlert] = [dcId: string, deviceType: string ... 6 more fields]
notifydevicesDS.printSchema
root
|-- dcId: string (nullable = true)
|-- deviceType: string (nullable = false)
|-- ip: string (nullable = true)
|-- deviceId: long (nullable = true)
|-- c02_level: long (nullable = true)
|-- temp: long (nullable = true)
|-- lat: double (nullable = true)
|-- lon: double (nullable = true)
// define a Scala notification object
object DeviceNOCAlerts {
  def sendTwilio(message: String): Unit = {
    // TODO: fill in as necessary
    println("Twilio:" + message)
  }
  def sendSNMP(message: String): Unit = {
    // TODO: fill in as necessary
    println("SNMP:" + message)
  }
  def sendKafka(message: String): Unit = {
    // TODO: fill in as necessary
    println("KAFKA:" + message)
  }
}

def logAlerts(log: java.io.PrintStream = Console.out,
              deviceAlert: DeviceAlert,
              alert: String,
              notify: String = "twilio"): Unit = {
  val message = "[***ALERT***: %s; data_center: %s, device_name: %s, temperature: %d; device_id: %d; ip: %s; c02: %d]" format(
    alert, deviceAlert.dcId, deviceAlert.deviceType, deviceAlert.temp, deviceAlert.deviceId, deviceAlert.ip, deviceAlert.c02_level)
  // by default, log to stderr/stdout
  log.println(message)
  // use the appropriate notification method
  val notifyFunc = notify match {
    case "twilio" => DeviceNOCAlerts.sendTwilio _
    case "snmp"   => DeviceNOCAlerts.sendSNMP _
    case "kafka"  => DeviceNOCAlerts.sendKafka _
  }
  // send the alert
  notifyFunc(message)
}
defined object DeviceNOCAlerts
logAlerts: (log: java.io.PrintStream, deviceAlert: DeviceAlert, alert: String, notify: String)Unit
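One way to drive the alerts from the Dataset, sketched here with hypothetical thresholds and a collect() so the notification code runs on the driver rather than on the executors:

notifydevicesDS
  .filter($"temp" > 30 && $"c02_level" > 1400) // hypothetical alert thresholds
  .collect()
  .foreach(d => logAlerts(deviceAlert = d, alert = "DEVICE EXCEEDS THRESHOLDS"))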
import org.apache.spark.sql.types._

// a bit longish, nested, and convoluted JSON schema :)
val nestSchema2 = new StructType()
  .add("devices", new StructType()
    .add("thermostats", MapType(StringType, new StructType()
      .add("device_id", StringType)
      .add("locale", StringType)
      .add("software_version", StringType)
      .add("structure_id", StringType)
      .add("where_name", StringType)
      .add("last_connection", StringType)
      .add("is_online", BooleanType)
      .add("can_cool", BooleanType)
      .add("can_heat", BooleanType)
      .add("is_using_emergency_heat", BooleanType)
      .add("has_fan", BooleanType)
      .add("fan_timer_active", BooleanType)
      .add("fan_timer_timeout", StringType)
      .add("temperature_scale", StringType)
      .add("target_temperature_f", DoubleType)
      .add("target_temperature_high_f", DoubleType)
      .add("target_temperature_low_f", DoubleType)
      .add("eco_temperature_high_f", DoubleType)
      .add("eco_temperature_low_f", DoubleType)
      .add("away_temperature_high_f", DoubleType)
      .add("away_temperature_low_f", DoubleType)
      .add("hvac_mode", StringType)
      .add("humidity", LongType)
      .add("hvac_state", StringType)
      .add("is_locked", StringType)
      .add("locked_temp_min_f", DoubleType)
      .add("locked_temp_max_f", DoubleType)))
    .add("smoke_co_alarms", MapType(StringType, new StructType()
      .add("device_id", StringType)
      .add("locale", StringType)
      .add("software_version", StringType)
      .add("structure_id", StringType)
      .add("where_name", StringType)
      .add("last_connection", StringType)
      .add("is_online", BooleanType)
      .add("battery_health", StringType)
      .add("co_alarm_state", StringType)
      .add("smoke_alarm_state", StringType)
      .add("is_manual_test_active", BooleanType)
      .add("last_manual_test_time", StringType)
      .add("ui_color_state", StringType)))
    .add("cameras", MapType(StringType, new StructType()
      .add("device_id", StringType)
      .add("software_version", StringType)
      .add("structure_id", StringType)
      .add("where_name", StringType)
      .add("is_online", BooleanType)
      .add("is_streaming", BooleanType)
      .add("is_audio_input_enabled", BooleanType)
      .add("last_is_online_change", StringType)
      .add("is_video_history_enabled", BooleanType)
      .add("web_url", StringType)
      .add("app_url", StringType)
      .add("is_public_share_enabled", BooleanType)
      .add("activity_zones", new StructType()
        .add("name", StringType)
        .add("id", LongType))
      .add("last_event", StringType))))
import org.apache.spark.sql.types._
nestSchema2: org.apache.spark.sql.types.StructType = StructType(StructField(devices,StructType(StructField(thermostats,MapType(StringType,StructType(StructField(device_id,StringType,true), StructField(locale,StringType,true), StructField(software_version,StringType,true), StructField(structure_id,StringType,true), StructField(where_name,StringType,true), StructField(last_connection,StringType,true), StructField(is_online,BooleanType,true), StructField(can_cool,BooleanType,true), StructField(can_heat,BooleanType,true), StructField(is_using_emergency_heat,BooleanType,true), StructField(has_fan,BooleanType,true), StructField(fan_timer_active,BooleanType,true), StructField(fan_timer_timeout,StringType,true), StructField(temperature_scale,StringType,true), StructField(target_temperature_f,DoubleType,true), StructField(target_temperature_high_f,DoubleType,true), StructField(target_temperature_low_f,DoubleType,true), StructField(eco_temperature_high_f,DoubleType,true), StructField(eco_temperature_low_f,DoubleType,true), StructField(away_temperature_high_f,DoubleType,true), StructField(away_temperature_low_f,DoubleType,true), StructField(hvac_mode,StringType,true), StructField(humidity,LongType,true), StructField(hvac_state,StringType,true), StructField(is_locked,StringType,true), StructField(locked_temp_min_f,DoubleType,true), StructField(locked_temp_max_f,DoubleType,true)),true),true), StructField(smoke_co_alarms,MapType(StringType,StructType(StructField(device_id,StringType,true), StructField(locale,StringType,true), StructField(software_version,StringType,true), StructField(structure_id,StringType,true), StructField(where_name,StringType,true), StructField(last_connection,StringType,true), StructField(is_online,BooleanType,true), StructField(battery_health,StringType,true), StructField(co_alarm_state,StringType,true), StructField(smoke_alarm_state,StringType,true), StructField(is_manual_test_active,BooleanType,true), StructField(last_manual_test_time,StringType,true), StructField(ui_color_state,StringType,true)),true),true), StructField(cameras,MapType(StringType,StructType(StructField(device_id,StringType,true), StructField(software_version,StringType,true), StructField(structure_id,StringType,true), StructField(where_name,StringType,true), StructField(is_online,BooleanType,true), StructField(is_streaming,BooleanType,true), StructField(is_audio_input_enabled,BooleanType,true), StructField(last_is_online_change,StringType,true), StructField(is_video_history_enabled,BooleanType,true), StructField(web_url,StringType,true), StructField(app_url,StringType,true), StructField(is_public_share_enabled,BooleanType,true), StructField(activity_zones,StructType(StructField(name,StringType,true), StructField(id,LongType,true)),true), StructField(last_event,StringType,true)),true),true)),true))
val nestDataDS2 = Seq("""{
  "devices": {
    "thermostats": {
      "peyiJNo0IldT2YlIVtYaGQ": {
        "device_id": "peyiJNo0IldT2YlIVtYaGQ",
        "locale": "en-US",
        "software_version": "4.0",
        "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
        "where_name": "Hallway Upstairs",
        "last_connection": "2016-10-31T23:59:59.000Z",
        "is_online": true,
        "can_cool": true,
        "can_heat": true,
        "is_using_emergency_heat": true,
        "has_fan": true,
        "fan_timer_active": true,
        "fan_timer_timeout": "2016-10-31T23:59:59.000Z",
        "temperature_scale": "F",
        "target_temperature_f": 72,
        "target_temperature_high_f": 80,
        "target_temperature_low_f": 65,
        "eco_temperature_high_f": 80,
        "eco_temperature_low_f": 65,
        "away_temperature_high_f": 80,
        "away_temperature_low_f": 65,
        "hvac_mode": "heat",
        "humidity": 40,
        "hvac_state": "heating",
        "is_locked": true,
        "locked_temp_min_f": 65,
        "locked_temp_max_f": 80
      }
    },
    "smoke_co_alarms": {
      "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs": {
        "device_id": "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs",
        "locale": "en-US",
        "software_version": "1.01",
        "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
        "where_name": "Jane's Room",
        "last_connection": "2016-10-31T23:59:59.000Z",
        "is_online": true,
        "battery_health": "ok",
        "co_alarm_state": "ok",
        "smoke_alarm_state": "ok",
        "is_manual_test_active": true,
        "last_manual_test_time": "2016-10-31T23:59:59.000Z",
        "ui_color_state": "gray"
      }
    },
    "cameras": {
      "awJo6rH0IldT2YlIVtYaGQ": {
        "device_id": "awJo6rH",
        "software_version": "4.0",
        "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
        "where_name": "Foyer",
        "is_online": true,
        "is_streaming": true,
        "is_audio_input_enabled": true,
        "last_is_online_change": "2016-12-29T18:42:00.000Z",
        "is_video_history_enabled": true,
        "web_url": "https://home.nest.com/cameras/device_id?auth=access_token",
        "app_url": "nestmobile://cameras/device_id?auth=access_token",
        "is_public_share_enabled": true,
        "activity_zones": { "name": "Walkway", "id": 244083 },
        "last_event": "2016-10-31T23:59:59.000Z"
      }
    }
  }
}""").toDS
nestDataDS2: org.apache.spark.sql.Dataset[String] = [value: string]
val nestDF2 = spark     // spark session
  .read                 // get DataFrameReader
  .schema(nestSchema2)  // use the schema defined above and read the format as JSON
  .json(nestDataDS2)    // Dataset[String]; the RDD[String] variant is deprecated since 2.2.0
nestDF2: org.apache.spark.sql.DataFrame = [devices: struct<thermostats: map<string,struct<device_id:string,locale:string,software_version:string,structure_id:string,where_name:string,last_connection:string,is_online:boolean,can_cool:boolean,can_heat:boolean,is_using_emergency_heat:boolean,has_fan:boolean,fan_timer_active:boolean,fan_timer_timeout:string,temperature_scale:string,target_temperature_f:double,target_temperature_high_f:double,target_temperature_low_f:double,eco_temperature_high_f:double,eco_temperature_low_f:double,away_temperature_high_f:double,away_temperature_low_f:double,hvac_mode:string,humidity:bigint,hvac_state:string,... 3 more fields>>, smoke_co_alarms: map<string,struct<device_id:string,locale:string,software_version:string,structure_id:string,where_name:string,last_connection:string,is_online:boolean,battery_health:string,co_alarm_state:string,smoke_alarm_state:string,is_manual_test_active:boolean,last_manual_test_time:string,ui_color_state:string>> ... 1 more field>]
val mapColumnsDF = nestDF2.select(
  $"devices".getItem("smoke_co_alarms").alias("smoke_alarms"),
  $"devices".getItem("cameras").alias("cameras"),
  $"devices".getItem("thermostats").alias("thermostats"))
mapColumnsDF: org.apache.spark.sql.DataFrame = [smoke_alarms: map<string,struct<device_id:string,locale:string,software_version:string,structure_id:string,where_name:string,last_connection:string,is_online:boolean,battery_health:string,co_alarm_state:string,smoke_alarm_state:string,is_manual_test_active:boolean,last_manual_test_time:string,ui_color_state:string>>, cameras: map<string,struct<device_id:string,software_version:string,structure_id:string,where_name:string,is_online:boolean,is_streaming:boolean,is_audio_input_enabled:boolean,last_is_online_change:string,is_video_history_enabled:boolean,web_url:string,app_url:string,is_public_share_enabled:boolean,activity_zones:struct<name:string,id:bigint>,last_event:string>> ... 1 more field]
val explodedThermostatsDF = mapColumnsDF.select(explode($"thermostats"))
val explodedCamerasDF = mapColumnsDF.select(explode($"cameras"))
// or you could use the original nestDF2 with the devices.X notation
val explodedSmokeAlarmsDF = nestDF2.select(explode($"devices.smoke_co_alarms"))
explodedThermostatsDF: org.apache.spark.sql.DataFrame = [key: string, value: struct<device_id: string, locale: string ... 25 more fields>]
explodedCamerasDF: org.apache.spark.sql.DataFrame = [key: string, value: struct<device_id: string, software_version: string ... 12 more fields>]
explodedSmokeAlarmsDF: org.apache.spark.sql.DataFrame = [key: string, value: struct<device_id: string, locale: string ... 11 more fields>]
val thermostatDF = explodedThermostatsDF.select(
  $"value".getItem("device_id").alias("device_id"),
  $"value".getItem("locale").alias("locale"),
  $"value".getItem("where_name").alias("location"),
  $"value".getItem("last_connection").alias("last_connected"),
  $"value".getItem("humidity").alias("humidity"),
  $"value".getItem("target_temperature_f").alias("target_temperature_f"),
  $"value".getItem("hvac_mode").alias("mode"),
  $"value".getItem("software_version").alias("version"))

val cameraDF = explodedCamerasDF.select(
  $"value".getItem("device_id").alias("device_id"),
  $"value".getItem("where_name").alias("location"),
  $"value".getItem("software_version").alias("version"),
  $"value".getItem("activity_zones").getItem("name").alias("name"),
  $"value".getItem("activity_zones").getItem("id").alias("id"))

val smokeAlarmsDF = explodedSmokeAlarmsDF.select(
  $"value".getItem("device_id").alias("device_id"),
  $"value".getItem("where_name").alias("location"),
  $"value".getItem("software_version").alias("version"),
  $"value".getItem("last_connection").alias("last_connected"),
  $"value".getItem("battery_health").alias("battery_health"))
thermostatDF: org.apache.spark.sql.DataFrame = [device_id: string, locale: string ... 6 more fields]
cameraDF: org.apache.spark.sql.DataFrame = [device_id: string, location: string ... 3 more fields]
smokeAlarmsDF: org.apache.spark.sql.DataFrame = [device_id: string, location: string ... 3 more fields]
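From here, the flattened DataFrames can be registered as temporary views and queried with plain SQL; a sketch:

thermostatDF.createOrReplaceTempView("thermostats")
spark.sql("SELECT device_id, location, target_temperature_f, mode FROM thermostats").show(false)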