Tutorial: Load and transform data in PySpark DataFrames

This tutorial shows you how to load and transform U.S. city data using the Apache Spark Python (PySpark) DataFrame API in Databricks.

By the end of this tutorial, you will understand what a DataFrame is and be familiar with the following tasks:

See also Apache Spark PySpark API reference.

What is a DataFrame?

A DataFrame is a two-dimensional labeled data structure with columns of potentially different types. You can think of a DataFrame like a spreadsheet, a SQL table, or a dictionary of series objects. Apache Spark DataFrames provide a rich set of functions (select columns, filter, join, aggregate) that allow you to solve common data analysis problems efficiently.

Apache Spark DataFrames are an abstraction built on top of Resilient Distributed Datasets (RDDs). Spark DataFrames and Spark SQL use a unified planning and optimization engine, allowing you to get nearly identical performance across all supported languages on Databricks (Python, SQL, Scala, and R).

Requirements

To complete the following tutorial, you must meet the following requirements:

Note

If you do not have cluster control privileges, you can still complete most of the following steps as long as you have access to a cluster.

From the sidebar on the homepage, you access Databricks entities: the workspace browser, catalog, explorer, workflows, and compute. Workspace is the root folder that stores your Databricks assets, like notebooks and libraries.

Step 1: Create a DataFrame with Python

To learn how to navigate Databricks notebooks, see Databricks notebook interface and controls.

  1. Open a new notebook and insert a new cell by clicking the New Icon icon.

  2. Copy and paste the following code into an empty notebook cell, then press Shift+Enter to run the cell. The following code example creates a DataFrame named df1 with city population data and displays its contents.

data = [[295, "South Bend", "Indiana", "IN", 101190, 112.9]]
columns = ["rank", "city", "state", "code", "population", "price"]

df1 = spark.createDataFrame(data, schema="rank LONG, city STRING, state STRING, code STRING, population LONG, price DOUBLE")
display(df1)

Step 2: Load data into a DataFrame from files

Add more city population data from the /databricks-datasets directory into df2.

To load data into DataFrame df2 from the data_geo.csv file:

  1. In the notebook, create a new cell.

  2. Copy and paste the following code into the empty notebook cell, then press Shift+Enter to run the cell.

You can load data from many supported file formats. The following example uses a dataset available in the /databricks-datasets directory, accessible from most workspaces. See Sample datasets.

df2 = (spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)

Step 3: View and interact with your DataFrame

View and interact with your city population DataFrames using the following methods.

Combine DataFrames

Combine the contents of your first DataFrame with the DataFrame that contains the contents of data_geo.csv.

In the notebook, use the following example code to create a new DataFrame that adds the rows of one DataFrame to another using the union operation:

# Returns a DataFrame that combines the rows of df1 and df2
df = df1.union(df2)

View the DataFrame

To view the U.S. city data in a tabular format, use the Databricks display() command in a notebook cell.

display(df)

Filter rows in a DataFrame

Discover the five most populous cities in your data set by filtering rows, using .filter() or .where(). Use filtering to select a subset of rows to return or modify in a DataFrame. There is no difference in performance or syntax, as seen in the following examples.

# Filter rows using .filter()
filtered_df = df.filter(df["rank"] < 6)
display(filtered_df)

# Filter rows using .where()
filtered_df = df.where(df["rank"] < 6)
display(filtered_df)

Select columns from a DataFrame

Learn about which state a city is located in with the select() method. Select columns by passing one or more column names to .select(), as in the following example:

select_df = df.select("City", "State")
display(select_df)

Create a subset DataFrame

Create a subset DataFrame with the ten cities with the highest population and display the resulting data. Combine select and filter queries to limit rows and columns returned, using the following code in your notebook:

subset_df = df.filter(df["rank"] < 11).select("City")
display(subset_df)

Step 4: Save the DataFrame

You can either save your DataFrame to a table or write the DataFrame to a file or multiple files.

Save the DataFrame to a table

Databricks uses the Delta Lake format for all tables by default. To save your DataFrame, you must have CREATE table privileges on the catalog and schema. The following example saves the contents of the DataFrame to a table named us_cities:

df.write.saveAsTable("us_cities")

Most Spark applications work on large data sets and in a distributed fashion. Spark writes out a directory of files rather than a single file. Delta Lake splits the Parquet folders and files. Many data systems can read these directories of files. Databricks recommends using tables over file paths for most applications.

Save the DataFrame to JSON files

The following example saves a directory of JSON files:

# Write a DataFrame to a collection of files
df.write.format("json").save("/tmp/json_data")

Read the DataFrame from a JSON file

# Read a DataFrame from a JSON file
df3 = spark.read.format("json").json("/tmp/json_data")
display(df3)

Additional tasks: Run SQL queries in PySpark

Spark DataFrames provide the following options to combine SQL with Python. You can run the following code in the same notebook that you created for this tutorial.

Specify a column as a SQL query

The selectExpr() method allows you to specify each column as a SQL query, such as in the following example:

display(df.selectExpr("`rank`", "upper(city) as big_name"))

Import expr()

You can import the expr() function from pyspark.sql.functions to use SQL syntax anywhere a column would be specified, as in the following example:

from pyspark.sql.functions import expr

display(df.select("rank", expr("lower(city) as little_name")))

Run an arbitrary SQL query

You can use spark.sql() to run arbitrary SQL queries, as in the following example:

query_df = spark.sql("SELECT * FROM us_cities")

Parameterize SQL queries

You can use Python formatting to parameterize SQL queries, as in the following example:

table_name = "us_cities"

query_df = spark.sql(f"SELECT * FROM {table_name}")