
Higher-Order and Lambda Functions: Explore Complex and Structured Data in SQL

This tutorial walks you through four higher-order functions. While this in-depth blog explains the concepts, justifications, and motivations for why handling complex data types such as arrays is important in SQL, and equally why their existing implementations are inefficient and cumbersome, this tutorial shows how to use higher-order functions in SQL to process structured data and arrays in IoT device events. In particular, they come in handy if you enjoy functional programming and can quickly and efficiently write a lambda expression as part of these higher-order SQL functions.

This tutorial explores four functions and the wide range of uses you can put them to when processing and transforming array types:

  • transform()
  • filter()
  • exists()
  • aggregate()

The takeaway from this short tutorial is that there are myriad ways to slice and dice nested JSON structures with Spark SQL utility functions. These dedicated higher-order functions are primarily suited to manipulating arrays in Spark SQL, making the code easier to write and more concise when processing table values with arrays or nested arrays.
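Before diving into the IoT example, here is a quick taste of each function on literal arrays. This is a minimal sketch that assumes a Spark SQL environment with higher-order function support (for example, a recent Databricks Runtime); the column aliases are purely illustrative:

%sql
SELECT transform(array(1, 2, 3), x -> x + 1)              AS incremented,  -- [2, 3, 4]
       filter(array(1, 2, 3), x -> x > 1)                 AS filtered,     -- [2, 3]
       exists(array(1, 2, 3), x -> x % 2 = 0)             AS has_even,     -- true
       aggregate(array(1, 2, 3), 0, (acc, x) -> acc + x)  AS summed        -- 6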

Let's create a simple JSON schema with attributes and values, with at least two attributes that are arrays, namely temp and c02_level.

from pyspark.sql.functions import *
from pyspark.sql.types import *
 
schema = StructType() \
          .add("dc_id", StringType()) \
          .add("source", MapType(StringType(), StructType() \
                        .add("description", StringType()) \
                        .add("ip", StringType()) \
                        .add("id", IntegerType()) \
                        .add("temp", ArrayType(IntegerType())) \
                        .add("c02_level", ArrayType(IntegerType())) \
                        .add("geo", StructType() \
                              .add("lat", DoubleType()) \
                              .add("long", DoubleType()))))
 

This helper Python function converts a JSON string into a Spark DataFrame.

# Convenience function for turning JSON strings into DataFrames.
def jsonToDataFrame(json, schema=None):
  # SparkSessions are available with Spark 2.0+
  reader = spark.read
  if schema:
    reader.schema(schema)
  return reader.json(sc.parallelize([json]))

Using the schema above, create a complex JSON structure and convert it into a DataFrame. Displaying the DataFrame gives us two columns: a key (dc_id) and a value (source), which is a JSON string with an embedded nested structure.

dataDF = jsonToDataFrame( """{
 
    "dc_id": "dc-101",
    "source": {
        "sensor-igauge": {
        "id": 10,
        "ip": "68.28.91.22",
        "description": "Sensor attached to the container ceilings",
        "temp":[35,35,35,36,35,35,32,35,30,35,32,35],
        "c02_level": [1475,1476,1473],
        "geo": {"lat":38.00, "long":97.00}                        
      },
      "sensor-ipad": {
        "id": 13,
        "ip": "67.185.72.1",
        "description": "Sensor ipad attached to carbon cylinders",
        "temp": [45,45,45,46,45,45,42,35,40,45,42,45],
        "c02_level": [1370,1371,1372],
        "geo": {"lat":47.41, "long":-122.00}
      },
      "sensor-inest": {
        "id": 8,
        "ip": "208.109.163.218",
        "description": "Sensor attached to the factory ceilings",
        "temp": [40,40,40,40,40,43,42,40,40,45,42,45],
        "c02_level": [1346,1345, 1343],
        "geo": {"lat":33.61, "long":-111.89}
      },
      "sensor-istick": {
        "id": 5,
        "ip": "204.116.105.67",
        "description": "Sensor embedded in exhaust pipes in the ceilings",
        "temp":[30,30,30,30,40,43,42,40,40,35,42,35],
        "c02_level": [1574,1570, 1576],
        "geo": {"lat":35.93, "long":-85.46}
      }
    }
  }""", schema)
 
display(dataDF)
  
 
dc_id  | source
dc-101 | {"sensor-igauge": {"description": "Sensor attached to the container ceilings", "ip": "68.28.91.22", "id": 10, "temp": [35, 35, 35, 36, 35, 35, 32, 35, 30, 35, 32, 35], "c02_level": [1475, 1476, 1473], "geo": {"lat": 38, "long": 97}}, "sensor-ipad": {"description": "Sensor ipad attached to carbon cylinders", "ip": "67.185.72.1", "id": 13, "temp": [45, 45, 45, 46, 45, 45, 42, 35, 40, 45, 42, 45], "c02_level": [1370, 1371, 1372], "geo": {"lat": 47.41, "long": -122}}, "sensor-inest": {"description": "Sensor attached to the factory ceilings", "ip": "208.109.163.218", "id": 8, "temp": [40, 40, 40, 40, 40, 43, 42, 40, 40, 45, 42, 45], "c02_level": [1346, 1345, 1343], "geo": {"lat": 33.61, "long": -111.89}}, "sensor-istick": {"description": "Sensor embedded in exhaust pipes in the ceilings", "ip": "204.116.105.67", "id": 5, "temp": [30, 30, 30, 30, 40, 43, 42, 40, 40, 35, 42, 35], "c02_level": [1574, 1570, 1576], "geo": {"lat": 35.93, "long": -85.46}}}

Showing all 1 rows.

By examining its schema, you can see that the DataFrame schema reflects the schema defined above, where two of its elements are arrays of integers.

dataDF.printSchema()
root
 |-- dc_id: string (nullable = true)
 |-- source: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- ip: string (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- temp: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)
 |    |    |-- c02_level: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)
 |    |    |-- geo: struct (nullable = true)
 |    |    |    |-- lat: double (nullable = true)
 |    |    |    |-- long: double (nullable = true)

Employ explode() to explode the source map column into individual rows, one per map entry, each with a key and a value column.

explodedDF = dataDF.select("dc_id", explode("source"))
display(explodedDF)
 
dc_id  | key           | value
dc-101 | sensor-igauge | {"description": "Sensor attached to the container ceilings", "ip": "68.28.91.22", "id": 10, "temp": [35, 35, 35, 36, 35, 35, 32, 35, 30, 35, 32, 35], "c02_level": [1475, 1476, 1473], "geo": {"lat": 38, "long": 97}}
dc-101 | sensor-ipad   | {"description": "Sensor ipad attached to carbon cylinders", "ip": "67.185.72.1", "id": 13, "temp": [45, 45, 45, 46, 45, 45, 42, 35, 40, 45, 42, 45], "c02_level": [1370, 1371, 1372], "geo": {"lat": 47.41, "long": -122}}
dc-101 | sensor-inest  | {"description": "Sensor attached to the factory ceilings", "ip": "208.109.163.218", "id": 8, "temp": [40, 40, 40, 40, 40, 43, 42, 40, 40, 45, 42, 45], "c02_level": [1346, 1345, 1343], "geo": {"lat": 33.61, "long": -111.89}}
dc-101 | sensor-istick | {"description": "Sensor embedded in exhaust pipes in the ceilings", "ip": "204.116.105.67", "id": 5, "temp": [30, 30, 30, 30, 40, 43, 42, 40, 40, 35, 42, 35], "c02_level": [1574, 1570, 1576], "geo": {"lat": 35.93, "long": -85.46}}

Showing all 4 rows.

Now you can work with the value column, which is a struct, to extract individual fields by using their names.

#
# extract individual fields of the value struct by name, using col(...).alias(...) to rename them
#
devicesDataDF = explodedDF.select("dc_id", "key", \
                        "value.ip", \
                        col("value.id").alias("device_id"), \
                        col("value.c02_level").alias("c02_levels"), \
                        "value.temp")
display(devicesDataDF)
 
dc_id  | key           | ip              | device_id | c02_levels         | temp
dc-101 | sensor-igauge | 68.28.91.22     | 10        | [1475, 1476, 1473] | [35, 35, 35, 36, 35, 35, 32, 35, 30, 35, 32, 35]
dc-101 | sensor-ipad   | 67.185.72.1     | 13        | [1370, 1371, 1372] | [45, 45, 45, 46, 45, 45, 42, 35, 40, 45, 42, 45]
dc-101 | sensor-inest  | 208.109.163.218 | 8         | [1346, 1345, 1343] | [40, 40, 40, 40, 40, 43, 42, 40, 40, 45, 42, 45]
dc-101 | sensor-istick | 204.116.105.67  | 5         | [1574, 1570, 1576] | [30, 30, 30, 30, 40, 43, 42, 40, 40, 35, 42, 35]

Showing all 4 rows.

As a sanity check, let's ensure that the DataFrame we created was preserved and still adheres to the schema declared above after exploding and extracting the individual data items.

devicesDataDF.printSchema()
root
 |-- dc_id: string (nullable = true)
 |-- key: string (nullable = false)
 |-- ip: string (nullable = true)
 |-- device_id: integer (nullable = true)
 |-- c02_levels: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- temp: array (nullable = true)
 |    |-- element: integer (containsNull = true)

Now, since this tutorial is less about the DataFrame API and more about higher-order functions and lambdas in SQL, create a temporary table or view and start using the higher-order SQL functions mentioned above.

devicesDataDF.createOrReplaceTempView("data_center_iot_devices")

The table's columns match those of your DataFrame and reflect its schema.

%sql select * from data_center_iot_devices
 
dc_id  | key           | ip              | device_id | c02_levels         | temp
dc-101 | sensor-igauge | 68.28.91.22     | 10        | [1475, 1476, 1473] | [35, 35, 35, 36, 35, 35, 32, 35, 30, 35, 32, 35]
dc-101 | sensor-ipad   | 67.185.72.1     | 13        | [1370, 1371, 1372] | [45, 45, 45, 46, 45, 45, 42, 35, 40, 45, 42, 45]
dc-101 | sensor-inest  | 208.109.163.218 | 8         | [1346, 1345, 1343] | [40, 40, 40, 40, 40, 43, 42, 40, 40, 45, 42, 45]
dc-101 | sensor-istick | 204.116.105.67  | 5         | [1574, 1570, 1576] | [30, 30, 30, 30, 40, 43, 42, 40, 40, 35, 42, 35]

Showing all 4 rows.

%sql describe data_center_iot_devices
 
col_name   | data_type  | comment
# col_name | data_type  | comment
dc_id      | string     | null
key        | string     | null
ip         | string     | null
device_id  | int        | null
c02_levels | array<int> | null
temp       | array<int> | null

Showing all 7 rows.

SQL Higher-Order Functions and Lambda Expressions

How to use transform()

Its functional signature, transform(values, value -> lambda expression), has two components:

  1. transform(values..) is the higher-order function. This takes an array and an anonymous function as its input. Internally, transform takes care of setting up a new array, applying the anonymous function to each element, and then assigning the result to the output array.
  2. The value -> expression is an anonymous function. The function is further divided into two components separated by a -> symbol:
    • The argument list: In this case there is only one argument: value. You can specify multiple arguments by creating a comma-separated list of arguments enclosed in parentheses, for example: (x, y) -> x + y.
    • The body: This is a SQL expression that can use the arguments and outer variables to calculate the new value.

In short, the programmatic signature for transform() is as follows:

transform(array<T>, function<T, U>): array<U>

This produces an array by applying a function to each element of an input array. Note that the functional programming equivalent operation is map. It has been named transform in order to prevent confusion with the map expression (which creates a map from a key-value expression).
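To make the signature concrete, here is a minimal sketch on a literal array (the cast and the column alias are illustrative only); it shows that the element type of the output array (U) need not match that of the input array (T):

%sql
-- array<int> in, array<string> out
SELECT transform(array(1, 2, 3), x -> cast(x AS string)) AS stringified
-- expected result: ["1", "2", "3"]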

This basic scheme for transform(...) works the same way as with other higher-order functions, as you will see shortly.

The following query transforms the values in an array by converting each element's temperature reading from Celsius to Fahrenheit.

Let's transform (and hence convert) all our Celsius readings into Fahrenheit, using the conversion formula ((C * 9) / 5) + 32. The lambda expression here is that C-to-F formula. The arguments to the higher-order function transform() are the array temp and the lambda t -> ((t * 9) div 5) + 32. The anonymous function iterates through each element of the temp array, applies the formula to it, and places the transformed value into an output array. The result is a new column with the transformed values: fahrenheit_temp.
Let's transform (and hence convert) all our Celsius reading into Fahrenheit. (Use conversion formula: ((C * 9) / 5) + 32) The lambda expression here is the formula to convert C->F. Now, temp and ((t * 9) div 5) + 32 are the arguments to the higher-order function transform(). The anonymous function will iterate through each element in the array, temp, apply the function to it, and transforming its value and placing into an output array. The result is a new column with tranformed values: fahrenheit_temp.