from pyspark.sql.functions import *
from pyspark.sql.types import *
# Schema for one data-center document: a "dc_id" string plus a "source" map
# whose keys are device names (e.g. "sensor-ipad") and whose values are
# per-device structs holding a description, ip, numeric id, integer reading
# arrays ("temp", "c02_level") and a nested "geo" lat/long struct.
# Every field is nullable (StructField's default), and map values may be null.
schema = StructType([
    StructField("dc_id", StringType()),
    StructField(
        "source",
        MapType(
            StringType(),
            StructType([
                StructField("description", StringType()),
                StructField("ip", StringType()),
                StructField("id", IntegerType()),
                StructField("temp", ArrayType(IntegerType())),
                StructField("c02_level", ArrayType(IntegerType())),
                StructField(
                    "geo",
                    StructType([
                        StructField("lat", DoubleType()),
                        StructField("long", DoubleType()),
                    ]),
                ),
            ]),
        ),
    ),
])
# Sample document for data center "dc-101": four devices keyed by name under
# "source", each with identifiers, reading arrays and a geo location.
# NOTE(review): jsonToDataFrame and display are not defined in this file —
# presumably a notebook setup helper and the Databricks display builtin; confirm.
dc101_json = """{
"dc_id": "dc-101",
"source": {
"sensor-igauge": {
"id": 10,
"ip": "68.28.91.22",
"description": "Sensor attached to the container ceilings",
"temp":[35,35,35,36,35,35,32,35,30,35,32,35],
"c02_level": [1475,1476,1473],
"geo": {"lat":38.00, "long":97.00}
},
"sensor-ipad": {
"id": 13,
"ip": "67.185.72.1",
"description": "Sensor ipad attached to carbon cylinders",
"temp": [45,45,45,46,45,45,42,35,40,45,42,45],
"c02_level": [1370,1371,1372],
"geo": {"lat":47.41, "long":-122.00}
},
"sensor-inest": {
"id": 8,
"ip": "208.109.163.218",
"description": "Sensor attached to the factory ceilings",
"temp": [40,40,40,40,40,43,42,40,40,45,42,45],
"c02_level": [1346,1345, 1343],
"geo": {"lat":33.61, "long":-111.89}
},
"sensor-istick": {
"id": 5,
"ip": "204.116.105.67",
"description": "Sensor embedded in exhaust pipes in the ceilings",
"temp":[30,30,30,30,40,43,42,40,40,35,42,35],
"c02_level": [1574,1570, 1576],
"geo": {"lat":35.93, "long":-85.46}
}
}
}"""

