from pyspark.sql.functions import *
from pyspark.sql.types import *
# Schema of one data-center record: a string `dc_id` plus a `source` map
# whose keys are strings and whose values are per-sensor structs
# (description, ip, id, temp readings, c02_level readings, geo lat/long).
schema = (
    StructType()
    .add("dc_id", StringType())
    .add(
        "source",
        MapType(
            StringType(),
            StructType()
            .add("description", StringType())
            .add("ip", StringType())
            .add("id", IntegerType())
            .add("temp", ArrayType(IntegerType()))
            .add("c02_level", ArrayType(IntegerType()))
            .add(
                "geo",
                StructType()
                .add("lat", DoubleType())
                .add("long", DoubleType()),
            ),
        ),
    )
)
# Build dataDF from an inline JSON document describing data center "dc-101".
# Its "source" object holds four sensors keyed by name (sensor-igauge,
# sensor-ipad, sensor-inest, sensor-istick), each carrying id, ip,
# description, temp readings, c02_level readings and a geo lat/long pair.
# The document and `schema` are both passed to jsonToDataFrame.
# NOTE(review): jsonToDataFrame is not defined in this section — presumably a
# notebook helper that parses the JSON string with the given schema; confirm.
dataDF = jsonToDataFrame( """{
"dc_id": "dc-101",
"source": {
"sensor-igauge": {
"id": 10,
"ip": "68.28.91.22",
"description": "Sensor attached to the container ceilings",
"temp":[35,35,35,36,35,35,32,35,30,35,32,35],
"c02_level": [1475,1476,1473],
"geo": {"lat":38.00, "long":97.00}
},
"sensor-ipad": {
"id": 13,
"ip": "67.185.72.1",
"description": "Sensor ipad attached to carbon cylinders",
"temp": [45,45,45,46,45,45,42,35,40,45,42,45],
"c02_level": [1370,1371,1372],
"geo": {"lat":47.41, "long":-122.00}
},
"sensor-inest": {
"id": 8,
"ip": "208.109.163.218",
"description": "Sensor attached to the factory ceilings",
"temp": [40,40,40,40,40,43,42,40,40,45,42,45],
"c02_level": [1346,1345, 1343],
"geo": {"lat":33.61, "long":-111.89}
},
"sensor-istick": {
"id": 5,
"ip": "204.116.105.67",
"description": "Sensor embedded in exhaust pipes in the ceilings",
"temp":[30,30,30,30,40,43,42,40,40,35,42,35],
"c02_level": [1574,1570, 1576],
"geo": {"lat":35.93, "long":-85.46}
}
}
}""", schema)
# Render the DataFrame.
# NOTE(review): display is not defined in this section — presumably the
# notebook environment's built-in renderer; confirm.
display(dataDF)
# Print the DataFrame's schema tree.
dataDF.printSchema()
root
 |-- dc_id: string (nullable = true)
 |-- source: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- ip: string (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- temp: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)
 |    |    |-- c02_level: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)
 |    |    |-- geo: struct (nullable = true)
 |    |    |    |-- lat: double (nullable = true)
 |    |    |    |-- long: double (nullable = true)
# Print the schema tree of devicesDataDF.
# NOTE(review): devicesDataDF is not defined anywhere in this section. Judging
# by the schema output that follows this call (non-nullable `key`, plus `ip`,
# `device_id`, `c02_levels`, `temp`), it is presumably built by exploding
# dataDF's `source` map and selecting/aliasing fields of the value struct —
# confirm the defining step exists before this line runs.
devicesDataDF.printSchema()
root
 |-- dc_id: string (nullable = true)
 |-- key: string (nullable = false)
 |-- ip: string (nullable = true)
 |-- device_id: integer (nullable = true)
 |-- c02_levels: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- temp: array (nullable = true)
 |    |-- element: integer (containsNull = true)
Higher-Order and Lambda Functions: Explore Complex and Structured Data in SQL