Transforming Complex Data Types - Scala

Transforming Complex Data Types in Spark SQL

In this notebook we go through some data transformation examples using Spark SQL. Spark SQL provides many built-in transformation functions in org.apache.spark.sql.functions, so we start by importing them.

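import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
// Provides the 'name column syntax and .toDS() used below (Databricks notebooks import this automatically).
import spark.implicits._
 
// Convenience function for turning JSON strings into DataFrames.
// If no schema is given, Spark infers one from the data.
// display() used throughout is a Databricks notebook helper; elsewhere, use df.show(false).
def jsonToDataFrame(json: String, schema: StructType = null): DataFrame = {
  // SparkSessions are available with Spark 2.0+
  val reader = spark.read
  Option(schema).foreach(s => reader.schema(s))
  // json(Dataset[String]) (Spark 2.2+) replaces the deprecated json(RDD[String]) overload.
  // Each string is parsed as one JSON document; a top-level array yields one row per element.
  reader.json(Seq(json).toDS())
}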

Selecting from nested columns - Dots (".") can be used to access nested columns for structs and maps.

// Using a struct
val schema = new StructType().add("a", new StructType().add("b", IntegerType))
                          
val events = jsonToDataFrame("""
{
  "a": {
     "b": 1
  }
}
""", schema)
 
display(events.select("a.b"))
 
b
1

// Using a map
val schema = new StructType().add("a", MapType(StringType, IntegerType))
                          
val events = jsonToDataFrame("""
{
  "a": {
     "b": 1
  }
}
""", schema)
 
display(events.select("a.b"))
 
b
1
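
The same nested field can also be reached through the Column API or plain SQL. The sketch below is a standalone version assuming spark-sql is on the classpath and a local SparkSession; it builds the struct with struct() instead of parsing JSON, and the data is illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Local session; inside a notebook getOrCreate() returns the existing one
val spark = SparkSession.builder().master("local[*]").appName("nested-access").getOrCreate()
import spark.implicits._

// One row whose column "a" is a struct with a single field "b" = 1
val events = Seq(1).toDF("b").select(struct($"b") as "a")

// Equivalent ways to read the nested field a.b
val viaPath  = events.select(col("a.b")).as[Int].collect()
val viaField = events.select($"a".getField("b")).as[Int].collect()

events.createOrReplaceTempView("events")
val viaSql = spark.sql("SELECT a.b FROM events").as[Int].collect()
```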

Flattening structs - A star ("*") can be used to select all of the subfields in a struct.

val events = jsonToDataFrame("""
{
  "a": {
     "b": 1,
     "c": 2
  }
}
""")
 
display(events.select("a.*"))
 
b | c
1 | 2
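
Struct expansion can be mixed with ordinary columns in the same select. A minimal standalone sketch, assuming a local SparkSession and made-up data with an id column:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("flatten-struct").getOrCreate()
import spark.implicits._

// Each row has an id plus a struct column "a" with fields b and c
val events = Seq((1, 10, 20)).toDF("id", "b", "c").select($"id", struct($"b", $"c") as "a")

// "a.*" expands the struct's fields into top-level columns next to id
val flat = events.select($"id", $"a.*")
val row = flat.as[(Int, Int, Int)].head()
```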

Nesting columns - The struct() function or just parentheses in SQL can be used to create a new struct.

val events = jsonToDataFrame("""
{
  "a": 1,
  "b": 2,
  "c": 3
}
""")
 
display(events.select(struct('a as 'y) as 'x))
 
x
{"y": 1}
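
In SQL, named_struct() builds a struct from alternating field names and values. The sketch below, assuming a local SparkSession and illustrative data, checks that the DataFrame and SQL forms produce the same field name:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder().master("local[*]").appName("nest-columns").getOrCreate()
import spark.implicits._

val events = Seq((1, 2, 3)).toDF("a", "b", "c")

// DataFrame API: wrap column a in a struct x, renaming the field to y
val viaApi = events.select(struct($"a" as "y") as "x")

// SQL: named_struct('field', value, ...)
events.createOrReplaceTempView("events")
val viaSql = spark.sql("SELECT named_struct('y', a) AS x FROM events")

val apiFields = viaApi.schema("x").dataType.asInstanceOf[StructType].fieldNames
val sqlFields = viaSql.schema("x").dataType.asInstanceOf[StructType].fieldNames
```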

Nesting all columns - The star ("*") can also be used to include all columns in a nested struct.

val events = jsonToDataFrame("""
{
  "a": 1,
  "b": 2
}
""")
 
display(events.select(struct("*") as 'x))
 
x
{"a": 1, "b": 2}
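
struct("*") nests every column; passing an explicit column list instead lets you leave some columns out, for example to keep an id at the top level. A sketch under the same assumptions (local SparkSession, illustrative column names):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder().master("local[*]").appName("nest-all").getOrCreate()
import spark.implicits._

val events = Seq((1, 2, 3)).toDF("id", "a", "b")

// Nest everything
val allNested = events.select(struct("*") as "x")
// Nest everything except id, which stays a top-level column
val someNested = events.select($"id", struct(events.columns.filter(_ != "id").map(col): _*) as "x")

val allFields  = allNested.schema("x").dataType.asInstanceOf[StructType].fieldNames
val someFields = someNested.schema("x").dataType.asInstanceOf[StructType].fieldNames
```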

Selecting a single array or map element - getItem(), or square brackets ([ ]) in SQL, can be used to select a single element out of an array or a map.

val events = jsonToDataFrame("""
{
  "a": [1, 2]
}
""")
 
display(events.select('a.getItem(0) as 'x))
 
x
1
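
Several other spellings select the same array element. This sketch assumes a local SparkSession and Spark 2.4+ for element_at(), which, unlike getItem(), counts from 1:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("array-item").getOrCreate()
import spark.implicits._

val events = Seq(Seq(1, 2)).toDF("a")

// All of these select the first element (value 1)
val byGetItem = events.select($"a".getItem(0)).as[Int].head()
val byApply   = events.select(col("a")(0)).as[Int].head()          // Column.apply is shorthand for extraction
val byElemAt  = events.select(element_at($"a", 1)).as[Int].head()  // 1-based index

events.createOrReplaceTempView("events")
val bySql = spark.sql("SELECT a[0] FROM events").as[Int].head()
```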

// Using a map
val schema = new StructType().add("a", MapType(StringType, IntegerType))
 
val events = jsonToDataFrame("""
{
  "a": {
    "b": 1
  }
}
""", schema)
 
display(events.select('a.getItem("b") as 'x))
 
x
1
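
Map lookups have the same shorthand forms. The sketch below assumes a local SparkSession and builds the map column with typedLit() (Spark 2.2+) rather than from JSON:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("map-item").getOrCreate()
import spark.implicits._

// One row whose column "a" is the map {b -> 1, c -> 2}
val events = spark.range(1).select(typedLit(Map("b" -> 1, "c" -> 2)) as "a")

val byGetItem = events.select($"a".getItem("b")).as[Int].head()
val byApply   = events.select(col("a")("c")).as[Int].head()

events.createOrReplaceTempView("events")
val bySql = spark.sql("SELECT a['b'] FROM events").as[Int].head()
```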

Creating a row for each array or map element - explode() can be used to create a new row for each element in an array or each key-value pair in a map. This is similar to LATERAL VIEW EXPLODE in HiveQL.

val events = jsonToDataFrame("""
{
  "a": [1, 2]
}
""")
 
display(events.select(explode('a) as 'x))
 
x
1
2
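
explode() emits nothing for an empty or null array, so such rows disappear. A sketch, assuming a local SparkSession and Spark 2.2+ for explode_outer(), contrasting it with explode_outer() (keeps those rows with a null) and posexplode() (adds each element's position):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("explode-variants").getOrCreate()
import spark.implicits._

val events = Seq((1, Seq(1, 2)), (2, Seq.empty[Int])).toDF("id", "a")

// Row id=2 has an empty array and produces no output rows
val exploded = events.select($"id", explode($"a") as "x")
// Row id=2 is kept, with x = null
val explodedOuter = events.select($"id", explode_outer($"a") as "x")
// Output columns are named pos and col by default
val withPos = events.select($"id", posexplode($"a"))
```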

// Using a map
val schema = new StructType().add("a", MapType(StringType, IntegerType))
 
val events = jsonToDataFrame("""
{
  "a": {
    "b": 1,
    "c": 2
  }
}
""", schema)
 
display(events.select(explode('a) as (Seq("x", "y"))))
 
x | y
b | 1
c | 2
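
When no aliases are given, exploding a map produces columns named key and value. The sketch below, assuming a local SparkSession, also shows the LATERAL VIEW form in Spark SQL; the view alias t and column names k and v are arbitrary:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("explode-map").getOrCreate()
import spark.implicits._

val events = spark.range(1).select(typedLit(Map("b" -> 1, "c" -> 2)) as "a")

// Default output column names for an exploded map
val defaultNames = events.select(explode($"a")).columns

// HiveQL-style equivalent
events.createOrReplaceTempView("events")
val viaLateralView = spark.sql(
  "SELECT k, v FROM events LATERAL VIEW explode(a) t AS k, v"
).as[(String, Int)].collect().sortBy(_._1)
```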

Collecting multiple rows into an array - collect_list() and collect_set() can be used to aggregate items into an array.

val events = jsonToDataFrame("""
[{ "x": 1 }, { "x": 2 }]
""")
 
display(events.select(collect_list('x) as 'x))
 
x
[1, 2]
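
collect_list() keeps duplicates while collect_set() removes them, and neither guarantees element order. A sketch assuming a local SparkSession; sort_array() is added only to make the result deterministic:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("collect").getOrCreate()
import spark.implicits._

val events = Seq(1, 1, 2).toDF("x")

val agg = events.select(
  sort_array(collect_list($"x")) as "list", // duplicates kept
  sort_array(collect_set($"x")) as "set"    // duplicates removed
).as[(Seq[Int], Seq[Int])].head()
```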

// Using a grouped aggregation
val events = jsonToDataFrame("""
[{ "x": 1, "y": "a" }, { "x": 2, "y": "b" }]
""")
 
display(events.groupBy("y").agg(collect_list('x) as 'x))
 
y | x
b | [2]
a | [1]
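
The grouped aggregation can also be written in SQL. A sketch under the same assumptions (local SparkSession, illustrative data), again using sort_array() to pin down element order:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("group-collect").getOrCreate()
import spark.implicits._

val events = Seq((1, "a"), (2, "b"), (3, "a")).toDF("x", "y")
events.createOrReplaceTempView("events")

// SQL form of groupBy("y").agg(collect_list('x))
val grouped = spark.sql(
  "SELECT y, sort_array(collect_list(x)) AS x FROM events GROUP BY y"
).as[(String, Seq[Int])].collect().toMap
```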

Selecting one field from each item in an array - when you use dot notation on an array of structs, Spark returns a new array containing that field from each element.

val events = jsonToDataFrame("""
{
  "a": [
    {"b": 1},
    {"b": 2}
  ]
}
""")
 
display(events.select("a.b"))
 
b
[1, 2]
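
Besides dot notation, getField() and, from Spark 3.0, the higher-order function transform() give the same result. A sketch assuming a local SparkSession on Spark 3.0+:

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("array-field").getOrCreate()
import spark.implicits._

// One row whose column "a" is an array of structs, each with field b
val events = spark.range(1).select(array(struct(lit(1) as "b"), struct(lit(2) as "b")) as "a")

val viaDot   = events.select($"a.b").as[Seq[Int]].head()
val viaField = events.select($"a".getField("b")).as[Seq[Int]].head()
// transform applies the function to every element of the array
val viaTransform = events.select(transform($"a", (s: Column) => s.getField("b"))).as[Seq[Int]].head()
```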

Convert a group of columns to JSON - to_json() can be used to turn structs into JSON strings. This is particularly useful when you want to re-encode multiple columns into a single one, for example when writing data out to Kafka. In SQL, to_json() is available from Spark 2.2 onward.

val events = jsonToDataFrame("""
{
  "a": {
    "b": 1
  }
}
""")
 
display(events.select(to_json('a) as 'c))
 
c
{"b":1}
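
A common pattern before writing to Kafka, whose sink reads a column named value, is to pack every column into one JSON string. A sketch assuming a local SparkSession, with Spark 2.2+ for the SQL form; the data is illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("to-json").getOrCreate()
import spark.implicits._

val events = Seq((1, "x")).toDF("a", "b")

// struct($"*") gathers all columns; to_json serializes the struct
val asJson = events.select(to_json(struct($"*")) as "value").as[String].head()

// SQL form
val viaSql = spark.sql("SELECT to_json(named_struct('a', 1, 'b', 'x')) AS value").as[String].head()
```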

Parse a column containing JSON - from_json() can be used to turn a string column with JSON data into a struct. You can then flatten the struct as described above to get individual columns. from_json() is available in the DataFrame API since Spark 2.1 and in SQL since Spark 2.2.

val events = jsonToDataFrame("""
{
  "a": "{\"b\":1}"
}
""")
 
val schema = new StructType().add("b", IntegerType)
display(events.select(from_json('a, schema) as 'c))
 
c
{"b": 1}
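
Parsing and flattening are often chained. The sketch below, assuming a local SparkSession, does both and also shows the SQL form with the schema written as a DDL string, which recent Spark versions accept:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("from-json").getOrCreate()
import spark.implicits._

val events = Seq("""{"b":1}""").toDF("a")
val schema = new StructType().add("b", IntegerType)

// Parse into a struct column c, then expand its fields
val parsed = events.select(from_json($"a", schema) as "c").select("c.*")

// SQL form with a DDL schema string
events.createOrReplaceTempView("events")
val viaSql = spark.sql("SELECT from_json(a, 'b INT') AS c FROM events").select("c.*")
```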

Sometimes you may want to leave part of the JSON string as JSON to avoid making your schema too complex.

val events = jsonToDataFrame("""
{
  "a": "{\"b\":{\"x\":1,\"y\":{\"z\":2}}}"
}
""")
 
val schema = new StructType().add("b", new StructType()
  .add("x", IntegerType)
  .add("y", StringType))
display(events.select(from_json('a, schema) as 'c))
 
c
{"b": {"x": 1, "y": "{\"z\":2}"}}
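
A field kept as raw JSON text can be queried later, for example with get_json_object() and a JSONPath expression. A sketch assuming a local SparkSession, reusing the same nested input:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("partial-json").getOrCreate()
import spark.implicits._

val events = Seq("""{"b":{"x":1,"y":{"z":2}}}""").toDF("a")

// Declaring y as a string keeps its object as raw JSON text
val schema = new StructType().add("b", new StructType()
  .add("x", IntegerType)
  .add("y", StringType))
val parsed = events.select(from_json($"a", schema) as "c")

val raw = parsed.select($"c.b.y").as[String].head()
// Extract one value from the raw JSON on demand
val z = parsed.select(get_json_object($"c.b.y", "$.z")).as[String].head()
```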

Parse a set of fields from a column containing JSON - json_tuple() can be used to extract fields from a string column with JSON data. Each extracted value is returned as a string.

val events = jsonToDataFrame("""
{
  "a": "{\"b\":1}"
}
""")
 
display(events.select(json_tuple('a, "b") as 'c))
 
c
1
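
json_tuple() returns one string column per requested field, named c0, c1, and so on unless aliases are given. A sketch assuming a local SparkSession and illustrative input:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("json-tuple").getOrCreate()
import spark.implicits._

val events = Seq("""{"b":1,"d":"x"}""").toDF("a")

// Default generated column names
val defaultNames = events.select(json_tuple($"a", "b", "d")).columns
// Same extraction with explicit aliases; note that b comes back as the string "1"
val values = events.select(json_tuple($"a", "b", "d").as(Seq("b", "d"))).as[(String, String)].head()
```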

Parse a well-formed string column - regexp_extract() can be used to parse strings using regular expressions; its last argument selects which capturing group to return.

val events = jsonToDataFrame("""
[{ "a": "x: 1" }, { "a": "y: 2" }]
""")
 
display(events.select(regexp_extract('a, "([a-z]):", 1) as 'c))
 
c
x
y
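
regexp_extract() yields an empty string when the pattern does not match. A sketch assuming a local SparkSession, splitting each value into its letter and number with two capturing groups; the "no match" row is illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("regexp-extract").getOrCreate()
import spark.implicits._

val events = Seq("x: 1", "y: 2", "no match").toDF("a")

// Group 1 is the letter, group 2 the number
val parsed = events.select(
  regexp_extract($"a", "([a-z]): (\\d+)", 1) as "key",
  regexp_extract($"a", "([a-z]): (\\d+)", 2) as "value"
).as[(String, String)].collect()
```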