
Higher-Order and Lambda Functions: Explore Complex and Structured Data in SQL

This tutorial walks you through four higher-order functions. While the accompanying in-depth blog explains the concepts and motivations for why handling complex data types such as arrays is important in SQL, and why the pre-existing ways of doing so are inefficient and cumbersome, this tutorial shows how to use higher-order functions in SQL to process structured data and arrays in IoT device events. They come in handy in particular if you enjoy functional programming and can quickly and efficiently write a lambda expression as part of these higher-order SQL functions.

This tutorial explores four functions and the wide range of uses you can put them to when processing and transforming array types:

  • transform()
  • filter()
  • exists()
  • aggregate()

The takeaway from this short tutorial is that there are myriad ways to slice and dice nested JSON structures with Spark SQL utility functions. These dedicated higher-order functions are primarily suited to manipulating arrays in Spark SQL, making the code easier to write and more concise when processing table values that contain arrays or nested arrays.
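To give a flavor of the syntax before touching any data, here is a minimal sketch of how a lambda expression is embedded in these functions. It assumes Spark 2.4 or later, where these built-ins are available, and the array literals are purely illustrative rather than part of the tutorial's dataset:

%sql select transform(array(1, 2, 3), x -> x + 1) as incremented,
            exists(array(1, 2, 3), x -> x > 2) as has_large_value

Each function takes an array (a column or, as here, a literal) plus a lambda of the form argument -> expression; aggregate() additionally takes a starting value and a two-argument lambda such as (acc, x) -> acc + x.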

Let's create a simple JSON schema with attributes and values, with at least two attributes that are arrays, namely temp and c02_level.

from pyspark.sql.functions import *
from pyspark.sql.types import *
 
schema = StructType() \
          .add("dc_id", StringType()) \
          .add("source", MapType(StringType(), StructType() \
                        .add("description", StringType()) \
                        .add("ip", StringType()) \
                        .add("id", IntegerType()) \
                        .add("temp", ArrayType(IntegerType())) \
                        .add("c02_level", ArrayType(IntegerType())) \
                        .add("geo", StructType() \
                              .add("lat", DoubleType()) \
                              .add("long", DoubleType()))))
 

This helper Python function converts a JSON string into a Spark DataFrame.

# Convenience function for turning JSON strings into DataFrames.
def jsonToDataFrame(json, schema=None):
  # SparkSessions are available with Spark 2.0+
  reader = spark.read
  if schema:
    # Apply the user-supplied schema instead of inferring one
    reader = reader.schema(schema)
  return reader.json(sc.parallelize([json]))
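As a quick, hypothetical sanity check of the helper, you can call it without a schema and let Spark infer one from the JSON string itself; the tiny payload below is illustrative only and not part of the tutorial's dataset.

# Hypothetical example: with no schema argument, Spark infers the schema
# from the JSON string itself.
inferredDF = jsonToDataFrame("""{"device": "sensor-test", "temp": [20, 21, 22]}""")
inferredDF.printSchema()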

Using the schema above, create a complex JSON structure and turn it into a DataFrame. Displaying the DataFrame gives us two columns: a key (dc_id) and a value (source), which is a JSON string with an embedded nested structure.

dataDF = jsonToDataFrame( """{
 
    "dc_id": "dc-101",
    "source": {
        "sensor-igauge": {
        "id": 10,
        "ip": "68.28.91.22",
        "description": "Sensor attached to the container ceilings",
        "temp":[35,35,35,36,35,35,32,35,30,35,32,35],
        "c02_level": [1475,1476,1473],
        "geo": {"lat":38.00, "long":97.00}                        
      },
      "sensor-ipad": {
        "id": 13,
        "ip": "67.185.72.1",
        "description": "Sensor ipad attached to carbon cylinders",
        "temp": [45,45,45,46,45,45,42,35,40,45,42,45],
        "c02_level": [1370,1371,1372],
        "geo": {"lat":47.41, "long":-122.00}
      },
      "sensor-inest": {
        "id": 8,
        "ip": "208.109.163.218",
        "description": "Sensor attached to the factory ceilings",
        "temp": [40,40,40,40,40,43,42,40,40,45,42,45],
        "c02_level": [1346,1345, 1343],
        "geo": {"lat":33.61, "long":-111.89}
      },
      "sensor-istick": {
        "id": 5,
        "ip": "204.116.105.67",
        "description": "Sensor embedded in exhaust pipes in the ceilings",
        "temp":[30,30,30,30,40,43,42,40,40,35,42,35],
        "c02_level": [1574,1570, 1576],
        "geo": {"lat":35.93, "long":-85.46}
      }
    }
  }""", schema)
 
display(dataDF)
  
 
dc_id  | source
dc-101 | {"sensor-igauge": {"description": "Sensor attached to the container ceilings", "ip": "68.28.91.22", "id": 10, "temp": [35, 35, 35, 36, 35, 35, 32, 35, 30, 35, 32, 35], "c02_level": [1475, 1476, 1473], "geo": {"lat": 38, "long": 97}}, "sensor-ipad": {"description": "Sensor ipad attached to carbon cylinders", "ip": "67.185.72.1", "id": 13, "temp": [45, 45, 45, 46, 45, 45, 42, 35, 40, 45, 42, 45], "c02_level": [1370, 1371, 1372], "geo": {"lat": 47.41, "long": -122}}, "sensor-inest": {"description": "Sensor attached to the factory ceilings", "ip": "208.109.163.218", "id": 8, "temp": [40, 40, 40, 40, 40, 43, 42, 40, 40, 45, 42, 45], "c02_level": [1346, 1345, 1343], "geo": {"lat": 33.61, "long": -111.89}}, "sensor-istick": {"description": "Sensor embedded in exhaust pipes in the ceilings", "ip": "204.116.105.67", "id": 5, "temp": [30, 30, 30, 30, 40, 43, 42, 40, 40, 35, 42, 35], "c02_level": [1574, 1570, 1576], "geo": {"lat": 35.93, "long": -85.46}}}

Showing all 1 rows.

By examining its schema, you can see that the DataFrame schema reflects the schema defined above, where two of its elements are arrays of integers.

dataDF.printSchema()
root
 |-- dc_id: string (nullable = true)
 |-- source: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- ip: string (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- temp: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)
 |    |    |-- c02_level: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)
 |    |    |-- geo: struct (nullable = true)
 |    |    |    |-- lat: double (nullable = true)
 |    |    |    |-- long: double (nullable = true)

Use explode() to break out each entry of the source map into its own row, with separate key and value columns.

explodedDF = dataDF.select("dc_id", explode("source"))
display(explodedDF)
 
dc_id  | key           | value
dc-101 | sensor-igauge | {"description": "Sensor attached to the container ceilings", "ip": "68.28.91.22", "id": 10, "temp": [35, 35, 35, 36, 35, 35, 32, 35, 30, 35, 32, 35], "c02_level": [1475, 1476, 1473], "geo": {"lat": 38, "long": 97}}
dc-101 | sensor-ipad   | {"description": "Sensor ipad attached to carbon cylinders", "ip": "67.185.72.1", "id": 13, "temp": [45, 45, 45, 46, 45, 45, 42, 35, 40, 45, 42, 45], "c02_level": [1370, 1371, 1372], "geo": {"lat": 47.41, "long": -122}}
dc-101 | sensor-inest  | {"description": "Sensor attached to the factory ceilings", "ip": "208.109.163.218", "id": 8, "temp": [40, 40, 40, 40, 40, 43, 42, 40, 40, 45, 42, 45], "c02_level": [1346, 1345, 1343], "geo": {"lat": 33.61, "long": -111.89}}
dc-101 | sensor-istick | {"description": "Sensor embedded in exhaust pipes in the ceilings", "ip": "204.116.105.67", "id": 5, "temp": [30, 30, 30, 30, 40, 43, 42, 40, 40, 35, 42, 35], "c02_level": [1574, 1570, 1576], "geo": {"lat": 35.93, "long": -85.46}}

Showing all 4 rows.

Now you can work with the value column, which is a struct, to extract individual fields by using their names.

#
# Use dotted names ("value.ip") or col("value.<field>").alias(...) to extract
# individual fields from the exploded value struct.
#
devicesDataDF = explodedDF.select("dc_id", "key", \
                        "value.ip", \
                        col("value.id").alias("device_id"), \
                        col("value.c02_level").alias("c02_levels"), \
                        "value.temp")
display(devicesDataDF)
 
dc_id  | key           | ip              | device_id | c02_levels         | temp
dc-101 | sensor-igauge | 68.28.91.22     | 10        | [1475, 1476, 1473] | [35, 35, 35, 36, 35, 35, 32, 35, 30, 35, 32, 35]
dc-101 | sensor-ipad   | 67.185.72.1     | 13        | [1370, 1371, 1372] | [45, 45, 45, 46, 45, 45, 42, 35, 40, 45, 42, 45]
dc-101 | sensor-inest  | 208.109.163.218 | 8         | [1346, 1345, 1343] | [40, 40, 40, 40, 40, 43, 42, 40, 40, 45, 42, 45]
dc-101 | sensor-istick | 204.116.105.67  | 5         | [1574, 1570, 1576] | [30, 30, 30, 30, 40, 43, 42, 40, 40, 35, 42, 35]

Showing all 4 rows.

As a sanity check, let's ensure that the schema declared above was preserved while exploding and extracting the individual data items.

devicesDataDF.printSchema()
root
 |-- dc_id: string (nullable = true)
 |-- key: string (nullable = false)
 |-- ip: string (nullable = true)
 |-- device_id: integer (nullable = true)
 |-- c02_levels: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- temp: array (nullable = true)
 |    |-- element: integer (containsNull = true)

Now, since this tutorial is less about the DataFrame API and more about higher-order functions and lambdas in SQL, create a temporary table or view so you can start using the higher-order SQL functions mentioned above.

devicesDataDF.createOrReplaceTempView("data_center_iot_devices")

The temporary view has the same columns as your DataFrame and reflects its schema; a few example queries using the higher-order functions follow at the end of this section.

%sql select * from data_center_iot_devices
 
dc_id  | key           | ip              | device_id | c02_levels         | temp
dc-101 | sensor-igauge | 68.28.91.22     | 10        | [1475, 1476, 1473] | [35, 35, 35, 36, 35, 35, 32, 35, 30, 35, 32, 35]
dc-101 | sensor-ipad   | 67.185.72.1     | 13        | [1370, 1371, 1372] | [45, 45, 45, 46, 45, 45, 42, 35, 40, 45, 42, 45]
dc-101 | sensor-inest  | 208.109.163.218 | 8         | [1346, 1345, 1343] | [40, 40, 40, 40, 40, 43, 42, 40, 40, 45, 42, 45]
dc-101 | sensor-istick | 204.116.105.67  | 5         | [1574, 1570, 1576] | [30, 30, 30, 30, 40, 43, 42, 40, 40, 35, 42, 35]

Showing all 4 rows.
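
With the temporary view in place, the four higher-order functions can be applied directly to the temp and c02_levels array columns. The query below is only a sketch of the kind of expressions the rest of the tutorial builds on, assuming Spark 2.4 or later; the aliases and the particular lambdas (a Celsius-to-Fahrenheit conversion, a 1400 ppm threshold, a check for readings above 40, and a running sum) are illustrative choices.

%sql select key,
       transform(temp, t -> ((t * 9) / 5) + 32) as temp_fahrenheit,  -- apply the lambda to every element
       filter(c02_levels, c -> c > 1400) as high_c02_levels,         -- keep only elements matching the predicate
       exists(temp, t -> t > 40) as has_hot_reading,                 -- true if any element satisfies the predicate
       aggregate(temp, 0, (acc, t) -> acc + t) as temp_sum           -- fold the array into a single value
  from data_center_iot_devices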