Transforming Complex Data Types in Spark SQL

In this notebook we're going to go through some data transformation examples using Spark SQL. Spark SQL supports many built-in transformation functions in the module pyspark.sql.functions, so we will start off by importing that module.

from pyspark.sql.functions import *
from pyspark.sql.types import *
 
# Convenience function for turning JSON strings into DataFrames.
def jsonToDataFrame(json, schema=None):
  # SparkSessions are available with Spark 2.0+
  reader = spark.read
  if schema:
    reader = reader.schema(schema)
  # Parallelize the JSON string into a single-element RDD and parse it.
  return reader.json(sc.parallelize([json]))

Selecting from nested columns - Dots (".") can be used to access nested columns for structs and maps.

# Using a struct
schema = StructType().add("a", StructType().add("b", IntegerType()))
                          
events = jsonToDataFrame("""
{
  "a": {
     "b": 1
  }
}
""", schema)
 
display(events.select("a.b"))
 
b
---
1

Showing 1 row.

# Using a map
schema = StructType().add("a", MapType(StringType(), IntegerType()))
                          
events = jsonToDataFrame("""
{
  "a": {
     "b": 1
  }
}
""", schema)
 
display(events.select("a.b"))
 
b
---
1

Showing 1 row.

Flattening structs - A star ("*") can be used to select all of the subfields in a struct.

events = jsonToDataFrame("""
{
  "a": {
     "b": 1,
     "c": 2
  }
}
""")
 
display(events.select("a.*"))
 
b   c
------
1   2

Showing 1 row.

Nesting columns - The struct() function, or just parentheses in SQL, can be used to create a new struct.

events = jsonToDataFrame("""
{
  "a": 1,
  "b": 2,
  "c": 3
}
""")
 
display(events.select(struct(col("a").alias("y")).alias("x")))
 
x
---------
{"y": 1}

Showing 1 row.

Nesting all columns - The star ("*") can also be used to include all columns in a nested struct.

events = jsonToDataFrame("""
{
  "a": 1,
  "b": 2
}
""")
 
display(events.select(struct("*").alias("x")))
 
x
-----------------
{"a": 1, "b": 2}

Showing 1 row.

Selecting a single array or map element - getItem() or square brackets (i.e. [ ]) can be used to select a single element out of an array or a map.

events = jsonToDataFrame("""
{
  "a": [1, 2]
}
""")
 
display(events.select(col("a").getItem(0).alias("x")))
 
x
---
1

Showing 1 row.

# Using a map
schema = StructType().add("a", MapType(StringType(), IntegerType()))
 
events = jsonToDataFrame("""
{
  "a": {
    "b": 1
  }
}
""", schema)
 
display(events.select(col("a").getItem("b").alias("x")))
 
x
---
1

Showing 1 row.

Creating a row for each array or map element - explode() can be used to create a new row for each element in an array or each key-value pair in a map. This is similar to LATERAL VIEW EXPLODE in HiveQL.

events = jsonToDataFrame("""
{
  "a": [1, 2]
}
""")
 
display(events.select(explode("a").alias("x")))
 
x
---
1
2

Showing 2 rows.

# Using a map
schema = StructType().add("a", MapType(StringType(), IntegerType()))
 
events = jsonToDataFrame("""
{
  "a": {
    "b": 1,
    "c": 2
  }
}
""", schema)
 
display(events.select(explode("a").alias("x", "y")))
 
x   y
------
b   1
c   2

Showing 2 rows.

Collecting multiple rows into an array - collect_list() and collect_set() can be used to aggregate items into an array.

events = jsonToDataFrame("""
[{ "x": 1 }, { "x": 2 }]
""")
 
display(events.select(collect_list("x").alias("x")))
 
x
-------
[1, 2]

Showing 1 row.

# using an aggregation
events = jsonToDataFrame("""
[{ "x": 1, "y": "a" }, { "x": 2, "y": "b" }]
""")
 
display(events.groupBy("y").agg(collect_list("x").alias("x")))
 
y   x
--------
b   [2]
a   [1]

Showing 2 rows.