This quickstart demonstrates the basics of working with Databricks Delta. This topic shows how to build a pipeline that reads JSON data into a Databricks Delta table and then appends additional data. The topic includes an example notebook that demonstrates basic Databricks Delta operations.
Create a table from a dataset. You can use existing Spark SQL code and change the format from json, csv, parquet, and so on, to delta:
events = spark.read.json("/data/events")
events.write.format("delta").save("/data/events")
CREATE TABLE events USING delta AS SELECT * FROM json.`/data/events/`
These operations create a new table using the schema that was inferred from the JSON data. For the full set of options available when you create a new Databricks Delta table, see Create a table and Write to a table.
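To see what schema inference means in practice, here is a minimal pure-Python sketch of the idea: walk the JSON records and record a type for each field as it is first seen. This is only an illustration of the concept; Spark's actual inference also handles nested structures, nulls, and type widening. The `infer_schema` helper is hypothetical, not part of any Spark API.

```python
import json

def infer_schema(records):
    """Hypothetical helper: map each field to the Python type name of
    its first observed value. Spark's real inference is far richer."""
    schema = {}
    for rec in records:
        for field, value in rec.items():
            schema.setdefault(field, type(value).__name__)
    return schema

lines = [
    '{"eventType": "click", "ts": 1000}',
    '{"eventType": "view", "ts": 1001, "city": "Oslo"}',
]
records = [json.loads(line) for line in lines]
print(infer_schema(records))  # {'eventType': 'str', 'ts': 'int', 'city': 'str'}
```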
You access data in Databricks Delta tables either by specifying the path on DBFS ("/data/events") or the table name ("events"):
events = spark.read.format("delta").load("/data/events")
events = spark.table("events")
SELECT * FROM delta.`/data/events`
SELECT * FROM events
As new events arrive, you can atomically append them to the table:
newEvents.write.format("delta").mode("append").save("/data/events")
newEvents.write.format("delta").mode("append").saveAsTable("events")
INSERT INTO events VALUES(...)
INSERT INTO events SELECT * FROM newEvents
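The key property of these appends is atomicity: readers see either none or all of a new batch. As a rough intuition for how atomic publication can work on a filesystem, here is a pure-Python sketch that writes a batch to a temporary file and then publishes it with an atomic rename. This is illustrative only; Delta's real commit protocol serializes writes through a transaction log rather than relying on renames.

```python
import json
import os
import tempfile
import uuid

def atomic_append(table_dir, new_events):
    """Illustrative sketch: stage the batch in a temp file, then
    publish it via os.rename, which is atomic on POSIX filesystems,
    so readers never observe a half-written file."""
    os.makedirs(table_dir, exist_ok=True)
    fd, tmp = tempfile.mkstemp(dir=table_dir, suffix=".inprogress")
    with os.fdopen(fd, "w") as f:
        for event in new_events:
            f.write(json.dumps(event) + "\n")
    final = os.path.join(table_dir, "part-%s.json" % uuid.uuid4().hex)
    os.rename(tmp, final)  # atomic publish
    return final

table = tempfile.mkdtemp()
path = atomic_append(table, [{"eventType": "click", "ts": 1000}])
```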
For an example of how to create a Databricks Delta table and append to it, see the following notebook:
You can also use Structured Streaming to stream new data as it arrives into the table:
events = spark.readStream.json("/data/events")
events.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/delta/events/_checkpoint/etl-from-json") \
  .start("/delta/events")
For more information about Databricks Delta integration with Structured Streaming, see Table Streaming Reads and Writes.
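The checkpointLocation option above is what lets a restarted stream resume where it left off instead of reprocessing everything. The following pure-Python sketch shows the underlying idea of checkpointed offsets; it is a simplified illustration under assumed names (`load_offset`, `process_new`), not Structured Streaming's actual checkpoint format.

```python
import json
import os
import tempfile

def load_offset(checkpoint_path):
    """Return the last committed offset, or 0 on the first run."""
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            return json.load(f)["offset"]
    return 0

def process_new(events, checkpoint_path):
    """Process only events beyond the checkpoint, then advance it,
    so a restarted job picks up where it left off."""
    start = load_offset(checkpoint_path)
    new = events[start:]
    with open(checkpoint_path, "w") as f:
        json.dump({"offset": len(events)}, f)
    return new

ckpt = os.path.join(tempfile.mkdtemp(), "offsets.json")
events = ["e1", "e2", "e3"]
first = process_new(events, ckpt)            # first run: all events
second = process_new(events + ["e4"], ckpt)  # restart: only the new arrival
```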
Once you have been streaming for a while, you will likely have accumulated many small files in the table. To improve the speed of read queries, you can use OPTIMIZE to compact the small files into larger ones:
OPTIMIZE events
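As a rough mental model of what compaction does, here is a greedy bin-packing sketch in pure Python: group small files into batches up to a target size, where each batch would then be rewritten as one larger file. The `plan_compaction` function and the 128 MB target are illustrative assumptions, not Delta's actual algorithm or defaults.

```python
def plan_compaction(file_sizes_mb, target_mb=128):
    """Greedy sketch: pack file sizes into batches of roughly
    target_mb each; each batch stands in for one rewritten file.
    Illustrative only -- not Delta's real OPTIMIZE logic."""
    batches, current, size = [], [], 0
    for f in file_sizes_mb:
        if current and size + f > target_mb:
            batches.append(current)
            current, size = [], 0
        current.append(f)
        size += f
    if current:
        batches.append(current)
    return batches

print(plan_compaction([10, 20, 100, 5, 60, 90]))
# → [[10, 20], [100, 5], [60], [90]]
```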
You can also specify columns that frequently appear in query predicates for your workload, and Databricks Delta uses this information to cluster related records together:
OPTIMIZE events ZORDER BY eventType, city
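To illustrate the clustering effect, here is a simplified pure-Python stand-in that sorts records lexicographically on the chosen columns. Real Z-ordering interleaves column values along a space-filling curve rather than sorting, but the goal is the same: co-locate records with similar eventType and city values so data skipping can prune whole files. The `zorder_sketch` name is hypothetical.

```python
def zorder_sketch(records, cols):
    """Simplified clustering sketch: lexicographic sort on the chosen
    columns (true ZORDER uses a space-filling curve instead)."""
    return sorted(records, key=lambda r: tuple(r[c] for c in cols))

events = [
    {"eventType": "view",  "city": "Oslo",  "ts": 3},
    {"eventType": "click", "city": "Tokyo", "ts": 1},
    {"eventType": "click", "city": "Oslo",  "ts": 2},
]
ordered = zorder_sketch(events, ["eventType", "city"])
# Rows with the same eventType/city now sit next to each other.
```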
For the full set of options available when running OPTIMIZE, see Optimizing Performance and Cost.
Databricks Delta provides snapshot isolation for reads, which means that it is safe to run OPTIMIZE even while other users or jobs are querying the table. Eventually, however, you should clean up old snapshots. You can do this by running the VACUUM command. You control the age of the latest retained snapshot by using the RETAIN <N> HOURS option:
VACUUM events RETAIN 24 HOURS
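The retention rule itself is simple to state: snapshots older than the window are eligible for cleanup, while newer ones are kept so in-flight readers still see a consistent view. A minimal sketch of that rule, under the assumed name `snapshots_to_vacuum` (not Delta's implementation):

```python
def snapshots_to_vacuum(snapshot_ages_hours, retain_hours=24):
    """Illustrative sketch of RETAIN <N> HOURS: snapshots older than
    the retention window are eligible for cleanup; newer ones are
    kept so concurrent readers are not broken."""
    return [age for age in snapshot_ages_hours if age > retain_hours]

print(snapshots_to_vacuum([1, 12, 25, 48]))  # → [25, 48]
```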
For details on using VACUUM effectively, see Garbage collection.