Ingest data into Delta Lake
Databricks offers a variety of ways to help you ingest data into Delta Lake.
Upload CSV files
You can securely create tables from CSV files using the Create Table UI in Databricks SQL.
Partner integrations
Databricks partner integrations enable you to easily load data into Databricks. These integrations enable low-code, easy-to-implement, and scalable data ingestion from a variety of sources into Databricks. See the Databricks integrations.
COPY INTO SQL command
The COPY INTO SQL command lets you load data from a file location into a Delta table. This is a re-triable and idempotent operation; files in the source location that have already been loaded are skipped.
Use the COPY INTO SQL command instead of Auto Loader when:
You want to load data from a file location that contains files in the order of thousands or fewer.
Your data schema is not expected to evolve frequently.
You plan to load subsets of previously uploaded files, as in the sketch after this list.
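The following minimal sketch illustrates this skip-and-reload behavior; the table name, source path, and file name below are placeholders, not part of the example later in this section. Re-running an identical COPY INTO statement skips files that were already loaded, the FILES clause limits a run to a named subset of files, and the COPY_OPTIONS setting 'force' = 'true' reloads files even if they were loaded before.
# A minimal sketch; the table, path, and file names below are placeholders.
spark.sql("CREATE TABLE IF NOT EXISTS default.copy_into_sketch (id BIGINT, value STRING)")
# First run loads all matching files; re-running the same statement skips them.
spark.sql(
  "COPY INTO default.copy_into_sketch"
  " FROM '/tmp/copy-into-sketch'"
  " FILEFORMAT = PARQUET"
)
# Load only a named subset of files from the source location.
spark.sql(
  "COPY INTO default.copy_into_sketch"
  " FROM '/tmp/copy-into-sketch'"
  " FILEFORMAT = PARQUET"
  " FILES = ('part-00000.parquet')"
)
# Reload files even if they have been loaded before.
spark.sql(
  "COPY INTO default.copy_into_sketch"
  " FROM '/tmp/copy-into-sketch'"
  " FILEFORMAT = PARQUET"
  " COPY_OPTIONS ('force' = 'true')"
)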
For a brief overview and demonstration of the COPY INTO SQL command, as well as Auto Loader later in this article, watch this YouTube video (2 minutes).
The following example shows how to create a Delta table and then use the COPY INTO SQL command to load sample data from Sample datasets (databricks-datasets) into the table. You can run the example Python, R, Scala, or SQL code from within a notebook attached to a Databricks cluster. You can also run the SQL code from within a query associated with a SQL warehouse in Databricks SQL.
Note
Some of the following code examples use a two-level namespace notation consisting of a schema (also called a database) and a table or view (for example, default.people10m). To use these examples with Unity Catalog, replace the two-level namespace with Unity Catalog three-level namespace notation consisting of a catalog, schema, and table or view (for example, main.default.people10m). For more information, see Use three-level namespace notation with Unity Catalog.
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...
To clean up, run the following code, which deletes the table:
spark.sql("DROP TABLE " + table_name)
sql(paste("DROP TABLE ", table_name, sep = ""))
spark.sql("DROP TABLE " + table_name)
DROP TABLE default.loan_risks_upload
For more examples and details, see
Databricks Runtime 7.x and above: COPY INTO
Databricks Runtime 5.5 LTS and 6.x: Copy Into (Delta Lake on Databricks)
Auto Loader
Auto Loader incrementally and efficiently processes new data files as they arrive in cloud storage without any additional setup. Auto Loader provides a new Structured Streaming source called cloudFiles. Given an input directory path on the cloud file storage, the cloudFiles source automatically processes new files as they arrive, with the option of also processing existing files in that directory.
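As a minimal sketch of what that looks like (the paths and schema here are placeholders and are separate from the walkthrough later in this section), the cloudFiles source is configured like any other Structured Streaming source; the cloudFiles.includeExistingFiles option controls whether files already in the directory are processed in addition to new arrivals.
# A minimal sketch; the paths and schema are placeholders.
df = (spark.readStream.format('cloudFiles')
  .option('cloudFiles.format', 'json')
  # Also process files already in the input directory, not only new
  # arrivals (this is the default behavior).
  .option('cloudFiles.includeExistingFiles', 'true')
  .schema('id bigint, value string')
  .load('/tmp/cloudfiles-sketch/input'))
(df.writeStream.format('delta')
  .option('checkpointLocation', '/tmp/cloudfiles-sketch/_checkpoints')
  .start('/tmp/cloudfiles-sketch/output'))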
Use Auto Loader instead of the COPY INTO SQL command when:
You want to load data from a file location that contains files in the order of millions or higher. Auto Loader can discover files more efficiently than the COPY INTO SQL command and can split file processing into multiple batches.
Your data schema evolves frequently. Auto Loader provides better support for schema inference and evolution, as in the sketch after this list. See Configuring schema inference and evolution in Auto Loader.
You do not plan to load subsets of previously uploaded files. With Auto Loader, it can be more difficult to reprocess subsets of files. However, you can use the COPY INTO SQL command to reload subsets of files while an Auto Loader stream is simultaneously running.
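To illustrate the schema point, the following minimal sketch (the paths are placeholders) lets Auto Loader infer the schema instead of declaring one, using cloudFiles.schemaLocation to track the inferred schema and cloudFiles.schemaEvolutionMode to add newly detected columns as source files change.
# A minimal sketch; the paths are placeholders. With a schema location set,
# Auto Loader infers the schema from the data and tracks how it evolves.
df = (spark.readStream.format('cloudFiles')
  .option('cloudFiles.format', 'csv')
  .option('header', 'true')
  # Where Auto Loader stores the inferred schema and its evolution history.
  .option('cloudFiles.schemaLocation', '/tmp/schema-sketch/_schemas')
  # Add newly detected columns to the schema as they appear (the default mode).
  .option('cloudFiles.schemaEvolutionMode', 'addNewColumns')
  .load('/tmp/schema-sketch/input'))
(df.writeStream.format('delta')
  .option('checkpointLocation', '/tmp/schema-sketch/_checkpoints')
  # Let the target Delta table pick up the new columns as well.
  .option('mergeSchema', 'true')
  .start('/tmp/schema-sketch/output'))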
For a brief overview and demonstration of Auto Loader, as well as the COPY INTO SQL command earlier in this article, watch this YouTube video (2 minutes).
For a longer overview and demonstration of Auto Loader, watch this YouTube video (59 minutes).
Note
Some of the following code examples use a two-level namespace notation consisting of a schema (also called a database) and a table or view (for example, default.people10m). To use these examples with Unity Catalog, replace the two-level namespace with Unity Catalog three-level namespace notation consisting of a catalog, schema, and table or view (for example, main.default.people10m). For more information, see Use three-level namespace notation with Unity Catalog.
The following code example demonstrates how Auto Loader detects new data files as they arrive in cloud storage. You can run the example code from within a notebook attached to a Databricks cluster.
1. Create the file upload directory, for example:
user_dir = '<my-name>@<my-organization.com>'
upload_path = "/FileStore/shared-uploads/" + user_dir + "/population_data_upload"
dbutils.fs.mkdirs(upload_path)
val user_dir = "<my-name>@<my-organization.com>"
val upload_path = "/FileStore/shared-uploads/" + user_dir + "/population_data_upload"
dbutils.fs.mkdirs(upload_path)
2. Create the following sample CSV files, and then upload them to the file upload directory by using the DBFS file browser:
WA.csv:
city,year,population
Seattle metro,2019,3406000
Seattle metro,2020,3433000
OR.csv:
city,year,population
Portland metro,2019,2127000
Portland metro,2020,2151000
3. Run the following code to start Auto Loader.
checkpoint_path = '/tmp/delta/population_data/_checkpoints'
write_path = '/tmp/delta/population_data'
# Set up the stream to begin reading incoming files from the
# upload_path location.
df = spark.readStream.format('cloudFiles') \
  .option('cloudFiles.format', 'csv') \
  .option('header', 'true') \
  .schema('city string, year int, population long') \
  .load(upload_path)
# Start the stream.
# Use the checkpoint_path location to keep a record of all files that
# have already been uploaded to the upload_path location.
# For those that have been uploaded since the last check,
# write the newly-uploaded files' data to the write_path location.
df.writeStream.format('delta') \
  .option('checkpointLocation', checkpoint_path) \
  .start(write_path)
val checkpoint_path = "/tmp/delta/population_data/_checkpoints"
val write_path = "/tmp/delta/population_data"
// Set up the stream to begin reading incoming files from the
// upload_path location.
val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .schema("city string, year int, population long")
  .load(upload_path)
// Start the stream.
// Use the checkpoint_path location to keep a record of all files that
// have already been uploaded to the upload_path location.
// For those that have been uploaded since the last check,
// write the newly-uploaded files' data to the write_path location.
df.writeStream.format("delta")
  .option("checkpointLocation", checkpoint_path)
  .start(write_path)
4. With the code from step 3 still running, run the following code to query the data in the write directory:
df_population = spark.read.format('delta').load(write_path)
display(df_population)
'''
Result:
+----------------+------+------------+
| city           | year | population |
+================+======+============+
| Seattle metro  | 2019 | 3406000    |
+----------------+------+------------+
| Seattle metro  | 2020 | 3433000    |
+----------------+------+------------+
| Portland metro | 2019 | 2127000    |
+----------------+------+------------+
| Portland metro | 2020 | 2151000    |
+----------------+------+------------+
'''
val df_population = spark.read.format("delta").load(write_path)
display(df_population)
/*
Result:
+----------------+------+------------+
| city           | year | population |
+================+======+============+
| Seattle metro  | 2019 | 3406000    |
+----------------+------+------------+
| Seattle metro  | 2020 | 3433000    |
+----------------+------+------------+
| Portland metro | 2019 | 2127000    |
+----------------+------+------------+
| Portland metro | 2020 | 2151000    |
+----------------+------+------------+
*/
5. With the code from step 3 still running, create the following additional CSV files, and then upload them to the upload directory by using the DBFS file browser:
ID.csv:
city,year,population
Boise,2019,438000
Boise,2020,447000
MT.csv:
city,year,population
Helena,2019,81653
Helena,2020,82590
Misc.csv:
city,year,population
Seattle metro,2021,3461000
Portland metro,2021,2174000
Boise,2021,455000
Helena,2021,81653
6. With the code from step 3 still running, run the following code to query the existing data in the write directory, in addition to the new data from the files that Auto Loader has detected in the upload directory and then written to the write directory:
df_population = spark.read.format('delta').load(write_path)
display(df_population)
'''
Result:
+----------------+------+------------+
| city           | year | population |
+================+======+============+
| Seattle metro  | 2019 | 3406000    |
+----------------+------+------------+
| Seattle metro  | 2020 | 3433000    |
+----------------+------+------------+
| Helena         | 2019 | 81653      |
+----------------+------+------------+
| Helena         | 2020 | 82590      |
+----------------+------+------------+
| Boise          | 2019 | 438000     |
+----------------+------+------------+
| Boise          | 2020 | 447000     |
+----------------+------+------------+
| Portland metro | 2019 | 2127000    |
+----------------+------+------------+
| Portland metro | 2020 | 2151000    |
+----------------+------+------------+
| Seattle metro  | 2021 | 3461000    |
+----------------+------+------------+
| Portland metro | 2021 | 2174000    |
+----------------+------+------------+
| Boise          | 2021 | 455000     |
+----------------+------+------------+
| Helena         | 2021 | 81653      |
+----------------+------+------------+
'''
val df_population = spark.read.format("delta").load(write_path)
display(df_population)
/*
Result:
+----------------+------+------------+
| city           | year | population |
+================+======+============+
| Seattle metro  | 2019 | 3406000    |
+----------------+------+------------+
| Seattle metro  | 2020 | 3433000    |
+----------------+------+------------+
| Helena         | 2019 | 81653      |
+----------------+------+------------+
| Helena         | 2020 | 82590      |
+----------------+------+------------+
| Boise          | 2019 | 438000     |
+----------------+------+------------+
| Boise          | 2020 | 447000     |
+----------------+------+------------+
| Portland metro | 2019 | 2127000    |
+----------------+------+------------+
| Portland metro | 2020 | 2151000    |
+----------------+------+------------+
| Seattle metro  | 2021 | 3461000    |
+----------------+------+------------+
| Portland metro | 2021 | 2174000    |
+----------------+------+------------+
| Boise          | 2021 | 455000     |
+----------------+------+------------+
| Helena         | 2021 | 81653      |
+----------------+------+------------+
*/
7. To clean up, cancel the running code in step 3, and then run the following code, which deletes the upload, checkpoint, and write directories:
dbutils.fs.rm(write_path, True)
dbutils.fs.rm(upload_path, True)
dbutils.fs.rm(write_path, true)
dbutils.fs.rm(upload_path, true)
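If you prefer to stop the Auto Loader stream programmatically rather than cancel the running cell, one option (a sketch, not part of the original steps; Python shown) is to stop the session's active streaming queries before deleting the directories:
# A sketch: stop every active streaming query in this session so the
# checkpoint and write directories can be removed cleanly.
for query in spark.streams.active:
    query.stop()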
For more details, see Auto Loader.