Tutorial: COPY INTO with Spark SQL
Databricks recommends that you use the COPY INTO command for incremental and bulk data loading for data sources that contain thousands of files. Databricks recommends that you use Auto Loader for advanced use cases.
In this tutorial, you use the COPY INTO command to load data from cloud object storage into a table in your Databricks workspace.
Requirements
A Databricks account, and a Databricks workspace in your account. To create these, see Get started with Databricks.
An all-purpose cluster in your workspace running Databricks Runtime 11.3 LTS or above. To create an all-purpose cluster, see Compute configuration reference.
Familiarity with the Databricks workspace user interface. See Navigate the workspace.
Familiarity working with Databricks notebooks.
A location you can write data to; this demo uses the DBFS root as an example, but Databricks recommends an external storage location configured with Unity Catalog.
Step 1: Configure your environment and create a data generator
This tutorial assumes basic familiarity with Databricks and a default workspace configuration. If you are unable to run the code provided, contact your workspace administrator to make sure you have access to compute resources and a location to which you can write data.
Note that the provided code uses a source parameter to specify the location you’ll configure as your COPY INTO data source. As written, this code points to a location on DBFS root. If you have write permissions on an external object storage location, replace the dbfs:/ portion of the source string with the path to your object storage. Because this code block also does a recursive delete to reset this demo, make sure that you don’t point this at production data and that you keep the /user/{username}/copy-into-demo nested directory to avoid overwriting or deleting existing data.
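The warning above can be turned into a small guard that you run before any recursive delete. This is a minimal sketch in plain Python, not part of the tutorial code: the helper name is_safe_demo_path and the example paths (including the s3:// bucket) are hypothetical, and the check only verifies that the path still ends with the nested demo directory.

```python
def is_safe_demo_path(source: str, username: str) -> bool:
    """Return True only if `source` ends with the tutorial's nested demo directory."""
    expected_suffix = f"/user/{username}/copy-into-demo"
    return source.rstrip("/").endswith(expected_suffix)


# Example values (assumptions); in the notebook these come from the setup cell.
username = "jane_doe_example_com"
safe_source = f"dbfs:/user/{username}/copy-into-demo"
unsafe_source = "s3://my-bucket/production"  # hypothetical path, for illustration only

print(is_safe_demo_path(safe_source, username))    # True
print(is_safe_demo_path(unsafe_source, username))  # False
```

In a notebook, you could wrap the recursive delete in an if statement that calls this check, so that a mistyped source path is rejected instead of deleted.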
Create a new SQL notebook and attach it to a cluster running Databricks Runtime 11.3 LTS or above.
Copy and run the following code to reset the storage location and database used in this tutorial:
%python
# Set parameters for isolation in workspace and reset demo

username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"copyinto_{username}_db"
source = f"dbfs:/user/{username}/copy-into-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

dbutils.fs.rm(source, True)
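To see which names the setup code produces for a given user, the same derivation can be reproduced outside of Spark. The sketch below uses Python's re module in place of the SQL regexp_replace call; the function name demo_names and the email address are hypothetical and exist only for illustration.

```python
import re


def demo_names(current_user: str) -> dict:
    """Derive the per-user names the way the setup cell does."""
    # Same pattern as regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')
    username = re.sub(r"[^a-zA-Z0-9]", "_", current_user)
    return {
        "username": username,
        "database": f"copyinto_{username}_db",
        "source": f"dbfs:/user/{username}/copy-into-demo",
    }


names = demo_names("jane.doe@example.com")  # hypothetical user
print(names["username"])  # jane_doe_example_com
print(names["database"])  # copyinto_jane_doe_example_com_db
print(names["source"])    # dbfs:/user/jane_doe_example_com/copy-into-demo
```

Because every non-alphanumeric character becomes an underscore, each user gets a distinct database name and storage path, which keeps the demo isolated from other users in the workspace.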
Copy and run the following code to configure some tables and functions that will be used to randomly generate data:
-- Configure random data generator

CREATE TABLE user_ping_raw
(user_id STRING, ping INTEGER, time TIMESTAMP)
USING json
LOCATION ${c.source};

CREATE TABLE user_ids (user_id STRING);

INSERT INTO user_ids VALUES
("potato_luver"),
("beanbag_lyfe"),
("default_username"),
("the_king"),
("n00b"),
("frodo"),
("data_the_kid"),
("el_matador"),
("the_wiz");

CREATE FUNCTION get_ping()
    RETURNS INT
    RETURN int(rand() * 250);

CREATE FUNCTION is_active()
    RETURNS BOOLEAN
    RETURN CASE
        WHEN rand() > .25 THEN true
        ELSE false
        END;
Step 2: Write the sample data to cloud storage
Writing to data formats other than Delta Lake is rare on Databricks. The code provided here writes to JSON to simulate an external system that dumps its results into object storage.
Copy and run the following code to write a batch of raw JSON data:
-- Write a new batch of data to the data source

INSERT INTO user_ping_raw
SELECT *,
  get_ping() ping,
  current_timestamp() time
FROM user_ids
WHERE is_active()=true;
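The batch that this statement writes can be approximated in plain Python to show what the generator functions do. This is a rough sketch, not the tutorial's implementation: generate_batch is a hypothetical helper, Python's random module stands in for the SQL rand() function, and the exact rows differ from run to run.

```python
import random
from datetime import datetime, timezone

USER_IDS = [
    "potato_luver", "beanbag_lyfe", "default_username", "the_king", "n00b",
    "frodo", "data_the_kid", "el_matador", "the_wiz",
]


def get_ping(rng: random.Random) -> int:
    # Counterpart of int(rand() * 250): an integer from 0 to 249
    return int(rng.random() * 250)


def is_active(rng: random.Random) -> bool:
    # Counterpart of rand() > .25: true about three quarters of the time
    return rng.random() > 0.25


def generate_batch(rng: random.Random) -> list:
    """Build one batch: a row for each user that is_active() selects."""
    now = datetime.now(timezone.utc).isoformat()
    return [
        {"user_id": user_id, "ping": get_ping(rng), "time": now}
        for user_id in USER_IDS
        if is_active(rng)
    ]


batch = generate_batch(random.Random())
print(len(batch), "of", len(USER_IDS), "users wrote a ping in this batch")
```

Each run keeps a different random subset of users, which is why every batch written to the source location has a different size and content.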
Step 3: Use COPY INTO to load JSON data idempotently
You must create a target Delta Lake table before you can use COPY INTO. In Databricks Runtime 11.3 LTS and above, you do not need to provide anything other than a table name in your CREATE TABLE statement. For previous versions of Databricks Runtime, you must provide a schema when creating an empty table.
Copy and run the following code to create your target Delta table and load data from your source:
-- Create target table and load data

CREATE TABLE IF NOT EXISTS user_ping_target;

COPY INTO user_ping_target
FROM ${c.source}
FILEFORMAT = JSON
FORMAT_OPTIONS ("mergeSchema" = "true")
COPY_OPTIONS ("mergeSchema" = "true")
Because this action is idempotent, you can run it multiple times but data will only be loaded once.
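The idempotent behavior can be pictured with a toy model in plain Python. This is only a conceptual sketch, assuming that the command skips source files it has already loaded; the class CopyIntoModel and the file names are hypothetical and do not reflect how Databricks implements COPY INTO internally.

```python
class CopyIntoModel:
    """Toy model of an idempotent load: remembers which files were already loaded."""

    def __init__(self):
        self.loaded_files = set()
        self.rows = []

    def copy_into(self, source_files: dict) -> int:
        """Load rows from files not seen before; return the number of rows loaded."""
        loaded = 0
        for name, rows in sorted(source_files.items()):
            if name in self.loaded_files:
                continue  # already loaded, so skip it
            self.rows.extend(rows)
            self.loaded_files.add(name)
            loaded += len(rows)
        return loaded


# Hypothetical source location: file name -> rows in that file
source_files = {"batch-1.json": [{"user_id": "frodo", "ping": 12}]}
target = CopyIntoModel()

print(target.copy_into(source_files))  # 1 (first run loads the file)
print(target.copy_into(source_files))  # 0 (re-run loads nothing new)

source_files["batch-2.json"] = [
    {"user_id": "n00b", "ping": 200},
    {"user_id": "the_wiz", "ping": 7},
]
print(target.copy_into(source_files))  # 2 (only the new file is loaded)
```

The second call returns 0 because no new files have arrived, which mirrors what happens when you re-run the load without writing a new batch first.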
Step 4: Preview the contents of your table
You can run a simple SQL query to manually review the contents of this table.
Copy and execute the following code to preview your table:
-- Review updated table

SELECT * FROM user_ping_target
Step 5: Load more data and preview results
You can re-run steps 2-4 many times to land new batches of random raw JSON data in your source, idempotently load them to Delta Lake with COPY INTO, and preview the results. Try running these steps out of order or multiple times to simulate multiple batches of raw data being written or executing COPY INTO multiple times without new data having arrived.
Step 6: Clean up tutorial
When you are done with this tutorial, you can clean up the associated resources if you no longer want to keep them.
Copy and run the following code to drop the database and its tables and remove all data:
%python
# Drop database and tables and remove data

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
dbutils.fs.rm(source, True)
To stop your compute resource, go to the Clusters tab and Terminate your cluster.
Additional resources
The COPY INTO reference article