Tutorial: Load and transform data with Apache Spark Scala DataFrames
This tutorial shows you how to use the Apache Spark Scala DataFrame API in Databricks to load and transform U.S. city data.
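By the end of this tutorial, you will understand what a DataFrame is and be familiar with the following tasks: creating a DataFrame with Scala, loading data into a DataFrame from files, viewing and interacting with a DataFrame, saving a DataFrame, and running SQL queries in Scala.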
See also Apache Spark Scala API reference.
What is a DataFrame?
A DataFrame is a two-dimensional labeled data structure with columns of potentially different types. You can think of a DataFrame like a spreadsheet, a SQL table, or a dictionary of series objects. Apache Spark DataFrames provide a rich set of functions (select columns, filter, join, aggregate) that allow you to solve common data analysis problems efficiently.
Apache Spark DataFrames are an abstraction built on top of Resilient Distributed Datasets (RDDs). Spark DataFrames and Spark SQL use a unified planning and optimization engine, allowing you to get nearly identical performance across all supported languages on Databricks (Python, SQL, Scala, and R).
What is a Spark Dataset?
The Apache Spark Dataset API provides a type-safe, object-oriented programming interface. DataFrame is an alias for an untyped Dataset[Row]. See Dataset API.
Databricks documentation uses the term DataFrame, because it applies to Python, Scala, and R. See Example notebook: Scala Dataset aggregator.
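The relationship between the typed Dataset API and DataFrame can be sketched as follows. This is a minimal illustration, assuming it runs in a Databricks Scala notebook (or spark-shell) where `spark` is already defined. The `Place` case class and its values are hypothetical and exist only for this sketch.

```scala
import org.apache.spark.sql.{DataFrame, Dataset}
import spark.implicits._

// Hypothetical case class used only for this sketch
case class Place(name: String, population: Long)

// A typed Dataset: field names and types are checked at compile time
val typed: Dataset[Place] = Seq(Place("South Bend", 101190L)).toDS()

// Typed transformation: the lambda receives a Place, not a Row
val names: Dataset[String] = typed.map(_.name)

// Dropping the type information yields a DataFrame, that is, a Dataset[Row]
val untyped: DataFrame = typed.toDF()

// On a DataFrame, column names are resolved at run time
val nameColumn: DataFrame = untyped.select("name")

// as[T] converts a DataFrame back to a typed Dataset
val roundTrip: Dataset[Place] = untyped.as[Place]
```

A misspelled field in `typed.map(_.name)` fails at compile time, while a misspelled column in `untyped.select("name")` fails only when the query is analyzed.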
Requirements
To complete the following tutorial, you must meet the following requirements:
You are logged into a Databricks workspace.
You have permission to create compute enabled with Unity Catalog.
Note
If you do not have cluster creation privileges, you can still complete most of the following steps as long as you have access to an existing cluster.
From the sidebar on the homepage, you access Databricks entities: the workspace browser, Catalog Explorer, workflows, and compute. Workspace is the root folder that stores Databricks assets like notebooks and libraries.
Step 1: Create a DataFrame with Scala
To learn how to navigate Databricks notebooks, see Databricks notebook interface and controls.
Open a new notebook by clicking the icon.
Copy and paste the following code into the empty notebook cell, then press Shift+Enter to run the cell. The following code example creates a DataFrame named df1 with city population data and displays its contents.
// Create a DataFrame from the given data
case class City(rank: Long, city: String, state: String, code: String, population: Long, price: Double)
val df1 = Seq(City(295, "South Bend", "Indiana", "IN", 101190, 112.9)).toDF
display(df1)
Step 2: Load data into a DataFrame from files
Add more city population data from the /databricks-datasets directory into df2.
To load data into DataFrame df2 from the data_geo.csv file, copy and paste the following code into a new empty notebook cell. Press Shift+Enter to run the cell.
You can load data from many supported file formats. The following example uses a dataset available in the /databricks-datasets directory, accessible from most workspaces. See Sample datasets.
// Create a second DataFrame from a file
val df2 = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
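To see what the `header` and `inferSchema` options do, the following sketch parses a few in-memory CSV lines instead of a file. It assumes a Databricks Scala notebook (or spark-shell) where `spark` is predefined. The rows are made-up placeholder values, not the contents of data_geo.csv.

```scala
import org.apache.spark.sql.types.{IntegerType, StringType}
import spark.implicits._

// Hypothetical CSV content standing in for a file on disk
val csvLines = Seq(
  "rank,city,population",
  "1,Alpha,3000",
  "2,Beta,2000"
).toDS()

// header=true: the first line supplies column names
// inferSchema=true: Spark scans the data to choose column types
val inferred = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(csvLines)

// Without inferSchema, every column is read as a string
val asStrings = spark.read
  .option("header", "true")
  .csv(csvLines)

inferred.printSchema()
asStrings.printSchema()
```

Schema inference requires an extra pass over the data, so for large files it can be worth supplying an explicit schema instead.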
Step 3: View and interact with your DataFrame
View and interact with your city population DataFrames using the following methods.
Combine DataFrames
Combine the contents of your first DataFrame df1 with DataFrame df2 containing the contents of data_geo.csv.
In the notebook, use the following example code to create a new DataFrame that adds the rows of one DataFrame to another using the union operation:
// Returns a DataFrame that combines the rows of df1 and df2
val df = df1.union(df2)
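Note that `union` matches columns by position, not by name, and the result takes its column names from the first DataFrame. The following sketch contrasts it with `unionByName`, using two small hypothetical DataFrames whose columns are in different orders. It assumes `spark` is predefined, as in a Databricks notebook.

```scala
import spark.implicits._

// Two hypothetical DataFrames with the same columns in a different order
val a = Seq(("South Bend", "IN")).toDF("city", "code")
val b = Seq(("CA", "Fresno")).toDF("code", "city")

// union pairs columns by position: b's "code" values land in the "city" column
val byPosition = a.union(b)

// unionByName pairs columns by name, regardless of their order
val byName = a.unionByName(b)

byPosition.show()
byName.show()
```

Positional matching is what lets the tutorial combine df1 and df2 even if their column names differ, as long as the column order and types line up.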
View the DataFrame
To view the U.S. city data in a tabular format, use the Databricks display() command in a notebook cell.
display(df)
Print the DataFrame schema
Spark uses the term schema to refer to the names and data types of the columns in the DataFrame.
Print the schema of your DataFrame with the following .printSchema() method in your notebook. Use the resulting metadata to interact with the contents of your DataFrame.
df.printSchema()
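Besides printing it, you can read the schema programmatically through the DataFrame's `schema` property. The following sketch uses a small hypothetical DataFrame rather than the tutorial's `df`, and assumes `spark` is predefined.

```scala
import spark.implicits._

// Hypothetical sample with three differently typed columns
val sample = Seq((295L, "South Bend", 112.9)).toDF("rank", "city", "price")

// schema is a StructType; each StructField carries a name and a data type
val columnTypes: Seq[(String, String)] =
  sample.schema.fields.toSeq.map(f => f.name -> f.dataType.simpleString)

columnTypes.foreach { case (name, dataType) => println(s"$name: $dataType") }
```

This can be handy for checks such as confirming that a column was loaded as a number before filtering on it.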
Filter rows in a DataFrame
Discover the five most populous cities in your data set by filtering rows, using .filter() or .where(). Use filtering to select a subset of rows to return or modify in a DataFrame. The two methods are interchangeable: there is no difference in performance or syntax, as seen in the following example.
To select the most populous cities, continue to add new cells to your notebook and add the following code example:
// Filter rows using .filter()
val filtered_df = df.filter(df("rank") < 6)
display(filtered_df)
// Filter rows using .where() (run in a separate cell, because filtered_df is defined again)
val filtered_df = df.where(df("rank") < 6)
display(filtered_df)
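Filter conditions can also be combined, or written as a SQL string. The following sketch shows both forms on a small hypothetical DataFrame, assuming `spark` is predefined. The rows are placeholders, not real city data.

```scala
import org.apache.spark.sql.functions.col
import spark.implicits._

// Hypothetical sample rows
val cities = Seq(
  (1L, "Alpha", "X"),
  (2L, "Beta", "Y"),
  (7L, "Gamma", "X")
).toDF("rank", "city", "state")

// Column-based condition: && combines predicates, === tests equality
val byColumn = cities.filter(col("rank") < 6 && col("state") === "X")

// The same condition written as a SQL expression string
val bySql = cities.where("rank < 6 AND state = 'X'")

byColumn.show()
bySql.show()
```

Both forms select the same rows; which one to use is largely a matter of readability.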
Select columns from a DataFrame
Learn about which state a city is located in using the select() method. Select columns by passing one or more column names to .select(), as in the following example:
// Select columns from a DataFrame
val select_df = df.select("City", "State")
display(select_df)
Create a subset DataFrame
Create a subset DataFrame with the ten cities with the highest population and display the resulting data. Combine select and filter queries to limit rows and columns returned, using the following code in your notebook:
// Create a subset DataFrame
val subset_df = df.filter(df("rank") < 11).select("City")
display(subset_df)
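The example above relies on the data already containing a rank column. When no such column exists, one alternative (not the approach this tutorial uses) is to sort by population and keep the first rows. A minimal sketch on hypothetical data, assuming `spark` is predefined:

```scala
import org.apache.spark.sql.functions.desc
import spark.implicits._

// Hypothetical sample without a rank column
val sample = Seq(
  ("Alpha", 300L),
  ("Beta", 100L),
  ("Gamma", 200L)
).toDF("city", "population")

// Sort descending by population, keep the top two rows, then project one column
val topTwo = sample
  .orderBy(desc("population"))
  .limit(2)
  .select("city")

topTwo.show()
```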
Step 4: Save the DataFrame
You can either save your DataFrame to a table or write the DataFrame to a file or multiple files.
Save the DataFrame to a table
Databricks uses the Delta Lake format for all tables by default. To save your DataFrame, you must have CREATE table privileges on the catalog and schema. The following example saves the contents of the DataFrame to a table named us_cities:
// Save DataFrame to a table
df.write.saveAsTable("us_cities")
Most Spark applications work on large data sets and in a distributed fashion. Spark writes out a directory of files rather than a single file. Delta Lake splits the Parquet folders and files. Many data systems can read these directories of files. Databricks recommends using tables over file paths for most applications.
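By default, `saveAsTable` fails if the table already exists, so rerunning a save cell needs an explicit save mode. The following sketch overwrites a table and reads it back. It assumes `spark` is predefined and that you are allowed to create tables in the current schema. The table name `us_cities_sketch` and the sample row are hypothetical, chosen so that the tutorial's us_cities table is not touched.

```scala
import org.apache.spark.sql.SaveMode
import spark.implicits._

// Hypothetical one-row DataFrame
val sample = Seq((295L, "South Bend")).toDF("rank", "city")

// Overwrite replaces the table contents if the table already exists
sample.write.mode(SaveMode.Overwrite).saveAsTable("us_cities_sketch")

// Read the saved table back into a DataFrame
val reloaded = spark.read.table("us_cities_sketch")
reloaded.show()
```

`SaveMode.Append` adds rows instead of replacing them. Treat overwrite with care on tables that other users depend on.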
Additional tasks: Run SQL queries in Scala
Spark DataFrames provide the following options to combine SQL with Scala. You can run the following code in the same notebook that you created for this tutorial.
Specify a column as a SQL query
The selectExpr() method allows you to specify each column as a SQL expression, such as in the following example:
display(df.selectExpr("`rank`", "upper(city) as big_name"))
Import expr()
You can import org.apache.spark.sql.functions.expr
to use SQL syntax anywhere a column would be specified, as in the following example:
import org.apache.spark.sql.functions.expr
display(df.select($"rank", expr("lower(city) as little_name")))
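The options above can be compared side by side with a full SQL query. The following sketch builds the same result three ways on a small hypothetical DataFrame, assuming `spark` is predefined. The temporary view name `sample_cities` is an assumption made for this illustration.

```scala
import org.apache.spark.sql.functions.{col, expr}
import spark.implicits._

// Hypothetical one-row DataFrame
val sample = Seq((295L, "South Bend")).toDF("rank", "city")

// 1. Every column given as a SQL expression string
val viaSelectExpr = sample.selectExpr("`rank`", "upper(city) as big_name")

// 2. A Column object mixed with a SQL expression via expr()
val viaExpr = sample.select(col("rank"), expr("upper(city) as big_name"))

// 3. A full SQL query against a temporary view of the DataFrame
sample.createOrReplaceTempView("sample_cities")
val viaSql = spark.sql("SELECT `rank`, upper(city) AS big_name FROM sample_cities")

viaSelectExpr.show()
viaExpr.show()
viaSql.show()
```

A temporary view is scoped to the current Spark session and does not write any data.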