SparkR is an R package that provides a lightweight frontend for using Apache Spark from R. Starting with Spark 1.4.x, SparkR provides a distributed DataFrame implementation that supports operations such as selection, filtering, and aggregation (similar to R data frames and dplyr), but on large datasets. SparkR also supports distributed machine learning using MLlib.
We will cover the following topics in this overview:
- SparkR in Databricks Notebooks
- Creating SparkR DataFrames
  - From Local R Data Frames
  - From Data Sources using Spark SQL
  - Using Data Source Connectors with Spark Packages
  - From Spark SQL Queries
- DataFrame Operations
  - Selecting Rows & Columns
  - Grouping & Aggregation
  - Column Operations
- Machine Learning
Databricks also supports sparklyr; see its documentation for more information.
SparkR in Databricks Notebooks¶
SparkR started as a research project at AMPLab. With the release of Spark 1.4.0, SparkR was merged into Apache Spark. At that time Databricks released R Notebooks, becoming the first company to officially support SparkR. To further facilitate the use of SparkR, Databricks R notebooks imported SparkR by default and provided a working `sqlContext` object.
SparkR and Databricks R notebooks have evolved significantly since 2015. For the best experience, we highly recommend that you use the latest version of Spark on Databricks whenever you use R or SparkR. Some of the most notable changes in R and SparkR are:
- Starting with Spark 2.0, users do not need to explicitly pass a `sqlContext` object to every function call. This change reduced boilerplate code and made SparkR user code more intuitive and readable. This document follows the new syntax. For examples of the old syntax, refer to the SparkR documentation prior to 2.0: SparkR Overview.
- Starting with Spark 2.2, Databricks notebooks no longer import SparkR by default, because some SparkR functions conflicted with similarly named functions from other popular packages. Users who wish to use SparkR simply need to call `library(SparkR)` in their notebooks. The SparkR session is already configured, and all SparkR functions will talk to your attached cluster using the existing session.
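To make the syntax change concrete, here is a minimal sketch contrasting the two styles; the pre-2.0 call is shown for comparison only and assumes a legacy `sqlContext` object:

```r
library(SparkR)

# Spark 2.0+ syntax: no sqlContext argument is needed
df <- createDataFrame(faithful)

# Pre-2.0 syntax, for comparison only (required a sqlContext object):
# df <- createDataFrame(sqlContext, faithful)
```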
Creating SparkR DataFrames¶
You can create DataFrames from a local R data frame, from data sources, or using Spark SQL queries.
From Local R Data Frames¶
The simplest way to create a DataFrame is to convert a local R `data.frame` into a `SparkDataFrame`. Specifically, we can use `createDataFrame` and pass in the local R `data.frame` to create a `SparkDataFrame`. Like most other SparkR functions, the `createDataFrame` syntax changed with Spark 2.0. You can see an example of this in the code snippet below. Refer to the guide on `createDataFrame` for more examples.
```r
library(SparkR)
df <- createDataFrame(faithful)

# Displays the first rows of the DataFrame
head(df)
```
From Data Sources using Spark SQL¶
The general method for creating DataFrames from data sources is `read.df`. This method takes the path of the file to load and the type of data source. SparkR supports reading CSV, JSON, Text, and Parquet files natively, and through Spark Packages you can find data source connectors for other popular file formats such as Avro.
```r
library(SparkR)
diamondsDF <- read.df("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv",
                      source = "csv", header = "true", inferSchema = "true")
head(diamondsDF)
```
SparkR automatically infers the schema from the CSV file.
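To see what was inferred, you can print the resulting schema; `printSchema` is part of the SparkR API, and this sketch assumes the `diamondsDF` DataFrame created above. (For large files, `read.df` also accepts an explicit `schema` argument, which avoids the cost of inference.)

```r
# Inspect the schema that SparkR inferred from the CSV file;
# prints each column name with its inferred type
printSchema(diamondsDF)
```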
Using Data Source Connectors with Spark Packages¶
Note on availability of the `spark-avro` package:
- Spark 2.1.0-db2 and later versions: this library is automatically included in these cluster images and is documented by this page.
- Spark 2.0.x through 2.1.0-db1: this library is not included in these cluster images. Instead, you can install version 3.1.0 of the `spark-avro` library using Databricks' Maven library installer. For documentation specific to that version of the library, see the version 3.1.0 README in the databricks/spark-avro repository.
- Spark 1.6.x: version 2.0.1 of the `spark-avro` library is automatically included in the cluster image. For documentation specific to that version of the library, see the version 2.0.1 README in the databricks/spark-avro repository.
First we will take an existing R `data.frame`, convert it to a `SparkDataFrame`, and save it as an Avro file.

```r
require(SparkR)
irisDF <- createDataFrame(iris)
write.df(irisDF, source = "com.databricks.spark.avro", path = "dbfs:/tmp/iris.avro", mode = "overwrite")
```
To verify that we saved the Avro file:

```
%fs ls /tmp/iris.avro
```
Now we use the spark-avro package again to read back the data.

```r
irisDF2 <- read.df(path = "/tmp/iris.avro", source = "com.databricks.spark.avro")
head(irisDF2)
```
The data sources API can also be used to save DataFrames in multiple file formats. For example, we can save the `SparkDataFrame` from the previous example to a Parquet file using `write.df`:

```r
write.df(irisDF2, path = "dbfs:/tmp/iris.parquet", source = "parquet", mode = "overwrite")
```

```
%fs ls dbfs:/tmp/iris.parquet
```
From Spark SQL Queries¶
You can also create SparkR DataFrames using Spark SQL queries. The snippet below first builds a small `people` DataFrame from a hypothetical local data frame (the original example assumed one already existed), then queries it with SQL.

```r
# Build a small example DataFrame (hypothetical data) and register it as a temp table
people <- createDataFrame(data.frame(name = c("Alice", "Bob", "Carol"),
                                     age = c(29L, 41L, 35L)))
registerTempTable(people, "peopleTemp")

# Create a df consisting of only the 'age' column using a Spark SQL query
age <- sql("SELECT age FROM peopleTemp")
head(age)
```
`age` is a `SparkDataFrame`.

```r
# The resulting object is a SparkDataFrame
str(age)
```
DataFrame Operations¶
SparkDataFrames support a number of functions for structured data processing. Here we include some basic examples; a complete list can be found in the API docs.
Selecting Rows & Columns¶
```r
# Import SparkR package if this is a new notebook
require(SparkR)

# Create DataFrame
df <- createDataFrame(faithful)
df
```

```r
# Select only the "eruptions" column
head(select(df, df$eruptions))
```

```r
# You can also pass in column names as strings
head(select(df, "eruptions"))
```

```r
# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
```
Grouping & Aggregation¶
SparkDataFrames support a number of commonly used functions to aggregate data after grouping. For example we can count the number of times each waiting time appears in the faithful dataset.
```r
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- count(groupBy(df, df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
```
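Beyond `count`, other aggregate functions can be applied to grouped data with `agg`. As a minimal sketch, assuming the same faithful-based `df` as above, we can compute the average eruption duration for each waiting time:

```r
# Average eruption duration per waiting time, sorted by waiting time
avg_eruptions <- agg(groupBy(df, df$waiting), avg_eruptions = avg(df$eruptions))
head(arrange(avg_eruptions, asc(avg_eruptions$waiting)))
```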
Column Operations¶
SparkR provides a number of functions that can be applied directly to columns for data processing and aggregation. The example below shows the use of basic arithmetic functions.
```r
# Convert waiting time from minutes to seconds.
# Note that we can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
```
Machine Learning¶
SparkR exposes most of MLlib's algorithms. Under the hood, SparkR uses MLlib to train the model.
The example below shows how to build a Gaussian GLM model using SparkR. To run linear regression, set family to "gaussian". To run logistic regression, set family to "binomial". When using SparkR's GLM, SparkR automatically performs one-hot encoding of categorical features, so it does not need to be done manually. Beyond String and Double type features, it is also possible to fit over MLlib Vector features, for compatibility with other MLlib components.
```r
# Create the DataFrame (Spark 2.0+ syntax: no sqlContext argument)
df <- createDataFrame(iris)

# Fit a linear model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

# Model coefficients are returned in a similar format to R's native glm().
summary(model)
```
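Once fitted, the model can generate predictions with SparkR's `predict` function. A minimal sketch, applying the model back to its own training data (in practice you would score a held-out dataset):

```r
# Score the training data with the fitted model;
# the result is a SparkDataFrame with a "prediction" column
predictions <- predict(model, newData = df)
head(select(predictions, "Sepal_Length", "prediction"))
```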