

Higher-Order and Lambda Functions: Explore Complex and Structured Data in SQL

This tutorial walks you through four higher-order functions. While this in-depth blog explains the concepts and motivations behind handling complex data types such as arrays in SQL, and why the existing alternatives are inefficient and cumbersome, this tutorial shows how to use higher-order functions in SQL to process structured data and arrays in IoT device events. They come in handy in particular if you enjoy functional programming and can quickly and efficiently write a lambda expression as part of these higher-order SQL functions.

This tutorial explores four functions and the wide range of uses you can put them to when processing and transforming array types; a short sketch of each appears after the list:

  • transform()
  • filter()
  • exists()
  • aggregate()
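
To give a feel for how each of these reads before diving into the IoT data, here is a minimal, self-contained sketch on a toy array column (these functions are available in Spark 2.4 and later; the DataFrame and column names below are illustrative and not part of the tutorial's dataset):

from pyspark.sql.functions import expr

# Toy DataFrame with a single array column (illustrative only).
df = spark.createDataFrame([(1, [1, 2, 3, 4])], ["id", "values"])

df.select(
    expr("transform(values, v -> v * 10)").alias("times_ten"),        # [10, 20, 30, 40]
    expr("filter(values, v -> v % 2 = 0)").alias("evens"),            # [2, 4]
    expr("exists(values, v -> v > 3)").alias("has_value_gt_3"),       # true
    expr("aggregate(values, 0, (acc, v) -> acc + v)").alias("total")  # 10
).show()

In each case the lambda expression (v -> ...) is written directly inside the SQL expression and applied element by element to the array.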

The takeaway from this short tutorial is that there are myriad ways to slice and dice nested JSON structures with Spark SQL utility functions. These dedicated higher-order functions are primarily suited to manipulating arrays in Spark SQL, making the code easier to write and more concise when processing table values with arrays or nested arrays.

Let's create a simple JSON schema with attributes and values, at least two of which, temp and c02_level, are arrays.

from pyspark.sql.functions import *
from pyspark.sql.types import *
 
schema = StructType() \
          .add("dc_id", StringType()) \
          .add("source", MapType(StringType(), StructType() \
                        .add("description", StringType()) \
                        .add("ip", StringType()) \
                        .add("id", IntegerType()) \
                        .add("temp", ArrayType(IntegerType())) \
                        .add("c02_level", ArrayType(IntegerType())) \
                        .add("geo", StructType() \
                              .add("lat", DoubleType()) \
                              .add("long", DoubleType()))))
 

This helper Python function converts a JSON string into a Spark DataFrame.

# Convenience function for turning JSON strings into DataFrames.
def jsonToDataFrame(json, schema=None):
  # SparkSessions are available with Spark 2.0+
  reader = spark.read
  if schema:
    # Use the supplied schema rather than letting Spark infer one.
    reader = reader.schema(schema)
  return reader.json(sc.parallelize([json]))

Using the schema above, create a complex JSON structure and convert it into a DataFrame. Displaying the DataFrame gives us two columns: a key (dc_id) and a value (source), which is a JSON string with an embedded nested structure.

dataDF = jsonToDataFrame( """{
 
    "dc_id": "dc-101",
    "source": {
        "sensor-igauge": {
        "id": 10,
        "ip": "68.28.91.22",
        "description": "Sensor attached to the container ceilings",
        "temp":[35,35,35,36,35,35,32,35,30,35,32,35],
        "c02_level": [1475,1476,1473],
        "geo": {"lat":38.00, "long":97.00}                        
      },
      "sensor-ipad": {
        "id": 13,
        "ip": "67.185.72.1",
        "description": "Sensor ipad attached to carbon cylinders",
        "temp": [45,45,45,46,45,45,42,35,40,45,42,45],
        "c02_level": [1370,1371,1372],
        "geo": {"lat":47.41, "long":-122.00}
      },
      "sensor-inest": {
        "id": 8,
        "ip": "208.109.163.218",
        "description": "Sensor attached to the factory ceilings",
        "temp": [40,40,40,40,40,43,42,40,40,45,42,45],
        "c02_level": [1346,1345, 1343],
        "geo": {"lat":33.61, "long":-111.89}
      },
      "sensor-istick": {
        "id": 5,
        "ip": "204.116.105.67",
        "description": "Sensor embedded in exhaust pipes in the ceilings",
        "temp":[30,30,30,30,40,43,42,40,40,35,42,35],
        "c02_level": [1574,1570, 1576],
        "geo": {"lat":35.93, "long":-85.46}
      }
    }
  }""", schema)
 
display(dataDF)
  
 
dc_id:  dc-101
source: {"sensor-igauge": {"description": "Sensor attached to the container ceilings", "ip": "68.28.91.22", "id": 10, "temp": [35, 35, 35, 36, 35, 35, 32, 35, 30, 35, 32, 35], "c02_level": [1475, 1476, 1473], "geo": {"lat": 38, "long": 97}}, "sensor-ipad": {"description": "Sensor ipad attached to carbon cylinders", "ip": "67.185.72.1", "id": 13, "temp": [45, 45, 45, 46, 45, 45, 42, 35, 40, 45, 42, 45], "c02_level": [1370, 1371, 1372], "geo": {"lat": 47.41, "long": -122}}, "sensor-inest": {"description": "Sensor attached to the factory ceilings", "ip": "208.109.163.218", "id": 8, "temp": [40, 40, 40, 40, 40, 43, 42, 40, 40, 45, 42, 45], "c02_level": [1346, 1345, 1343], "geo": {"lat": 33.61, "long": -111.89}}, "sensor-istick": {"description": "Sensor embedded in exhaust pipes in the ceilings", "ip": "204.116.105.67", "id": 5, "temp": [30, 30, 30, 30, 40, 43, 42, 40, 40, 35, 42, 35], "c02_level": [1574, 1570, 1576], "geo": {"lat": 35.93, "long": -85.46}}}

Showing all 1 rows.

By examining its schema, you can see that the DataFrame reflects the schema defined above, where two of its elements are arrays of integers.

dataDF.printSchema()
root
 |-- dc_id: string (nullable = true)
 |-- source: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- ip: string (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- temp: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)
 |    |    |-- c02_level: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)
 |    |    |-- geo: struct (nullable = true)
 |    |    |    |-- lat: double (nullable = true)
 |    |    |    |-- long: double (nullable = true)
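
With the data and schema in place, the higher-order functions can now operate on the temp and c02_level arrays. As a quick preview (this cell is a sketch and not part of the original notebook), you could explode the source map so each sensor becomes its own row, then use transform() with a lambda to convert every Celsius reading in temp to Fahrenheit:

# Sketch: explode the map into (sensor, reading) rows, then transform each temp array.
explodedDF = dataDF.selectExpr("dc_id", "explode(source) as (sensor, reading)")

display(explodedDF.selectExpr(
    "dc_id",
    "sensor",
    "transform(reading.temp, t -> ((t * 9) div 5) + 32) AS temp_fahrenheit"))

The lambda t -> ((t * 9) div 5) + 32 is applied to every element of the array, returning a new array of the same length.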