To create a Delta table, you can use existing Apache Spark SQL code and change the format from parquet, csv, json, and so on, to delta. For all file types, you read the files into a DataFrame and write them out in delta format:
# Read the JSON files, write them out in Delta format, and register a table over that location
events = spark.read.json("/databricks-datasets/structured-streaming/events/")
events.write.format("delta").save("/mnt/delta/events")
spark.sql("CREATE TABLE events USING DELTA LOCATION '/mnt/delta/events/'")
library(SparkR)
sparkR.session()

# Read the JSON files, write them out in Delta format, and register a table over that location
events <- read.json("/databricks-datasets/structured-streaming/events/")
write.df(events, source = "delta", path = "/mnt/delta/events")
sql("CREATE TABLE events USING DELTA LOCATION '/mnt/delta/events/'")
CREATE TABLE events
USING delta
AS SELECT * FROM json.`/data/events/`
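These operations create a new table using the schema that was inferred from the JSON data. The Python and R examples write the data to /mnt/delta/events and register an unmanaged table over that location; the SQL example creates a managed table with CREATE TABLE ... AS SELECT. For the full set of options available when you create a new Delta table, see Create a table and Write to a table.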
If your source files are in Parquet format, you can use the SQL CONVERT TO DELTA statement to convert the files in place and create an unmanaged table:
CONVERT TO DELTA parquet.`/mnt/delta/events`
To speed up queries that have predicates involving the partition columns, you can partition data.
events = spark.read.json("/databricks-datasets/structured-streaming/events/")
# Write one directory per distinct value of the "date" column
events.write.partitionBy("date").format("delta").save("/mnt/delta/events")
spark.sql("CREATE TABLE events USING DELTA LOCATION '/mnt/delta/events/'")
events <- read.json("/databricks-datasets/structured-streaming/events/")
# Write one directory per distinct value of the "date" column
write.df(events, source = "delta", path = "/mnt/delta/events", partitionBy = "date")
sql("CREATE TABLE events USING DELTA LOCATION '/mnt/delta/events/'")
To partition data when you create a Delta table using SQL, specify the partition columns in a PARTITIONED BY clause:
CREATE TABLE events (
  date DATE,
  eventId STRING,
  eventType STRING,
  data STRING)
USING delta
PARTITIONED BY (date)
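Partitioning helps because a query whose predicate constrains the partition column only has to read the matching partitions. The sketch below models this in plain Python on a hypothetical file listing that uses Hive-style date=<value> directory names. It illustrates the idea of partition pruning only and is not how Delta Lake plans queries (Delta Lake takes each file's partition values from its transaction log rather than from directory names).

```python
from datetime import date

# Hypothetical files of a table partitioned by "date" (Hive-style directory names).
FILES = [
    "date=2019-01-28/part-00000.parquet",
    "date=2019-01-29/part-00000.parquet",
    "date=2019-01-29/part-00001.parquet",
    "date=2019-01-30/part-00000.parquet",
]


def partition_value(path, column):
    """Return the value of `column` encoded in a Hive-style path, or None."""
    for segment in path.split("/"):
        key, sep, value = segment.partition("=")
        if sep and key == column:
            return value
    return None


def prune(files, column, predicate):
    """Keep only the files whose partition value satisfies `predicate`."""
    return [f for f in files if predicate(partition_value(f, column))]


# Similar in spirit to: SELECT ... WHERE date >= '2019-01-29'
selected = prune(FILES, "date", lambda v: date.fromisoformat(v) >= date(2019, 1, 29))
print(selected)  # the 2019-01-28 file is never opened
```

The predicate is evaluated once per partition value rather than once per row, which is why filters on partition columns are cheap.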
Delta Lake supports a rich set of operations to modify tables.
You can write data into a Delta table using Structured Streaming. The Delta Lake transaction log guarantees exactly-once processing, even when there are other streams or batch queries running concurrently against the table. By default, streams run in append mode, which adds new records to the table.
from pyspark.sql.types import *

inputPath = "/databricks-datasets/structured-streaming/events/"

jsonSchema = StructType([
  StructField("time", TimestampType(), True),
  StructField("action", StringType(), True)
])

eventsDF = (
  spark
    .readStream
    .schema(jsonSchema)               # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .json(inputPath)
)

(eventsDF.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .table("events")
)
inputPath <- "/databricks-datasets/structured-streaming/events/"
tablePath <- "/mnt/delta/events/"
jsonSchema <- structType(structField("time", "timestamp", T), structField("action", "string", T))

# Read one JSON file per trigger as a stream
eventsStream <- read.stream(
  "json",
  path = inputPath,
  schema = jsonSchema,
  maxFilesPerTrigger = 1)

# Append the stream to the Delta table at tablePath
write.stream(
  eventsStream,
  source = "delta",
  path = tablePath,
  outputMode = "append",
  checkpointLocation = "/mnt/delta/events/_checkpoints/etl-from-json")
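The exactly-once guarantee rests on making each micro-batch write idempotent. Roughly, Delta Lake's streaming sink records the streaming query's ID together with the batch ID in the same transaction that adds the data, so a batch that is replayed after a restart from the checkpoint is recognized and skipped. The toy class below models only that bookkeeping, in memory; the class and method names are hypothetical.

```python
class IdempotentSink:
    """Toy table sink that ignores micro-batches it has already committed.

    Assumption: each batch is identified by (query_id, batch_id), and the
    highest committed batch_id per query is stored atomically with the data.
    """

    def __init__(self):
        self.rows = []
        self.last_batch = {}  # query_id -> highest committed batch_id

    def add_batch(self, query_id, batch_id, rows):
        """Append rows unless this batch was already committed; return True if written."""
        if batch_id <= self.last_batch.get(query_id, -1):
            return False  # replayed batch after a restart: skip it
        # In a real table, the rows and the batch marker are committed together.
        self.rows.extend(rows)
        self.last_batch[query_id] = batch_id
        return True


sink = IdempotentSink()
sink.add_batch("etl-from-json", 0, [{"action": "Open"}])
sink.add_batch("etl-from-json", 1, [{"action": "Close"}])
# Simulate a restart that replays batch 1 from the checkpoint.
replayed = sink.add_batch("etl-from-json", 1, [{"action": "Close"}])
print(replayed, len(sink.rows))
```

Because the batch marker and the data land in one atomic commit, a crash can never leave the data written without the marker, or the marker without the data.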
For more information about Delta Lake integration with Structured Streaming, see Table Streaming Reads and Writes.
To merge a set of updates and insertions into an existing table, you use the MERGE INTO statement. For example, the following statement takes a stream of updates and merges it into the events table. When there is already an event present with the same eventId, Delta Lake updates the data column using the given expression. When there is no matching event, Delta Lake adds a new row.
MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET events.data = updates.data
WHEN NOT MATCHED THEN
  INSERT (date, eventId, eventType, data)
  VALUES (updates.date, updates.eventId, updates.eventType, updates.data)
You must specify a value for every column in your table when you perform an INSERT operation (for example, when there is no matching row in the existing dataset). However, you do not need to update all values.
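To make these MERGE semantics concrete, the following plain-Python sketch applies the same rules to in-memory rows keyed by eventId: a matched row has only its data column replaced, and an unmatched row is inserted with every column. The function name and sample rows are hypothetical. Keying the updates by eventId also sidesteps a case that MERGE rejects, where several source rows match the same target row.

```python
def merge_events(target, updates):
    """Upsert `updates` into `target`; both map eventId -> row dict.

    WHEN MATCHED: only the "data" column is overwritten.
    WHEN NOT MATCHED: the whole source row is inserted.
    The input dicts are not modified.
    """
    merged = {event_id: dict(row) for event_id, row in target.items()}
    for event_id, row in updates.items():
        if event_id in merged:
            merged[event_id]["data"] = row["data"]
        else:
            merged[event_id] = dict(row)
    return merged


events = {
    "e1": {"date": "2019-01-29", "eventId": "e1", "eventType": "click", "data": "old"},
}
updates = {
    "e1": {"date": "2019-01-29", "eventId": "e1", "eventType": "view", "data": "new"},
    "e2": {"date": "2019-01-30", "eventId": "e2", "eventType": "view", "data": "x"},
}
result = merge_events(events, updates)
print(result["e1"]["eventType"], result["e1"]["data"])  # matched row keeps its eventType
```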
You access data in Delta tables either by specifying the path on DBFS ("/mnt/delta/events") or the table name ("events"):
val events = spark.read.format("delta").load("/mnt/delta/events")
val events = spark.table("events")
events <- read.df(path = "/mnt/delta/events", source = "delta")
events <- tableToDF("events")
SELECT * FROM delta.`/mnt/delta/events`
SELECT * FROM events
To view the history of a table, use the DESCRIBE HISTORY statement, which provides provenance information, including the table version, operation, user, and so on, for each write to a table:
DESCRIBE HISTORY events
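The history comes from the table's transaction log: each write adds a numbered commit file of newline-delimited JSON actions under the table's _delta_log directory, and one of those actions, commitInfo, carries provenance fields such as the commit timestamp (milliseconds since the epoch) and the operation name. The sketch below extracts those two fields from commit contents held in memory. The sample commits are hypothetical and heavily trimmed; real commit files contain more actions and fields.

```python
import json


def table_history(commits):
    """Return (version, timestamp_ms, operation) tuples, newest version first.

    `commits` maps a version number to the text of that version's commit file
    (one JSON action per line); only the commitInfo action is inspected.
    """
    history = []
    for version, text in commits.items():
        for line in text.splitlines():
            if not line.strip():
                continue  # tolerate blank lines
            action = json.loads(line)
            if "commitInfo" in action:
                info = action["commitInfo"]
                history.append((version, info.get("timestamp"), info.get("operation")))
    return sorted(history, reverse=True)


# Hypothetical, heavily trimmed commit contents for versions 0 and 1.
commits = {
    0: '{"commitInfo": {"timestamp": 1548722278000, "operation": "WRITE"}}\n'
       '{"add": {"path": "part-00000.parquet"}}',
    1: '{"commitInfo": {"timestamp": 1548722290000, "operation": "MERGE"}}',
}
history = table_history(commits)
print(history)
```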
Delta Lake time travel allows you to query an older snapshot of a Delta table. To query an older version of a table, specify a version or a timestamp in a SELECT statement; a timestamp must be given as a date or timestamp string. For example, to query version 0 from the table history, use either of the following:
SELECT * FROM events VERSION AS OF 0
SELECT * FROM events TIMESTAMP AS OF '2019-01-29 00:37:58'
Because version 1 is at timestamp '2019-01-29 00:38:10', to query version 0 you can use any timestamp in the range '2019-01-29 00:37:58' to '2019-01-29 00:38:09' inclusive.
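The range above follows from a simple rule: a timestamp resolves to the latest version committed at or before it. The sketch below applies that rule with a binary search over commit times, using version 0 at 00:37:58 and version 1 at 00:38:10 as in the example; the function name is hypothetical, and the list index is assumed to equal the version number.

```python
import bisect
from datetime import datetime

# Commit times indexed by version: version 0, then version 1 (from the example above).
COMMIT_TIMES = [
    datetime(2019, 1, 29, 0, 37, 58),
    datetime(2019, 1, 29, 0, 38, 10),
]


def version_as_of(commit_times, ts):
    """Return the latest version whose commit time is at or before `ts`.

    Assumes commit_times is sorted ascending and list index == version number.
    """
    idx = bisect.bisect_right(commit_times, ts) - 1
    if idx < 0:
        raise ValueError("timestamp is earlier than the first commit")
    return idx


for ts in ["2019-01-29 00:37:58", "2019-01-29 00:38:09", "2019-01-29 00:38:10"]:
    print(ts, "->", version_as_of(COMMIT_TIMES, datetime.fromisoformat(ts)))
# prints versions 0, 0 and 1
```

bisect_right places a timestamp equal to a commit time after that commit, so the commit instant itself already resolves to the new version.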
DataFrameReader options allow you to create a DataFrame from a Delta table that is fixed to a specific version of the table.
# timestamp_string is a date or timestamp string; version is a table version number
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/mnt/delta/events")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/mnt/delta/events")
For details, see Query an older snapshot of a table (time travel).
After you have performed multiple changes to a table, you might have a lot of small files. To improve the speed of read queries, you can use OPTIMIZE to collapse small files into larger ones:
OPTIMIZE events
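Compaction is a bin-packing problem: group many small files into as few output files as possible without exceeding a target size. The sketch below uses first-fit decreasing, a standard bin-packing heuristic; it is not necessarily the grouping OPTIMIZE performs, and the file names, sizes (in MB), and target size are hypothetical.

```python
def bin_pack(file_sizes, target_size):
    """Group files into bins whose total size stays <= target_size.

    First-fit decreasing: take files largest first and put each into the first
    bin with room, opening a new bin otherwise. Each bin stands for one
    compacted output file. Returns a list of [total_size, [file names]].
    """
    bins = []
    for name, size in sorted(file_sizes.items(), key=lambda kv: kv[1], reverse=True):
        for b in bins:
            if b[0] + size <= target_size:
                b[0] += size
                b[1].append(name)
                break
        else:  # no existing bin had room
            bins.append([size, [name]])
    return bins


sizes = {"part-0": 60, "part-1": 50, "part-2": 40, "part-3": 30, "part-4": 20}
bins = bin_pack(sizes, target_size=100)
print(bins)  # five small files become two output files
```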
To improve read performance further, you can co-locate related information in the same set of files by Z-Ordering. Delta Lake data-skipping algorithms automatically use this co-locality to reduce the amount of data that needs to be read. To Z-Order data, you specify the columns to order on in the ZORDER BY clause. For example, to co-locate by eventType, run:
OPTIMIZE events ZORDER BY (eventType)
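The idea behind Z-ordering can be shown with a Morton code, which interleaves the bits of several columns so that rows close in all of them sort near each other. The sketch below orders 16 hypothetical rows over two integer columns, splits them into four "files", and counts how many files min/max statistics (the kind of per-file statistics data skipping relies on) cannot rule out for an equality filter. It illustrates the concept only and is not Delta Lake's implementation; with a single column, as in the eventType example, there is nothing to interleave, so the ordering reduces to a sort on that column.

```python
def morton_code(x, y, bits=16):
    """Interleave the bits of two non-negative ints: x -> even bits, y -> odd bits."""
    code = 0
    for i in range(bits):
        code |= ((x >> i) & 1) << (2 * i)
        code |= ((y >> i) & 1) << (2 * i + 1)
    return code


def files_touched(files, col, value):
    """Count files whose [min, max] range on column `col` could contain `value`."""
    return sum(
        1 for rows in files
        if min(r[col] for r in rows) <= value <= max(r[col] for r in rows)
    )


def chunk(rows, size):
    """Split rows into consecutive "files" of `size` rows each."""
    return [rows[i:i + size] for i in range(0, len(rows), size)]


rows = [(x, y) for x in range(4) for y in range(4)]  # 16 rows over two columns
linear = chunk(sorted(rows), 4)                       # ordered by x, then y
zordered = chunk(sorted(rows, key=lambda r: morton_code(*r)), 4)

# Files that cannot be skipped for a filter on each column: linear vs Z-order
print("x == 0:", files_touched(linear, 0, 0), "vs", files_touched(zordered, 0, 0))  # 1 vs 2
print("y == 0:", files_touched(linear, 1, 0), "vs", files_touched(zordered, 1, 0))  # 4 vs 2
```

Sorting by x alone makes a filter on x cheap but forces a filter on y to read every file; the Z-order trades a little on x to keep both filters at half the files.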
For the full set of options available when running OPTIMIZE, see Compaction (bin-packing).
Delta Lake provides snapshot isolation for reads, which means that it is safe to run OPTIMIZE even while other users or jobs are querying the table. Eventually, however, you should clean up old snapshots. You can do this by running the VACUUM command:
VACUUM events
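You control how long old snapshots are retained by using the RETAIN <N> HOURS option. VACUUM deletes data files that the current table version no longer references and that are older than this retention threshold: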
VACUUM events RETAIN 24 HOURS
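The retention rule can be sketched as a filter over the table directory: a data file is deleted only if the current version no longer references it and it is older than the threshold. This is a simplification under stated assumptions: it uses each file's modification time as its age, and the paths and times are hypothetical. Versions whose files have been removed this way can no longer be queried with time travel.

```python
from datetime import datetime, timedelta


def files_to_vacuum(files, live_paths, now, retain_hours):
    """Return the paths a VACUUM-style cleanup would delete, in sorted order.

    files: path -> last modification time of every data file in the directory
    live_paths: paths referenced by the current table version (never deleted)
    """
    cutoff = now - timedelta(hours=retain_hours)
    return sorted(
        path for path, mtime in files.items()
        if path not in live_paths and mtime < cutoff
    )


now = datetime(2019, 2, 1, 12, 0)
files = {
    "part-00000.parquet": datetime(2019, 1, 29, 0, 37),  # unreferenced and old
    "part-00001.parquet": datetime(2019, 1, 29, 0, 38),  # still referenced
    "part-00002.parquet": datetime(2019, 2, 1, 6, 0),    # unreferenced but recent
}
live = {"part-00001.parquet"}

print(files_to_vacuum(files, live, now, retain_hours=24))   # ['part-00000.parquet']
print(files_to_vacuum(files, live, now, retain_hours=168))  # []
```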
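The default retention threshold is 7 days. By default, Delta Lake rejects a shorter threshold such as 24 hours unless you disable the spark.databricks.delta.retentionDurationCheck.enabled safety check. For details on using VACUUM effectively, see Vacuum.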