Tutorial: Work with SparkR SparkDataFrames on Databricks
This article shows you how to load and transform data using the SparkDataFrame API for SparkR in Databricks.
You can practice running each of this article’s code examples from a cell within an R notebook that is attached to a running cluster. Databricks clusters provide the SparkR (R on Spark) package preinstalled, so that you can start working with the SparkDataFrame API right away.
What is a SparkDataFrame?
A SparkDataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a database or a data frame in R. SparkDataFrames can be constructed from a wide array of sources such as structured data files, tables in databases, or existing local R data frames. SparkDataFrames provide a rich set of functions (select columns, filter, join, aggregate) that allow you to solve common data analysis problems efficiently.
Create a SparkDataFrame
Most Apache Spark queries in an R context return a SparkDataFrame. This includes reading from a table, loading data from files, and operations that transform data.
One way to create a SparkDataFrame is to construct a list of data, specify the data’s schema, and then pass both to the createDataFrame function, as in the following example. Spark uses the term schema to refer to the names and data types of the columns in the SparkDataFrame. You can print the schema by calling the printSchema function and print the data by calling the showDF function.
Note
Databricks also uses the term schema to describe a collection of tables registered to a catalog.
# Load the SparkR package that is already preinstalled on the cluster.
library(SparkR)
# Get the existing SparkSession that is already initiated on the cluster.
sparkR.session()
# Construct a list of sample data. You must use a type suffix
# for the integers here or you might get the error
# "java.lang.Double is not a valid external type for schema of int."
data <- list(
list(1L, "Raymond"),
list(2L, "Loretta"),
list(3L, "Wayne")
)
# Specify the data's schema (the columns' names and data types).
schema <- structType(
structField("id", "integer"),
structField("name", "string")
)
# Create the SparkDataFrame based on the specified data and schema.
df <- createDataFrame(
data = data,
schema = schema
)
# Print the SparkDataFrame's schema.
printSchema(df)
# Print the SparkDataFrame's contents.
showDF(df)
# Output:
#
# root
# |-- id: integer (nullable = true)
# |-- name: string (nullable = true)
# +---+-------+
# | id| name|
# +---+-------+
# | 1|Raymond|
# | 2|Loretta|
# | 3| Wayne|
# +---+-------+
Tip
To display the data in a more robust format within a Databricks notebook, you can call the Databricks display command instead of the SparkR showDF function, for example:
display(df)
Read a table into a SparkDataFrame
Databricks uses Delta Lake for all tables by default. You can load Delta tables into SparkDataFrames by calling the tableToDF function, as in the following example. This example assumes that you already have access to a table in Databricks named diamonds in the specified location. If not, change the table’s name and location as needed.
# df <- tableToDF("<catalog-name>.<schema-name>.<table-name>")
df <- tableToDF("main.default.diamonds")
display(df)
# Output:
#
# +-----+---------+...
# |carat| cut|...
# +-----+---------+...
# | 0.23| Ideal|...
# | 0.21| Premium|...
# ...
Load data into a SparkDataFrame from a file
You can load data from many supported file formats by calling the loadDF function. The following example loads the contents of a CSV file and assumes the file exists in the specified path. This example reads the column names from the file’s header row and infers the column data types from the file’s contents. The loadDF function supports different arguments by file format. For more information, see JSON file and JSON Files.
All Apache Spark data sources can be used from SparkR. If you can load data from a data source by using PySpark or Scala, you can also load it by using SparkR. The following example uses the source argument’s value of csv to load data from a CSV file. To load data from a JSON file instead, you would specify json, and so on.
df <- loadDF(
path = "/FileStore/tables/diamonds.csv",
source = "csv",
header = "true",
inferSchema = "true"
)
display(df)
# Output:
#
# +-----+---------+...
# |carat| cut|...
# +-----+---------+...
# | 0.23| Ideal|...
# | 0.21| Premium|...
# ...
Assign transformation steps to a SparkDataFrame
Most Spark transformations return a SparkDataFrame. You can assign these results back to a SparkDataFrame variable, similar to how you might use common table expressions (CTEs), temporary views, or DataFrames in other systems.
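As a minimal sketch of this pattern, the following example assigns each transformation step to its own variable and builds the next step on the previous result. The sample data and the variable names dfFiltered and dfUpper are illustrative assumptions, not part of the original tutorial.

```r
library(SparkR)
sparkR.session()

df <- createDataFrame(
  data = list(
    list(1L, "Raymond"),
    list(2L, "Loretta"),
    list(3L, "Wayne")
  ),
  schema = structType(
    structField("id", "integer"),
    structField("name", "string")
  )
)

# Step 1: keep only the rows with an id greater than 1.
dfFiltered <- filter(df, df$id > 1)

# Step 2: build on the previous result by adding an
# uppercase copy of the name column.
dfUpper <- withColumn(dfFiltered, "name_upper", upper(dfFiltered$name))

showDF(dfUpper)
```

Each intermediate variable is itself a SparkDataFrame, so you can inspect it or reuse it in other steps.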
Combine SparkDataFrames with join and union
SparkDataFrames use standard SQL semantics for join operations. A join returns the combined results of two SparkDataFrames based on the provided matching conditions and join type. The following example calls the join function and uses an inner join, which is the default. A call to the select function removes the duplicate id column from the join’s result:
dataCustomers <- list(
list(1L, "Raymond"),
list(2L, "Loretta"),
list(3L, "Wayne")
)
schemaCustomers <- structType(
structField("id", "integer"),
structField("name", "string")
)
dataPurchases <- list(
list(1L, "green", "apple"),
list(2L, "purple", "grape"),
list(3L, "yellow", "banana")
)
schemaPurchases <- structType(
structField("id", "integer"),
structField("color", "string"),
structField("fruit", "string")
)
dfCustomers <- createDataFrame(
data = dataCustomers,
schema = schemaCustomers
)
dfPurchases <- createDataFrame(
data = dataPurchases,
schema = schemaPurchases
)
dfPurchaseHistory <- select(
x = join(
x = dfCustomers,
y = dfPurchases,
joinExpr = dfCustomers$id == dfPurchases$id
),
col = list(dfCustomers$id, "name", "color", "fruit")
)
display(dfPurchaseHistory)
# Output:
#
# +---+-------+------+------+
# | id| name| color| fruit|
# +---+-------+------+------+
# | 1|Raymond| green| apple|
# | 2|Loretta|purple| grape|
# | 3| Wayne|yellow|banana|
# +---+-------+------+------+
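The join function also accepts a joinType argument for other join types. As a minimal sketch, assuming sample data in which one customer has no matching purchase, a left outer join keeps every customer row and fills the missing purchase columns with nulls:

```r
library(SparkR)
sparkR.session()

dfCustomers <- createDataFrame(
  data = list(
    list(1L, "Raymond"),
    list(2L, "Loretta"),
    list(3L, "Wayne")
  ),
  schema = structType(
    structField("id", "integer"),
    structField("name", "string")
  )
)

# Illustrative data: there is no purchase for the customer with id 3.
dfPurchases <- createDataFrame(
  data = list(
    list(1L, "apple"),
    list(2L, "grape")
  ),
  schema = structType(
    structField("id", "integer"),
    structField("fruit", "string")
  )
)

dfLeftJoin <- select(
  x = join(
    x = dfCustomers,
    y = dfPurchases,
    joinExpr = dfCustomers$id == dfPurchases$id,
    joinType = "left_outer"
  ),
  col = list(dfCustomers$id, "name", "fruit")
)

showDF(dfLeftJoin)
```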
You can add the rows of one SparkDataFrame to another by calling the union function, as in the following example:
schema <- structType(
structField("id", "integer"),
structField("name", "string")
)
df1 <- createDataFrame(
data = list(
list(1L, "Raymond"),
list(2L, "Loretta"),
list(3L, "Wayne")
),
schema = schema
)
df2 <- createDataFrame(
data = list(
list(4L, "Terri"),
list(5L, "Jason"),
list(6L, "Tonya")
),
schema = schema
)
dfUnion <- union(df1, df2)
display(dfUnion)
# Output:
#
# +---+-------+
# | id| name|
# +---+-------+
# | 1|Raymond|
# | 2|Loretta|
# | 3| Wayne|
# | 4| Terri|
# | 5| Jason|
# | 6| Tonya|
# +---+-------+
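Note that union matches columns by position, not by name. When the two SparkDataFrames list the same columns in a different order, the unionByName function may be a better fit. The following is a minimal sketch with illustrative sample data in which df2 declares its columns in reverse order:

```r
library(SparkR)
sparkR.session()

df1 <- createDataFrame(
  data = list(
    list(1L, "Raymond"),
    list(2L, "Loretta")
  ),
  schema = structType(
    structField("id", "integer"),
    structField("name", "string")
  )
)

# Same columns as df1, but declared in a different order.
df2 <- createDataFrame(
  data = list(
    list("Terri", 3L),
    list("Jason", 4L)
  ),
  schema = structType(
    structField("name", "string"),
    structField("id", "integer")
  )
)

# Match columns by name instead of by position.
dfUnionByName <- unionByName(df1, df2)

showDF(dfUnionByName)
```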
Filter rows in a SparkDataFrame
You can use filtering to select a subset of rows to return or modify in a SparkDataFrame by calling the filter or where function. The two functions are interchangeable: there is no difference in performance or syntax, as in the following example:
data <- list(
list(1L, "Raymond"),
list(2L, "Loretta"),
list(3L, "Wayne")
)
schema <- structType(
structField("id", "integer"),
structField("name", "string")
)
df <- createDataFrame(
data = data,
schema = schema
)
dfFilter <- filter(
x = df,
condition = df$id > 1
)
dfWhere <- where(
x = df,
condition = df$id > 1
)
showDF(dfFilter)
showDF(dfWhere)
# Output:
#
# +---+-------+
# | id| name|
# +---+-------+
# | 2|Loretta|
# | 3| Wayne|
# +---+-------+
# +---+-------+
# | id| name|
# +---+-------+
# | 2|Loretta|
# | 3| Wayne|
# +---+-------+
Select columns from a SparkDataFrame
You can select columns by passing one or more column names to the select function, as in the following example:
data <- list(
list(1L, "Raymond", "green", "apple"),
list(2L, "Loretta", "purple", "grape"),
list(3L, "Wayne", "yellow", "banana")
)
schema <- structType(
structField("id", "integer"),
structField("name", "string"),
structField("color", "string"),
structField("fruit", "string")
)
df <- createDataFrame(
data = data,
schema = schema
)
dfSelect <- select(
x = df,
col = list("id", "color")
)
display(dfSelect)
# Output:
#
# +---+------+
# | id| color|
# +---+------+
# | 1| green|
# | 2|purple|
# | 3|yellow|
# +---+------+
You can combine select and filter queries to limit rows and columns returned, as in the following example:
data <- list(
list(1L, "Raymond", "green", "apple"),
list(2L, "Loretta", "purple", "grape"),
list(3L, "Wayne", "yellow", "banana")
)
schema <- structType(
structField("id", "integer"),
structField("name", "string"),
structField("color", "string"),
structField("fruit", "string")
)
df <- createDataFrame(
data = data,
schema = schema
)
dfFilterSelect <- filter(
x = select(
x = df,
col = list("id", "fruit")
),
condition = df$id > 1
)
display(dfFilterSelect)
# Output:
#
# +---+------+
# | id| fruit|
# +---+------+
# | 2| grape|
# | 3|banana|
# +---+------+
Save a SparkDataFrame to a table
Databricks uses Delta Lake for all tables by default. You can save the contents of a SparkDataFrame to a table in Databricks by calling the saveAsTable function, as in the following example:
# tableName <- "<catalog-name>.<schema-name>.<table-name>"
tableName <- "main.default.purchasehistory"
data <- list(
list(1L, "Raymond", "green", "apple"),
list(2L, "Loretta", "purple", "grape"),
list(3L, "Wayne", "yellow", "banana")
)
schema <- structType(
structField("id", "integer"),
structField("name", "string"),
structField("color", "string"),
structField("fruit", "string")
)
df <- createDataFrame(
data = data,
schema = schema
)
saveAsTable(
df = df,
tableName = tableName
)
# Verify that the table was successfully saved by
# displaying the table's contents.
display(sql(paste0("SELECT * FROM ", tableName)))
# Output:
#
# +---+-------+------+------+
# | id| name| color| fruit|
# +---+-------+------+------+
# | 1|Raymond| green| apple|
# | 2|Loretta|purple| grape|
# | 3| Wayne|yellow|banana|
# +---+-------+------+------+
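The saveAsTable function's mode argument defaults to "error", so the call fails if the table already exists. As a minimal sketch, assuming you want to replace the contents of an existing table and have permission to write to it, you can pass mode = "overwrite" (or "append" to add rows instead). The sample data here is illustrative:

```r
library(SparkR)
sparkR.session()

# tableName <- "<catalog-name>.<schema-name>.<table-name>"
tableName <- "main.default.purchasehistory"

df <- createDataFrame(
  data = list(
    list(1L, "Raymond", "green", "apple"),
    list(2L, "Loretta", "purple", "grape")
  ),
  schema = structType(
    structField("id", "integer"),
    structField("name", "string"),
    structField("color", "string"),
    structField("fruit", "string")
  )
)

# Replace the table's contents if the table already exists.
saveAsTable(
  df = df,
  tableName = tableName,
  mode = "overwrite"
)
```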
Write a SparkDataFrame to a collection of files
Because most Spark applications work on large datasets in a distributed fashion, Spark writes out a directory of files rather than a single file. Many data systems are configured to read these directories of files. Databricks recommends using tables instead of file paths for most applications.
The following example calls the write.json function to save the contents of a table to a directory.
# tableName <- "<catalog-name>.<schema-name>.<table-name>"
tableName <- "main.default.diamonds"
writeToPath <- paste0("/tmp/", tableName)
df <- tableToDF(tableName)
write.json(
x = df,
path = writeToPath
)
# Verify that the JSON was successfully written by
# reading the directory's contents back into a SparkDataFrame.
dfJSON <- read.json(writeToPath)
display(dfJSON)
# Output:
#
# +-----+---------+...
# |carat| cut|...
# +-----+---------+...
# | 0.23| Ideal|...
# | 0.21| Premium|...
# ...
Run SQL queries
SparkDataFrames provide a number of options to combine SQL with R.
The selectExpr function enables you to specify each column as a SQL expression, as in the following example:
data <- list(
list(1L, "Raymond"),
list(2L, "Loretta"),
list(3L, "Wayne")
)
schema <- structType(
structField("id", "integer"),
structField("name", "string")
)
df <- createDataFrame(
data = data,
schema = schema
)
dfSelectExpr <- selectExpr(
x = df,
expr = "(id + 100) as id", "(upper(name)) as name"
)
display(dfSelectExpr)
# Output:
#
# +---+-------+
# | id| name|
# +---+-------+
# |101|RAYMOND|
# |102|LORETTA|
# |103| WAYNE|
# +---+-------+
The expr function enables you to use SQL syntax anywhere a column would be specified, as in the following example:
data <- list(
list(1L, "Raymond"),
list(2L, "Loretta"),
list(3L, "Wayne")
)
schema <- structType(
structField("id", "integer"),
structField("name", "string")
)
df <- createDataFrame(
data = data,
schema = schema
)
dfSelect <- select(
x = df,
col = list(
expr("(id * 2) as id"),
expr("(lower(name)) as name")
)
)
display(dfSelect)
# Output:
#
# +---+-------+
# | id| name|
# +---+-------+
# | 2|raymond|
# | 4|loretta|
# | 6| wayne|
# +---+-------+
You can also call the sql function to run arbitrary SQL queries, as in the following example:
df <- sql("SELECT carat, cut, color, clarity FROM main.default.diamonds WHERE clarity = 'VVS2' LIMIT 2")
display(df)
# Output:
#
# +-----+---------+-----+-------+
# |carat| cut|color|clarity|
# +-----+---------+-----+-------+
# | 0.24|Very Good| J| VVS2|
# | 0.23|Very Good| G| VVS2|
# +-----+---------+-----+-------+
You can use string operations to parameterize SQL queries, as in the following example:
columns_list <- "carat, cut, color, clarity"
table_name <- "main.default.diamonds"
column_name <- "clarity"
column_value <- "'VVS2'"
row_limit <- 2
df <- sql(paste0(
"SELECT ",
columns_list,
" FROM ",
table_name,
" WHERE ",
column_name,
" = ",
column_value,
" LIMIT ",
row_limit
))
display(df)
# Output:
#
# +-----+---------+-----+-------+
# |carat| cut|color|clarity|
# +-----+---------+-----+-------+
# | 0.24|Very Good| J| VVS2|
# | 0.23|Very Good| G| VVS2|
# +-----+---------+-----+-------+
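Building queries from strings can produce invalid or unsafe SQL if any of the parts come from untrusted input. The following is a minimal sketch of one way to guard against that, using only base R. The buildQuery helper and its allow-list patterns are hypothetical and are not part of SparkR; adjust the patterns to the identifiers and values you expect.

```r
# Hypothetical helper: builds a SELECT statement from validated parts.
buildQuery <- function(columns, tableName, filterColumn, filterValue, rowLimit) {
  # Allow only letters, digits, underscores, and dots in identifiers.
  identifiers <- c(columns, tableName, filterColumn)
  stopifnot(all(grepl("^[A-Za-z0-9_.]+$", identifiers)))

  # Allow only letters, digits, underscores, and spaces in the filter value,
  # which rules out quotes and other characters that could alter the query.
  stopifnot(grepl("^[A-Za-z0-9_ ]+$", filterValue))

  sprintf(
    "SELECT %s FROM %s WHERE %s = '%s' LIMIT %d",
    paste(columns, collapse = ", "),
    tableName,
    filterColumn,
    filterValue,
    as.integer(rowLimit)
  )
}

query <- buildQuery(
  columns = c("carat", "cut", "color", "clarity"),
  tableName = "main.default.diamonds",
  filterColumn = "clarity",
  filterValue = "VVS2",
  rowLimit = 2
)

print(query)
```

You could then pass the resulting string to the sql function, for example sql(query), in a notebook attached to a cluster that has access to the table.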
Next steps
Read more about Databricks for R developers.
Explore the SparkR overview.
Explore sparklyr, an additional R interface to Apache Spark.
Try the tutorials.
Explore R developer tools.