Work with DataFrames and tables in R

This article describes how to use R packages such as SparkR, sparklyr, and dplyr to work with R data.frames, Spark DataFrames, and in-memory tables.

Note that as you work with SparkR, sparklyr, and dplyr, you may find that you can complete a particular operation with all of these packages, and you can use the package that you are most comfortable with. For example, to run a query, you can call functions such as SparkR::sql, sparklyr::sdf_sql, and dplyr::select. At other times, you might be able to complete an operation with only one or two of these packages, and the operation you choose depends on your usage scenario. For example, the way you call sparklyr::sdf_quantile differs slightly from the way you call dplyr::percentile_approx, even though both functions calculate quantiles.
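
For example, the following sketch shows one simple query expressed three ways. It is only an illustration; it assumes the sparklyr connection sc and the table named jsonTable, both of which this article creates later:

# SparkR: run SQL directly against the table.
SparkR::sql("SELECT title, author FROM jsonTable WHERE pages > 300")

# sparklyr: run the same SQL through the sparklyr connection.
sparklyr::sdf_sql(sc, "SELECT title, author FROM jsonTable WHERE pages > 300")

# dplyr: express the same query with verbs instead of SQL.
dplyr::tbl(sc, "jsonTable") %>%
  dplyr::filter(pages > 300) %>%
  dplyr::select(title, author)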

You can use SQL as a bridge between SparkR and sparklyr. For example, you can use SparkR::sql to query tables that you create with sparklyr. You can use sparklyr::sdf_sql to query tables that you create with SparkR. And dplyr code always gets translated to SQL in memory before it is run. See also API interoperability and SQL Translation.
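
To see this translation, you can pass a dplyr pipeline to dplyr::show_query, which prints the generated SQL before the query runs. This is a minimal sketch that assumes the jsonDF DataFrame created later in this article:

jsonDF %>%
  count(author) %>%
  show_query()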

Load SparkR, sparklyr, and dplyr

The SparkR, sparklyr, and dplyr packages are included in the Databricks Runtime that is installed on Databricks clusters. Therefore, you do not need to call the usual install.packages before you can begin calling these packages. However, you must still load these packages with library first. For example, from within an R notebook in a Databricks workspace, run the following code in a notebook cell to load SparkR, sparklyr, and dplyr:

library(SparkR)
library(sparklyr)
library(dplyr)

Connect sparklyr to a cluster

After you load sparklyr, you must call sparklyr::spark_connect to connect to the cluster, specifying the databricks connection method. For example, run the following code in a notebook cell to connect to the cluster that hosts the notebook:

sc <- spark_connect(method = "databricks")

In contrast, a Databricks notebook already establishes a SparkSession on the cluster for use with SparkR, so you do not need to call SparkR::sparkR.session before you can begin calling SparkR.
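
For example, you can call SparkR functions right away. The following minimal sketch runs a trivial query through the notebook’s existing SparkSession:

# No call to SparkR::sparkR.session is needed; the notebook's session is reused.
df <- SparkR::sql("SELECT 1 AS one")
showDF(df)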

Upload a JSON data file to your workspace

Many of the code examples in this article are based on data in a specific location in your Databricks workspace, with specific column names and data types. The data for these code examples originates in a JSON file named books.json on GitHub. To get this file and upload it to your workspace:

  1. Go to the books.json file on GitHub and use a text editor to copy its contents to a file named books.json somewhere on your local machine.

  2. In your Databricks workspace sidebar, click Catalog.

  3. Click Create Table.

  4. On the Upload File tab, drop the books.json file from your local machine to the Drop files to upload box. Or select click to browse, and browse to the books.json file from your local machine.

By default, Databricks uploads your local books.json file to the DBFS location in your workspace with the path /FileStore/tables/books.json.

Do not click Create Table with UI or Create Table in Notebook. The code examples in this article use the data in the uploaded books.json file in this DBFS location.
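
If you want to confirm the upload before continuing, one optional check (not used by the rest of this article) is to look for the file through the cluster’s local /dbfs mount, which is available on most cluster types:

# Returns TRUE if the uploaded file is visible at the expected DBFS path.
file.exists("/dbfs/FileStore/tables/books.json")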

Read the JSON data into a DataFrame

Use sparklyr::spark_read_json to read the uploaded JSON file into a DataFrame, specifying the connection, the path to the JSON file, and a name for the internal table representation of the data. For this example, you must specify that the books.json file contains multiple lines. Specifying the columns’ schema here is optional; if you omit it, sparklyr infers the schema by default. For example, run the following code in a notebook cell to read the uploaded JSON file’s data into a DataFrame named jsonDF:

jsonDF <- spark_read_json(
  sc      = sc,
  name    = "jsonTable",
  path    = "/FileStore/tables/books.json",
  options = list("multiLine" = TRUE),
  columns = c(
    author    = "character",
    country   = "character",
    imageLink = "character",
    language  = "character",
    link      = "character",
    pages     = "integer",
    title     = "character",
    year      = "integer"
  )
)

Run SQL queries, and write to and read from a table

You can use dplyr functions to run SQL queries on a DataFrame. For example, run the following code in a notebook cell to use dplyr::group_by and dplyr::count to get counts by author from the DataFrame named jsonDF. Use dplyr::arrange and dplyr::desc to sort the result in descending order by counts, and then print the first 10 rows by default.

group_by(jsonDF, author) %>%
  count() %>%
  arrange(desc(n))

# Source:     spark<?> [?? x 2]
# Ordered by: desc(n)
#    author                     n
#    <chr>                  <dbl>
#  1 Fyodor Dostoevsky          4
#  2 Unknown                    4
#  3 Leo Tolstoy                3
#  4 Franz Kafka                3
#  5 William Shakespeare        3
#  6 William Faulkner           2
#  7 Gustave Flaubert           2
#  8 Homer                      2
#  9 Gabriel García Márquez     2
# 10 Thomas Mann                2
# … with more rows
# ℹ Use `print(n = ...)` to see more rows

You could then use sparklyr::spark_write_table to write the result to a table in Databricks. For example, run the following code in a notebook cell to rerun the query and then write the result to a table named json_books_agg:

group_by(jsonDF, author) %>%
  count() %>%
  arrange(desc(n)) %>%
  spark_write_table(
    name = "json_books_agg",
    mode = "overwrite"
  )

To verify that the table was created, you could then use sparklyr::sdf_sql along with sparklyr::collect to display the table’s data. For example, run the following code in a notebook cell to query the table into a DataFrame and then print the DataFrame’s first 10 rows by default:

collect(sdf_sql(sc, "SELECT * FROM json_books_agg"))

# A tibble: 82 × 2
#    author                     n
#    <chr>                  <dbl>
#  1 Fyodor Dostoevsky          4
#  2 Unknown                    4
#  3 Leo Tolstoy                3
#  4 Franz Kafka                3
#  5 William Shakespeare        3
#  6 William Faulkner           2
#  7 Homer                      2
#  8 Gustave Flaubert           2
#  9 Gabriel García Márquez     2
# 10 Thomas Mann                2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows

You could also use sparklyr::spark_read_table to do something similar. For example, run the following code in a notebook cell to read the preceding table named json_books_agg into a DataFrame named fromTable, and then use sparklyr::collect to print the DataFrame’s first 10 rows by default:

fromTable <- spark_read_table(
  sc   = sc,
  name = "json_books_agg"
)

collect(fromTable)

# A tibble: 82 × 2
#    author                     n
#    <chr>                  <dbl>
#  1 Fyodor Dostoevsky          4
#  2 Unknown                    4
#  3 Leo Tolstoy                3
#  4 Franz Kafka                3
#  5 William Shakespeare        3
#  6 William Faulkner           2
#  7 Homer                      2
#  8 Gustave Flaubert           2
#  9 Gabriel García Márquez     2
# 10 Thomas Mann                2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows

Add columns and compute column values in a DataFrame

You can use dplyr functions to add columns to DataFrames and to compute columns’ values.

For example, run the following code in a notebook cell to get the contents of the DataFrame named jsonDF. Use dplyr::mutate to add a column named today, and fill this new column with the current timestamp. Then write these contents to a new DataFrame named withDate and use dplyr::collect to print the new DataFrame’s first 10 rows by default.

Note

dplyr::mutate only accepts arguments that conform to Hive’s built-in functions (also known as UDFs) and built-in aggregate functions (also known as UDAFs). For general information, see Hive Functions. For information about the date-related functions in this section, see Date Functions.

withDate <- jsonDF %>%
  mutate(today = current_timestamp())

collect(withDate)

# A tibble: 100 × 9
#    author    country image…¹ langu…² link  pages title  year today
#    <chr>     <chr>   <chr>   <chr>   <chr> <int> <chr> <int> <dttm>
#  1 Chinua A… Nigeria images… English "htt…   209 Thin…  1958 2022-09-27 21:32:59
#  2 Hans Chr… Denmark images… Danish  "htt…   784 Fair…  1836 2022-09-27 21:32:59
#  3 Dante Al… Italy   images… Italian "htt…   928 The …  1315 2022-09-27 21:32:59
#  4 Unknown   Sumer … images… Akkadi… "htt…   160 The … -1700 2022-09-27 21:32:59
#  5 Unknown   Achaem… images… Hebrew  "htt…   176 The …  -600 2022-09-27 21:32:59
#  6 Unknown   India/… images… Arabic  "htt…   288 One …  1200 2022-09-27 21:32:59
#  7 Unknown   Iceland images… Old No… "htt…   384 Njál…  1350 2022-09-27 21:32:59
#  8 Jane Aus… United… images… English "htt…   226 Prid…  1813 2022-09-27 21:32:59
#  9 Honoré d… France  images… French  "htt…   443 Le P…  1835 2022-09-27 21:32:59
# 10 Samuel B… Republ… images… French… "htt…   256 Moll…  1952 2022-09-27 21:32:59
# … with 90 more rows, and abbreviated variable names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows

Now use dplyr::mutate to add two more columns to the contents of the withDate DataFrame. The new month and year columns contain the numeric month and year from the today column. Then write these contents to a new DataFrame named withMMyyyy, and use dplyr::select along with dplyr::collect to print the author, title, month, and year columns of the new DataFrame’s first 10 rows by default:

withMMyyyy <- withDate %>%
  mutate(month = month(today),
         year  = year(today))

collect(select(withMMyyyy, c("author", "title", "month", "year")))

# A tibble: 100 × 4
#    author                  title                                     month  year
#    <chr>                   <chr>                                     <int> <int>
#  1 Chinua Achebe           Things Fall Apart                             9  2022
#  2 Hans Christian Andersen Fairy tales                                   9  2022
#  3 Dante Alighieri         The Divine Comedy                             9  2022
#  4 Unknown                 The Epic Of Gilgamesh                         9  2022
#  5 Unknown                 The Book Of Job                               9  2022
#  6 Unknown                 One Thousand and One Nights                   9  2022
#  7 Unknown                 Njál's Saga                                   9  2022
#  8 Jane Austen             Pride and Prejudice                           9  2022
#  9 Honoré de Balzac        Le Père Goriot                                9  2022
# 10 Samuel Beckett          Molloy, Malone Dies, The Unnamable, the …     9  2022
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows

Now use dplyr::mutate to add two more columns to the contents of the withMMyyyy DataFrame. The new formatted_date column contains the yyyy-MM-dd portion from the today column, while the new day column contains the numeric day from the new formatted_date column. Then write these contents to a new DataFrame named withUnixTimestamp, and use dplyr::select along with dplyr::collect to print the title, formatted_date, and day columns of the new DataFrame’s first 10 rows by default:

withUnixTimestamp <- withMMyyyy %>%
  mutate(formatted_date = date_format(today, "yyyy-MM-dd"),
         day            = dayofmonth(formatted_date))

collect(select(withUnixTimestamp, c("title", "formatted_date", "day")))

# A tibble: 100 × 3
#    title                                           formatted_date   day
#    <chr>                                           <chr>          <int>
#  1 Things Fall Apart                               2022-09-27        27
#  2 Fairy tales                                     2022-09-27        27
#  3 The Divine Comedy                               2022-09-27        27
#  4 The Epic Of Gilgamesh                           2022-09-27        27
#  5 The Book Of Job                                 2022-09-27        27
#  6 One Thousand and One Nights                     2022-09-27        27
#  7 Njál's Saga                                     2022-09-27        27
#  8 Pride and Prejudice                             2022-09-27        27
#  9 Le Père Goriot                                  2022-09-27        27
# 10 Molloy, Malone Dies, The Unnamable, the trilogy 2022-09-27        27
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows

Create a temporary view

You can create named temporary views in memory that are based on existing DataFrames. For example, run the following code in a notebook cell to use SparkR::createOrReplaceTempView to get the contents of the preceding DataFrame named withMMyyyy and make a temporary view out of it named timestampTable. Then use sparklyr::spark_read_table to read the temporary view’s contents, and use sparklyr::collect to print the first 10 rows of the temporary view by default:

createOrReplaceTempView(withMMyyyy, viewName = "timestampTable")

spark_read_table(
  sc = sc,
  name = "timestampTable"
) %>% collect()

# A tibble: 100 × 10
#    author    country image…¹ langu…² link  pages title  year today
#    <chr>     <chr>   <chr>   <chr>   <chr> <int> <chr> <int> <dttm>
#  1 Chinua A… Nigeria images… English "htt…   209 Thin…  1958 2022-09-27 21:11:56
#  2 Hans Chr… Denmark images… Danish  "htt…   784 Fair…  1836 2022-09-27 21:11:56
#  3 Dante Al… Italy   images… Italian "htt…   928 The …  1315 2022-09-27 21:11:56
#  4 Unknown   Sumer … images… Akkadi… "htt…   160 The … -1700 2022-09-27 21:11:56
#  5 Unknown   Achaem… images… Hebrew  "htt…   176 The …  -600 2022-09-27 21:11:56
#  6 Unknown   India/… images… Arabic  "htt…   288 One …  1200 2022-09-27 21:11:56
#  7 Unknown   Iceland images… Old No… "htt…   384 Njál…  1350 2022-09-27 21:11:56
#  8 Jane Aus… United… images… English "htt…   226 Prid…  1813 2022-09-27 21:11:56
#  9 Honoré d… France  images… French  "htt…   443 Le P…  1835 2022-09-27 21:11:56
# 10 Samuel B… Republ… images… French… "htt…   256 Moll…  1952 2022-09-27 21:11:56
# … with 90 more rows, 1 more variable: month <chr>, and abbreviated variable
#   names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows, and `colnames()` to see all variable names

Perform statistical analysis on a DataFrame

You can use sparklyr along with dplyr for statistical analyses.

For example, create a DataFrame to run statistics on. To do this, run the following code in a notebook cell to use sparklyr::sdf_copy_to to copy the contents of the iris dataset that is built into R to a DataFrame named irisDF. Use sparklyr::sdf_collect to print the first 10 rows of the DataFrame by default:

irisDF <- sdf_copy_to(
  sc        = sc,
  x         = iris,
  name      = "iris",
  overwrite = TRUE
)

sdf_collect(irisDF, "row-wise")

# A tibble: 150 × 5
#    Sepal_Length Sepal_Width Petal_Length Petal_Width Species
#           <dbl>       <dbl>        <dbl>       <dbl> <chr>
#  1          5.1         3.5          1.4         0.2 setosa
#  2          4.9         3            1.4         0.2 setosa
#  3          4.7         3.2          1.3         0.2 setosa
#  4          4.6         3.1          1.5         0.2 setosa
#  5          5           3.6          1.4         0.2 setosa
#  6          5.4         3.9          1.7         0.4 setosa
#  7          4.6         3.4          1.4         0.3 setosa
#  8          5           3.4          1.5         0.2 setosa
#  9          4.4         2.9          1.4         0.2 setosa
# 10          4.9         3.1          1.5         0.1 setosa
# … with 140 more rows
# ℹ Use `print(n = ...)` to see more rows

Now use dplyr::group_by to group rows by the Species column. Use dplyr::summarize along with dplyr::percentile_approx to calculate the 25th, 50th, 75th, and 100th percentiles of the Sepal_Length column by Species. Use sparklyr::collect to print the results:

Note

dplyr::summarize only accepts arguments that conform to Hive’s built-in functions (also known as UDFs) and built-in aggregate functions (also known as UDAFs). For general information, see Hive Functions. For information about percentile_approx, see Built-in Aggregate Functions (UDAF).

quantileDF <- irisDF %>%
  group_by(Species) %>%
  summarize(
    quantile_25th = percentile_approx(
      Sepal_Length,
      0.25
    ),
    quantile_50th = percentile_approx(
      Sepal_Length,
      0.50
    ),
    quantile_75th = percentile_approx(
      Sepal_Length,
      0.75
    ),
    quantile_100th = percentile_approx(
      Sepal_Length,
      1.0
    )
  )

collect(quantileDF)

# A tibble: 3 × 5
#   Species    quantile_25th quantile_50th quantile_75th quantile_100th
#   <chr>              <dbl>         <dbl>         <dbl>          <dbl>
# 1 virginica            6.2           6.5           6.9            7.9
# 2 versicolor           5.6           5.9           6.3            7
# 3 setosa               4.8           5             5.2            5.8

Similar results can be calculated, for example, by using sparklyr::sdf_quantile:

print(sdf_quantile(
  x = irisDF %>%
    filter(Species == "virginica"),
  column = "Sepal_Length",
  probabilities = c(0.25, 0.5, 0.75, 1.0)
))

# 25%  50%  75% 100%
# 6.2  6.5  6.9  7.9