# Parse the document against the explicit nested schema defined above.
dataDF = jsonToDataFrame(dc101_json, schema)
display(dataDF)
dataDF.printSchema()
root
|-- dc_id: string (nullable = true)
|-- source: map (nullable = true)
| |-- key: string
| |-- value: struct (valueContainsNull = true)
| | |-- description: string (nullable = true)
| | |-- ip: string (nullable = true)
| | |-- id: integer (nullable = true)
| | |-- temp: array (nullable = true)
| | | |-- element: integer (containsNull = true)
| | |-- c02_level: array (nullable = true)
| | | |-- element: integer (containsNull = true)
| | |-- geo: struct (nullable = true)
| | | |-- lat: double (nullable = true)
| | | |-- long: double (nullable = true)
# Flatten the per-device map in dataDF into one row per device.
# explode() on a map column emits two columns: "key" (the map key, i.e. the
# device name, never null) and "value" (the device struct). The struct fields
# are then projected out, renaming "id" -> "device_id" and
# "c02_level" -> "c02_levels".
devicesDataDF = (
    dataDF.select(col("dc_id"), explode("source"))
    .select(
        "dc_id",
        "key",
        "value.ip",
        col("value.id").alias("device_id"),
        col("value.c02_level").alias("c02_levels"),
        "value.temp",
    )
)
devicesDataDF.printSchema()
root
|-- dc_id: string (nullable = true)
|-- key: string (nullable = false)
|-- ip: string (nullable = true)
|-- device_id: integer (nullable = true)
|-- c02_levels: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- temp: array (nullable = true)
| |-- element: integer (containsNull = true)
# Schema for one IoT device reading: a numeric device id, integer arrays of
# battery/CO2/signal/temperature samples, a "cca3" string array (code and
# country name in the sample data), the device type, its ip, and a timestamp.
# Every field is nullable (StructField's default); order matches the output.
schema2 = StructType([
    StructField("device_id", IntegerType()),
    StructField("battery_level", ArrayType(IntegerType())),
    StructField("c02_level", ArrayType(IntegerType())),
    StructField("signal", ArrayType(IntegerType())),
    StructField("temp", ArrayType(IntegerType())),
    StructField("cca3", ArrayType(StringType())),
    StructField("device_type", StringType()),
    StructField("ip", StringType()),
    StructField("timestamp", TimestampType()),
])
# Parse a JSON array of device readings (one row per array element) against
# schema2. The numeric "timestamp" values are read as TimestampType
# (presumably epoch seconds — confirm against the data source).
# NOTE(review): jsonToDataFrame and display are not defined in this file —
# presumably a notebook setup helper and the Databricks display builtin; confirm.
dataDF2 = jsonToDataFrame("""[
{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": ["USA", "United States"], "temp": [25,26, 27], "signal": [23,22,24], "battery_level": [8,9,7], "c02_level": [917, 921, 925], "timestamp" :1475600496 },
{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": ["NOR", "Norway"], "temp": [30, 32,35], "signal": [18,18,19], "battery_level": [6, 6, 5], "c02_level": [1413, 1416, 1417], "timestamp" :1475600498 },
{"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": ["USA", "United States"], "temp":[47, 47, 48], "signal": [12,12,13], "battery_level": [1, 1, 0], "c02_level": [1447,1446, 1448], "timestamp" :1475600502 },
{"device_id": 4, "device_type": "sensor-ipad", "ip": "203.82.41.9", "cca3":["PHL", "Philippines"], "temp":[29, 29, 28], "signal":[11, 11, 11], "battery_level":[0, 0, 0], "c02_level": [983, 990, 982], "timestamp" :1475600504 },
{"device_id": 5, "device_type": "sensor-istick", "ip": "204.116.105.67", "cca3": ["USA", "United States"], "temp":[50,51,50], "signal": [16,16,17], "battery_level": [8,8, 8], "c02_level": [1574,1575,1576], "timestamp" :1475600506 },
{"device_id": 6, "device_type": "sensor-ipad", "ip": "220.173.179.1", "cca3": ["CHN", "China"], "temp": [21,21,22], "signal": [18,18,19], "battery_level": [9,9,9], "c02_level": [1249,1249,1250], "timestamp" :1475600508 },
{"device_id": 7, "device_type": "sensor-ipad", "ip": "118.23.68.227", "cca3": ["JPN", "Japan"], "temp":[27,27,28], "signal": [15,15,29], "battery_level":[0,0,0], "c02_level": [1531,1532,1531], "timestamp" :1475600512 },
{"device_id": 8, "device_type": "sensor-inest", "ip": "208.109.163.218", "cca3": ["USA", "United States"], "temp":[40,40,41], "signal": [16,16,17], "battery_level":[ 9, 9, 10], "c02_level": [1208,1209,1208], "timestamp" :1475600514},
{"device_id": 9, "device_type": "sensor-ipad", "ip": "88.213.191.34", "cca3": ["ITA", "Italy"], "temp": [19,28,5], "signal": [11, 5, 24], "battery_level": [0,-1,0], "c02_level": [1171, 1240, 1400], "timestamp" :1475600516 },
{"device_id": 10, "device_type": "sensor-igauge", "ip": "68.28.91.22", "cca3": ["USA", "United States"], "temp": [32,33,32], "signal": [26,26,25], "battery_level": [7,7,8], "c02_level": [886,886,887], "timestamp" :1475600518 },
{"device_id": 11, "device_type": "sensor-ipad", "ip": "59.144.114.250", "cca3": ["IND", "India"], "temp": [46,45,44], "signal": [25,25,24], "battery_level": [4,5,5], "c02_level": [863,862,864], "timestamp" :1475600520 },
{"device_id": 12, "device_type": "sensor-igauge", "ip": "193.156.90.200", "cca3": ["NOR", "Norway"], "temp": [18,17,18], "signal": [26,25,26], "battery_level": [8,9,8], "c02_level": [1220,1221,1220], "timestamp" :1475600522 },
{"device_id": 13, "device_type": "sensor-ipad", "ip": "67.185.72.1", "cca3": ["USA", "United States"], "temp": [34,35,34], "signal": [20,21,20], "battery_level": [8,8,8], "c02_level": [1504,1504,1503], "timestamp" :1475600524 },
{"device_id": 14, "device_type": "sensor-inest", "ip": "68.85.85.106", "cca3": ["USA", "United States"], "temp": [39,40,38], "signal": [17, 17, 18], "battery_level": [8,8,7], "c02_level": [831,832,831], "timestamp" :1475600526 },
{"device_id": 15, "device_type": "sensor-ipad", "ip": "161.188.212.254", "cca3": ["USA", "United States"], "temp": [27,27,28], "signal": [26,26,25], "battery_level": [5,5,5], "c02_level": [1378,1376,1378], "timestamp" :1475600528 },
{"device_id": 16, "device_type": "sensor-igauge", "ip": "221.3.128.242", "cca3": ["CHN", "China"], "temp": [10,10,11], "signal": [24,24,23], "battery_level": [6,5,6], "c02_level": [1423, 1423, 1423], "timestamp" :1475600530 },
{"device_id": 17, "device_type": "sensor-ipad", "ip": "64.124.180.215", "cca3": ["USA", "United States"], "temp": [38,38,39], "signal": [17,17,17], "battery_level": [9,9,9], "c02_level": [1304,1304,1304], "timestamp" :1475600532 },
{"device_id": 18, "device_type": "sensor-igauge", "ip": "66.153.162.66", "cca3": ["USA", "United States"], "temp": [26, 0, 99], "signal": [10, 1, 5], "battery_level": [0, 0, 0], "c02_level": [902,902, 1300], "timestamp" :1475600534 },
{"device_id": 19, "device_type": "sensor-ipad", "ip": "193.200.142.254", "cca3": ["AUT", "Austria"], "temp": [32,32,33], "signal": [27,27,28], "battery_level": [5,5,5], "c02_level": [1282, 1282, 1281], "timestamp" :1475600536 }
]""", schema2)

