
Higher-Order and Lambda Functions: Explore Complex and Structured Data in SQL

This tutorial walks you through four higher-order functions in Spark SQL. While the accompanying in-depth blog explains the concepts and motivations for why handling complex data types such as arrays is important in SQL, and why their existing implementations are inefficient and cumbersome, this tutorial shows how to use higher-order functions in SQL to process structured data and arrays in IoT device events. They come in especially handy if you enjoy functional programming and can quickly and efficiently write a lambda expression as part of these higher-order SQL functions.

This tutorial explores four functions and the wide range of uses you can put them to when processing and transforming array types:

  • transform()
  • filter()
  • exists()
  • aggregate() (demonstrated below via its alias, reduce())

The takeaway from this short tutorial is that there exist myriad ways to slice and dice nested JSON structures with Spark SQL utility functions. These dedicated higher-order functions are primarily suited to manipulating arrays in Spark SQL, making the code easier to write and more concise when processing table values with arrays or nested arrays.

Let's create a simple JSON schema with attributes and values, with at least two attributes that are arrays, namely temp and c02_level.

from pyspark.sql.functions import *
from pyspark.sql.types import *
 
schema = StructType() \
          .add("dc_id", StringType()) \
          .add("source", MapType(StringType(), StructType() \
                        .add("description", StringType()) \
                        .add("ip", StringType()) \
                        .add("id", IntegerType()) \
                        .add("temp", ArrayType(IntegerType())) \
                        .add("c02_level", ArrayType(IntegerType())) \
                        .add("geo", StructType() \
                              .add("lat", DoubleType()) \
                              .add("long", DoubleType()))))
 

This helper Python function converts a JSON string into a Spark DataFrame.

# Convenience function for turning JSON strings into DataFrames.
def jsonToDataFrame(json, schema=None):
  # SparkSessions are available with Spark 2.0+
  reader = spark.read
  if schema:
    reader = reader.schema(schema)
  return reader.json(sc.parallelize([json]))

Using the schema above, create a complex JSON structure and turn it into a DataFrame. Displaying the DataFrame gives us two columns: a key (dc_id) and a value (source), which is a JSON string with an embedded nested structure.

dataDF = jsonToDataFrame( """{
 
    "dc_id": "dc-101",
    "source": {
        "sensor-igauge": {
        "id": 10,
        "ip": "68.28.91.22",
        "description": "Sensor attached to the container ceilings",
        "temp":[35,35,35,36,35,35,32,35,30,35,32,35],
        "c02_level": [1475,1476,1473],
        "geo": {"lat":38.00, "long":97.00}                        
      },
      "sensor-ipad": {
        "id": 13,
        "ip": "67.185.72.1",
        "description": "Sensor ipad attached to carbon cylinders",
        "temp": [45,45,45,46,45,45,42,35,40,45,42,45],
        "c02_level": [1370,1371,1372],
        "geo": {"lat":47.41, "long":-122.00}
      },
      "sensor-inest": {
        "id": 8,
        "ip": "208.109.163.218",
        "description": "Sensor attached to the factory ceilings",
        "temp": [40,40,40,40,40,43,42,40,40,45,42,45],
        "c02_level": [1346,1345, 1343],
        "geo": {"lat":33.61, "long":-111.89}
      },
      "sensor-istick": {
        "id": 5,
        "ip": "204.116.105.67",
        "description": "Sensor embedded in exhaust pipes in the ceilings",
        "temp":[30,30,30,30,40,43,42,40,40,35,42,35],
        "c02_level": [1574,1570, 1576],
        "geo": {"lat":35.93, "long":-85.46}
      }
    }
  }""", schema)
 
display(dataDF)
  
 
dc_id  | source
-------|-------
dc-101 | {"sensor-igauge": {"description": "Sensor attached to the container ceilings", "ip": "68.28.91.22", "id": 10, "temp": [35, 35, 35, 36, 35, 35, 32, 35, 30, 35, 32, 35], "c02_level": [1475, 1476, 1473], "geo": {"lat": 38, "long": 97}}, "sensor-ipad": {"description": "Sensor ipad attached to carbon cylinders", "ip": "67.185.72.1", "id": 13, "temp": [45, 45, 45, 46, 45, 45, 42, 35, 40, 45, 42, 45], "c02_level": [1370, 1371, 1372], "geo": {"lat": 47.41, "long": -122}}, "sensor-inest": {"description": "Sensor attached to the factory ceilings", "ip": "208.109.163.218", "id": 8, "temp": [40, 40, 40, 40, 40, 43, 42, 40, 40, 45, 42, 45], "c02_level": [1346, 1345, 1343], "geo": {"lat": 33.61, "long": -111.89}}, "sensor-istick": {"description": "Sensor embedded in exhaust pipes in the ceilings", "ip": "204.116.105.67", "id": 5, "temp": [30, 30, 30, 30, 40, 43, 42, 40, 40, 35, 42, 35], "c02_level": [1574, 1570, 1576], "geo": {"lat": 35.93, "long": -85.46}}}

(1 row)

By examining its schema, you can see that the DataFrame's schema reflects the schema defined above, where two of its elements are arrays of integers.

dataDF.printSchema()
root
 |-- dc_id: string (nullable = true)
 |-- source: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- ip: string (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- temp: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)
 |    |    |-- c02_level: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)
 |    |    |-- geo: struct (nullable = true)
 |    |    |    |-- lat: double (nullable = true)
 |    |    |    |-- long: double (nullable = true)

Employ explode() to break the map column source into individual rows, each holding a key and a value column.

explodedDF = dataDF.select("dc_id", explode("source"))
display(explodedDF)
 
dc_id  | key           | value
-------|---------------|------
dc-101 | sensor-igauge | {"description": "Sensor attached to the container ceilings", "ip": "68.28.91.22", "id": 10, "temp": [35, 35, 35, 36, 35, 35, 32, 35, 30, 35, 32, 35], "c02_level": [1475, 1476, 1473], "geo": {"lat": 38, "long": 97}}
dc-101 | sensor-ipad   | {"description": "Sensor ipad attached to carbon cylinders", "ip": "67.185.72.1", "id": 13, "temp": [45, 45, 45, 46, 45, 45, 42, 35, 40, 45, 42, 45], "c02_level": [1370, 1371, 1372], "geo": {"lat": 47.41, "long": -122}}
dc-101 | sensor-inest  | {"description": "Sensor attached to the factory ceilings", "ip": "208.109.163.218", "id": 8, "temp": [40, 40, 40, 40, 40, 43, 42, 40, 40, 45, 42, 45], "c02_level": [1346, 1345, 1343], "geo": {"lat": 33.61, "long": -111.89}}
dc-101 | sensor-istick | {"description": "Sensor embedded in exhaust pipes in the ceilings", "ip": "204.116.105.67", "id": 5, "temp": [30, 30, 30, 30, 40, 43, 42, 40, 40, 35, 42, 35], "c02_level": [1574, 1570, 1576], "geo": {"lat": 35.93, "long": -85.46}}

(4 rows)

Now you can work with the value column, which is a struct, to extract individual fields by using their names.

#
# use dot notation ("value.field") or col("value.field") to extract
# individual fields from the struct column
#
devicesDataDF = explodedDF.select("dc_id", "key", \
                        "value.ip", \
                        col("value.id").alias("device_id"), \
                        col("value.c02_level").alias("c02_levels"), \
                        "value.temp")
display(devicesDataDF)
 
dc_id  | key           | ip              | device_id | c02_levels         | temp
-------|---------------|-----------------|-----------|--------------------|-----
dc-101 | sensor-igauge | 68.28.91.22     | 10        | [1475, 1476, 1473] | [35, 35, 35, 36, 35, 35, 32, 35, 30, 35, 32, 35]
dc-101 | sensor-ipad   | 67.185.72.1     | 13        | [1370, 1371, 1372] | [45, 45, 45, 46, 45, 45, 42, 35, 40, 45, 42, 45]
dc-101 | sensor-inest  | 208.109.163.218 | 8         | [1346, 1345, 1343] | [40, 40, 40, 40, 40, 43, 42, 40, 40, 45, 42, 45]
dc-101 | sensor-istick | 204.116.105.67  | 5         | [1574, 1570, 1576] | [30, 30, 30, 30, 40, 43, 42, 40, 40, 35, 42, 35]

(4 rows)

As a sanity check, let's ensure that exploding and extracting individual data items preserved the DataFrame's adherence to the schema declared above.

devicesDataDF.printSchema()
root
 |-- dc_id: string (nullable = true)
 |-- key: string (nullable = false)
 |-- ip: string (nullable = true)
 |-- device_id: integer (nullable = true)
 |-- c02_levels: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- temp: array (nullable = true)
 |    |-- element: integer (containsNull = true)

Now, since this tutorial is less about the DataFrame API and more about higher-order functions and lambda expressions in SQL, create a temporary view so you can start using the higher-order SQL functions mentioned above.

devicesDataDF.createOrReplaceTempView("data_center_iot_devices")

The view's columns mirror those of your DataFrame and reflect its schema.
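The %sql cells below query this view; equivalently, you can stay in Python and issue the same SQL through the SparkSession. A minimal sketch (spark is the notebook's SparkSession, assumed to be available, as above):

# Run the same query from Python instead of a %sql cell.
results = spark.sql("select * from data_center_iot_devices")
display(results)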

%sql select * from data_center_iot_devices
 
dc_id  | key           | ip              | device_id | c02_levels         | temp
-------|---------------|-----------------|-----------|--------------------|-----
dc-101 | sensor-igauge | 68.28.91.22     | 10        | [1475, 1476, 1473] | [35, 35, 35, 36, 35, 35, 32, 35, 30, 35, 32, 35]
dc-101 | sensor-ipad   | 67.185.72.1     | 13        | [1370, 1371, 1372] | [45, 45, 45, 46, 45, 45, 42, 35, 40, 45, 42, 45]
dc-101 | sensor-inest  | 208.109.163.218 | 8         | [1346, 1345, 1343] | [40, 40, 40, 40, 40, 43, 42, 40, 40, 45, 42, 45]
dc-101 | sensor-istick | 204.116.105.67  | 5         | [1574, 1570, 1576] | [30, 30, 30, 30, 40, 43, 42, 40, 40, 35, 42, 35]

(4 rows)

%sql describe data_center_iot_devices
 
col_name   | data_type  | comment
-----------|------------|--------
# col_name | data_type  | comment
dc_id      | string     | null
key        | string     | null
ip         | string     | null
device_id  | int        | null
c02_levels | array<int> | null
temp       | array<int> | null

(7 rows)

SQL Higher-Order Functions and Lambda Expressions

How to use transform()

Its functional signature, transform(values, value -> lambda expression), has two components:

  1. transform(values..) is the higher-order function. This takes an array and an anonymous function as its input. Internally, transform takes care of setting up a new array, applying the anonymous function to each element, and then assigning the result to the output array.
  2. The value -> expression is an anonymous function. The function is further divided into two components separated by a -> symbol:
    • The argument list: In this case there is only one argument: value. You can specify multiple arguments by creating a comma-separated list of arguments enclosed in parentheses, for example: (x, y) -> x + y.
    • The body: This is a SQL expression that can use the arguments and outer variables to calculate the new value.

In short, the programmatic signature for transform() is as follows:

transform(array<T>, function<T, U>): array<U>

This produces an array by applying a function to each element of an input array. Note that the functional-programming equivalent operation is map. It has been named transform to prevent confusion with the map expression (which creates a map from a key/value expression).

This basic scheme for transform(...) works the same way as with other higher-order functions, as you will see shortly.
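Recent Spark releases (3.1+) also expose these higher-order functions in the DataFrame API, where the lambda is an ordinary Python function operating on Columns. A minimal sketch of transform(), assuming the devicesDataDF DataFrame from above (floor() stands in for SQL's integer div):

from pyspark.sql import functions as F
 
# Apply a Python lambda to every element of the temp array.
# transform() also accepts a two-argument lambda (element, index)
# when positional access is needed.
fahrenheitDF = devicesDataDF.select(
    "key", "temp",
    F.transform("temp", lambda t: F.floor((t * 9) / 5) + 32).alias("fahrenheit_temp"))
display(fahrenheitDF)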

The following query transforms the values in an array by converting each element's temperature reading from Celsius to Fahrenheit.

Let's transform (and hence convert) all our Celsius readings into Fahrenheit, using the conversion formula ((C * 9) div 5) + 32; for example, 35 °C becomes ((35 * 9) div 5) + 32 = 95 °F. The lambda expression here is that formula. temp and t -> ((t * 9) div 5) + 32 are the arguments to the higher-order function transform(). The anonymous function iterates through each element in the array temp, transforms its value, and places it into an output array. The result is a new column with the transformed values: fahrenheit_temp.

%sql select key, ip, device_id, temp,
     transform (temp, t -> ((t * 9) div 5) + 32 ) as fahrenheit_temp
     from data_center_iot_devices
 
key           | ip              | device_id | temp                                             | fahrenheit_temp
--------------|-----------------|-----------|--------------------------------------------------|----------------
sensor-igauge | 68.28.91.22     | 10        | [35, 35, 35, 36, 35, 35, 32, 35, 30, 35, 32, 35] | [95, 95, 95, 96, 95, 95, 89, 95, 86, 95, 89, 95]
sensor-ipad   | 67.185.72.1     | 13        | [45, 45, 45, 46, 45, 45, 42, 35, 40, 45, 42, 45] | [113, 113, 113, 114, 113, 113, 107, 95, 104, 113, 107, 113]
sensor-inest  | 208.109.163.218 | 8         | [40, 40, 40, 40, 40, 43, 42, 40, 40, 45, 42, 45] | [104, 104, 104, 104, 104, 109, 107, 104, 104, 113, 107, 113]
sensor-istick | 204.116.105.67  | 5         | [30, 30, 30, 30, 40, 43, 42, 40, 40, 35, 42, 35] | [86, 86, 86, 86, 104, 109, 107, 104, 104, 95, 107, 95]

(4 rows)

While the example above generates transformed values, this example uses a Boolean expression as the lambda function and generates an array of Booleans instead of values, since the expression t -> t > 1300 is a predicate that evaluates to true or false.

%sql select dc_id, key, ip, device_id, c02_levels, temp, 
     transform (c02_levels, t -> t > 1300) as high_c02_levels
     from data_center_iot_devices
    
 
dc_id  | key           | ip              | device_id | c02_levels         | temp                                             | high_c02_levels
-------|---------------|-----------------|-----------|--------------------|--------------------------------------------------|----------------
dc-101 | sensor-igauge | 68.28.91.22     | 10        | [1475, 1476, 1473] | [35, 35, 35, 36, 35, 35, 32, 35, 30, 35, 32, 35] | [true, true, true]
dc-101 | sensor-ipad   | 67.185.72.1     | 13        | [1370, 1371, 1372] | [45, 45, 45, 46, 45, 45, 42, 35, 40, 45, 42, 45] | [true, true, true]
dc-101 | sensor-inest  | 208.109.163.218 | 8         | [1346, 1345, 1343] | [40, 40, 40, 40, 40, 43, 42, 40, 40, 45, 42, 45] | [true, true, true]
dc-101 | sensor-istick | 204.116.105.67  | 5         | [1574, 1570, 1576] | [30, 30, 30, 30, 40, 43, 42, 40, 40, 35, 42, 35] | [true, true, true]

(4 rows)

How to use filter()

As with transform(), filter() has a similar signature:

filter(array<T>, function<T, Boolean>): array<T>

Unlike transform() with a Boolean expression, it produces an output array from an input array by keeping only the elements for which the predicate function<T, Boolean> holds.
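The DataFrame-API counterpart (Spark 3.1+) is sketched below; importing the functions module as F avoids shadowing Python's built-in filter:

from pyspark.sql import functions as F
 
# Keep only the array elements for which the predicate holds.
dangerDF = devicesDataDF.select(
    "key", "c02_levels",
    F.filter("c02_levels", lambda v: v > 1300).alias("high_c02_levels"))
display(dangerDF)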

For instance, let's keep only the readings in c02_levels that exceed dangerous levels (c02_level > 1300). Again, the functional signature is not dissimilar to transform()'s. However, note the difference in how filter() generates the resulting array compared to transform() with a similar lambda expression.

%sql select dc_id, key, ip, device_id, c02_levels, temp, 
     filter (c02_levels, t -> t > 1300) as high_c02_levels
     from data_center_iot_devices
 
dc_id  | key           | ip              | device_id | c02_levels         | temp                                             | high_c02_levels
-------|---------------|-----------------|-----------|--------------------|--------------------------------------------------|----------------
dc-101 | sensor-igauge | 68.28.91.22     | 10        | [1475, 1476, 1473] | [35, 35, 35, 36, 35, 35, 32, 35, 30, 35, 32, 35] | [1475, 1476, 1473]
dc-101 | sensor-ipad   | 67.185.72.1     | 13        | [1370, 1371, 1372] | [45, 45, 45, 46, 45, 45, 42, 35, 40, 45, 42, 45] | [1370, 1371, 1372]
dc-101 | sensor-inest  | 208.109.163.218 | 8         | [1346, 1345, 1343] | [40, 40, 40, 40, 40, 43, 42, 40, 40, 45, 42, 45] | [1346, 1345, 1343]
dc-101 | sensor-istick | 204.116.105.67  | 5         | [1574, 1570, 1576] | [30, 30, 30, 30, 40, 43, 42, 40, 40, 35, 42, 35] | [1574, 1570, 1576]

(4 rows)

Notice that when the lambda's predicate is reversed, the resulting arrays are empty, since every reading here exceeds 1300. That is, filter() does not evaluate to true or false values as it did in transform().

%sql select dc_id, key, ip, device_id, c02_levels, temp, 
     filter (c02_levels, t -> t < 1300 ) as high_c02_levels
     from data_center_iot_devices
 
dc_id  | key           | ip              | device_id | c02_levels         | temp                                             | high_c02_levels
-------|---------------|-----------------|-----------|--------------------|--------------------------------------------------|----------------
dc-101 | sensor-igauge | 68.28.91.22     | 10        | [1475, 1476, 1473] | [35, 35, 35, 36, 35, 35, 32, 35, 30, 35, 32, 35] | []
dc-101 | sensor-ipad   | 67.185.72.1     | 13        | [1370, 1371, 1372] | [45, 45, 45, 46, 45, 45, 42, 35, 40, 45, 42, 45] | []
dc-101 | sensor-inest  | 208.109.163.218 | 8         | [1346, 1345, 1343] | [40, 40, 40, 40, 40, 43, 42, 40, 40, 45, 42, 45] | []
dc-101 | sensor-istick | 204.116.105.67  | 5         | [1574, 1570, 1576] | [30, 30, 30, 30, 40, 43, 42, 40, 40, 35, 42, 35] | []

(4 rows)

How to use exists()

Though its functional signature differs mildly from the two functions above, the idea is simple and the same:

exists(array<T>, function<T, Boolean>): Boolean

Returns true if the predicate function<T, Boolean> holds for any element in the input array.
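For reference, a minimal DataFrame-API sketch (Spark 3.1+) of the same check, assuming devicesDataDF from above:

from pyspark.sql import functions as F
 
# exists() collapses each array to a single Boolean:
# does any element satisfy the predicate?
existsDF = devicesDataDF.select(
    "key", "temp",
    F.exists("temp", lambda t: t == 45).alias("value_exists"))
display(existsDF)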

In this case you can iterate through the temp array and see whether a particular value exists in it. Let's ascertain whether any of the readings is 45 degrees Celsius, or determine whether any c02 reading equals 1570.

%sql select dc_id, key, ip, device_id, c02_levels, temp, 
     exists (temp, t -> t = 45 ) as value_exists
     from data_center_iot_devices
 
dc_id  | key           | ip              | device_id | c02_levels         | temp                                             | value_exists
-------|---------------|-----------------|-----------|--------------------|--------------------------------------------------|-------------
dc-101 | sensor-igauge | 68.28.91.22     | 10        | [1475, 1476, 1473] | [35, 35, 35, 36, 35, 35, 32, 35, 30, 35, 32, 35] | false
dc-101 | sensor-ipad   | 67.185.72.1     | 13        | [1370, 1371, 1372] | [45, 45, 45, 46, 45, 45, 42, 35, 40, 45, 42, 45] | true
dc-101 | sensor-inest  | 208.109.163.218 | 8         | [1346, 1345, 1343] | [40, 40, 40, 40, 40, 43, 42, 40, 40, 45, 42, 45] | true
dc-101 | sensor-istick | 204.116.105.67  | 5         | [1574, 1570, 1576] | [30, 30, 30, 30, 40, 43, 42, 40, 40, 35, 42, 35] | false

(4 rows)

%sql select dc_id, key, ip, device_id, c02_levels, temp, 
     exists (c02_levels, t -> t = 1570 ) as high_c02_levels
     from data_center_iot_devices
 
dc_id  | key           | ip              | device_id | c02_levels         | temp                                             | high_c02_levels
-------|---------------|-----------------|-----------|--------------------|--------------------------------------------------|----------------
dc-101 | sensor-igauge | 68.28.91.22     | 10        | [1475, 1476, 1473] | [35, 35, 35, 36, 35, 35, 32, 35, 30, 35, 32, 35] | false
dc-101 | sensor-ipad   | 67.185.72.1     | 13        | [1370, 1371, 1372] | [45, 45, 45, 46, 45, 45, 42, 35, 40, 45, 42, 45] | false
dc-101 | sensor-inest  | 208.109.163.218 | 8         | [1346, 1345, 1343] | [40, 40, 40, 40, 40, 43, 42, 40, 40, 45, 42, 45] | false
dc-101 | sensor-istick | 204.116.105.67  | 5         | [1574, 1570, 1576] | [30, 30, 30, 30, 40, 43, 42, 40, 40, 35, 42, 35] | true

(4 rows)

How to use reduce()

By far, this function is more advanced than the others, and it also lets you do aggregation over an array's elements. Its signature allows a bit extra, with a second lambda expression as its final functional argument:

reduce(array<T>, B, function<B, T, B>, function<B, R>): R

Reduces the elements of array<T> into a single value R by merging the elements into a buffer B using function<B, T, B> and applying a finish function<B, R> to the final buffer. The initial value of the buffer is given by a zero expression.

The finish function is optional; if you do not specify one, the identity function (id -> id) is used. This is the only higher-order function that takes two lambda functions.
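In the DataFrame API (Spark 3.1+) this function is exposed under its other name, aggregate(). A minimal sketch of the average-temperature computation below, assuming devicesDataDF from above; note that / here is floating-point division, unlike SQL's integer div:

from pyspark.sql import functions as F
 
# Fold the temp array into a sum, then finish by averaging and
# converting Celsius to Fahrenheit.
avgDF = devicesDataDF.select(
    "key",
    F.aggregate(
        "temp",
        F.lit(0),                      # zero: the initial buffer
        lambda acc, t: acc + t,        # merge: accumulate the sum
        lambda acc: acc / F.size("temp") * 9 / 5 + 32  # finish: average in F
    ).alias("average_f_temp"))
display(avgDF)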

For instance, to compute the average of the temperature readings, use two lambda expressions: the first accumulates the elements into an internal temporary buffer, and the second is applied to the final accumulated buffer. With respect to the signature above, the zero expression is 0, function<B, T, B> is (t, acc) -> t + acc, and function<B, R> is acc -> (acc div size(temp) * 9 div 5) + 32. The finish lambda also converts the average temperature to Fahrenheit.

%sql select key, ip, device_id, temp,
    reduce(temp, 0, (t, acc) -> t + acc, acc -> (acc div size(temp) * 9 div 5) + 32 ) as average_f_temp
    from data_center_iot_devices
    sort by average_f_temp desc
[Bar chart of average_f_temp by key, sorted in descending order: sensor-ipad, sensor-inest, sensor-istick, sensor-igauge.]