SparkR Overview

SparkR is an R package that provides a lightweight frontend to use Apache Spark from R. In Spark 1.5.1, SparkR provides a distributed DataFrame implementation that supports operations like selection, filtering, and aggregation (similar to R data frames and dplyr), but on large datasets. SparkR also supports distributed machine learning using MLlib.

This overview covers the following topics:

  • Creating SparkR DataFrames
      • From Local R Data Frames
      • From Data Sources using Spark SQL
      • Using Data Source Connectors with Spark Packages
      • From Spark SQL Queries
  • DataFrame Operations
      • Selecting Rows & Columns
      • Grouping & Aggregation
      • Column Operations
  • Machine Learning

Creating SparkR DataFrames

Applications can create DataFrames from a local R data frame, from data sources, or using Spark SQL queries.

The simplest way to create a DataFrame is to convert a local R data frame into a SparkR DataFrame. Specifically, we can use createDataFrame and pass in the local R data frame to create a SparkR DataFrame. As an example, the following cell creates a DataFrame using the faithful dataset from R.

Refer to the guide on createDataFrame for more examples.

df <- createDataFrame(sqlContext, faithful)

# Display the first rows of the DataFrame
head(df)

From Data Sources using Spark SQL

The general method for creating DataFrames from data sources is read.df. This method takes in the SQLContext, the path of the file to load, and the type of data source. SparkR supports reading JSON and Parquet files natively, and through Spark Packages you can find data source connectors for popular file formats like CSV and Avro.

%fs rm dbfs:/tmp/people.json
%fs put dbfs:/tmp/people.json
'{"age": 10, "name": "John"}
{"age": 20, "name": "Jane"}
{"age": 30, "name": "Andy"}'
people <- read.df(sqlContext, "dbfs:/tmp/people.json", source="json")

SparkR automatically infers the schema from the JSON file.

printSchema(people)
display(people)

Using Data Source Connectors with Spark Packages

As an example, we will use the Spark CSV package to load a CSV file. You can find a list of Spark Packages by Databricks here.

diamonds <- read.df(sqlContext, "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv",
                    source = "com.databricks.spark.csv", header="true", inferSchema = "true")
head(diamonds)

The data sources API can also be used to save DataFrames in multiple file formats. For example, we can save the DataFrame from the previous example to a Parquet file using write.df.

%fs rm -r dbfs:/tmp/people.parquet
write.df(people, path="dbfs:/tmp/people.parquet", source="parquet", mode="overwrite")
%fs ls dbfs:/tmp/people.parquet
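
Since SparkR reads Parquet natively, the saved file can be loaded back with read.df. Below is a minimal sketch that reads the file written above (the peopleParquet name is just illustrative):

# Read the Parquet file written above back into a SparkR DataFrame;
# the schema saved with the file is picked up automatically
peopleParquet <- read.df(sqlContext, "dbfs:/tmp/people.parquet", source = "parquet")
printSchema(peopleParquet)
head(peopleParquet)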

From Spark SQL Queries

You can also create SparkR DataFrames using Spark SQL queries.

# Register earlier df as temp table
registerTempTable(people, "peopleTemp")
# Create a df consisting of only the 'age' column using a Spark SQL query
age <- sql(sqlContext, "SELECT age FROM peopleTemp")
head(age)
# Resulting df is a SparkR df
str(age)

DataFrame Operations

SparkR DataFrames support a number of functions for structured data processing. Here we include some basic examples; a complete list can be found in the API docs.

Selecting Rows & Columns

# Create DataFrame
df <- createDataFrame(sqlContext, faithful)
df
# Select only the "eruptions" column
head(select(df, df$eruptions))
# You can also pass in column names as strings
head(select(df, "eruptions"))
# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))

Grouping & Aggregation

SparkR DataFrames support a number of commonly used functions to aggregate data after grouping. For example, we can count the number of times each waiting time appears in the faithful dataset.

head(count(groupBy(df, df$waiting)))
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- count(groupBy(df, df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
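
Aggregations other than counts follow the same groupBy pattern. As a sketch, the following computes the mean eruption duration for each waiting time (avg_eruptions is just an illustrative column name):

# Average eruption duration per distinct waiting time
head(summarize(groupBy(df, df$waiting), avg_eruptions = mean(df$eruptions)))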

Column Operations

SparkR provides a number of functions that can be directly applied to columns for data processing and aggregation. The example below shows the use of basic arithmetic functions.

# Convert waiting time from minutes to seconds.
# Note that we can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
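
Column arithmetic can also combine existing columns into a new one; for example (eruption_ratio is just an illustrative name):

# Derive a new column from two existing columns
df$eruption_ratio <- df$eruptions / df$waiting
head(df)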

Machine Learning

As of Spark 1.5, SparkR allows the fitting of generalized linear models over SparkR DataFrames using the glm() function. Under the hood, SparkR uses MLlib to train a model of the specified family. We support a subset of the available R formula operators for model fitting, including '~', '.', '+', and '-'.

SparkR automatically performs one-hot encoding of categorical features, so it does not need to be done manually. Beyond String and Double type features, it is also possible to fit over MLlib Vector features, for compatibility with other MLlib components.

The example below shows how to build a Gaussian GLM model using SparkR. To run linear regression, set family to "gaussian". To run logistic regression, set family to "binomial".

# Create the DataFrame
df <- createDataFrame(sqlContext, iris)

# Fit a linear model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

# Model coefficients are returned in a similar format to R's native glm().
summary(model)
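
The fitted model can also be used to score a DataFrame with predict(), assuming predict() is available for SparkR models in your Spark version. A minimal sketch that scores the training data itself (the predictions name is illustrative):

# Score the training DataFrame; predicted values appear in a "prediction" column
predictions <- predict(model, newData = df)
head(select(predictions, "Sepal_Length", "prediction"))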

Converting local R data frames to SparkR DataFrames

We can use createDataFrame to convert local R data frames to SparkR DataFrames.

# Create a local R data frame to convert (example data)
localDF <- data.frame(name = c("John", "Smith", "Sarah"), age = c(19, 23, 18))
# Create SparkR DataFrame using localDF
convertedSparkDF <- createDataFrame(sqlContext, localDF)
str(convertedSparkDF)
# Another example: Create SparkR DataFrame with a local R data frame
anotherSparkDF <- createDataFrame(sqlContext, data.frame(surname = c("Tukey", "Venables", "Tierney", "Ripley", "McNeil"),
                                                         nationality = c("US", "Australia", "US", "UK", "Australia"),
                                                         deceased = c("yes", rep("no", 4))))
count(anotherSparkDF)