# Register the readings as the temp view that the SQL cell queries
# ("from iot_nested_data"); without this the query fails with a missing-table
# error. createOrReplaceTempView is safe to re-run.
dataDF2.createOrReplaceTempView("iot_nested_data")

display(dataDF2)
dataDF2.printSchema()
root
|-- device_id: integer (nullable = true)
|-- battery_level: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- c02_level: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- signal: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- temp: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- cca3: array (nullable = true)
| |-- element: string (containsNull = true)
|-- device_type: string (nullable = true)
|-- ip: string (nullable = true)
|-- timestamp: timestamp (nullable = true)
%sql
-- Per-device averages of the signal, temp and c02_level arrays, computed with
-- the higher-order reduce(array, start, merge, finish) function:
--   merge  (acc, x) -> acc + x  folds every element into a running sum;
--   finish acc -> acc div n     divides the sum by the element count using
--                               integer (truncating) division.
-- nullif(size(...), 0) makes an empty array produce NULL instead of dividing
-- by zero.
-- ORDER BY (rather than SORT BY, which only sorts within each partition)
-- returns rows globally sorted by average_signal.
select cca3, device_type, signal, temp, c02_level,
  reduce(signal, 0, (acc, s) -> acc + s, acc -> acc div nullif(size(signal), 0)) as average_signal,
  reduce(temp, 0, (acc, t) -> acc + t, acc -> acc div nullif(size(temp), 0)) as average_temp,
  reduce(c02_level, 0, (acc, c) -> acc + c, acc -> acc div nullif(size(c02_level), 0)) as average_c02_level
from iot_nested_data
order by average_signal desc
Higher-Order and Lambda Functions: Explore Complex and Structured Data in SQL