Databricks Delta Tables - Batch Reads and Writes¶
Databricks Delta supports most of the options provided by Spark SQL DataFrame read and write APIs.
Databricks Delta supports metadata definition using standard DDL.
Create a table¶
Databricks Delta supports creating tables in the metastore using standard DDL:
CREATE TABLE events ( date DATE, eventId STRING, eventType STRING, data STRING) USING DELTA PARTITIONED BY (date) LOCATION '/mnt/delta/events'
When you create a table in the metastore using Databricks Delta, it creates a symlink-like pointer in the metastore to the transaction log and data that are stored on DBFS. This pointer makes it easier for other users to discover and refer to the data without having to worry about exactly where it is stored.
Create table best practices¶
When you run
CREATE TABLE with a
LOCATION that already contains data stored using Databricks Delta, Databricks Delta does the following:
If you specify only the table name and location, for example:CREATE TABLE events USING DELTA LOCATION '/mnt/delta/events'the table in the Hive metastore automatically inherits the schema, partitioning, and table properties of the existing data. This functionality can be used to “import” data into the metastore.
If you specify any configuration (schema, partitioning, or table properties), Databricks Delta verifies that the specification exactly matches the configuration of the existing data.
If the specified configuration does not exactly match the configuration of the data, Delta throws an exception that describes the discrepancy.
PARTITION BY columns¶
In order to cluster the data, when you create a Databricks Delta table you can optionally specify
PARTITION BY columns. This clustering is used to speed up queries or DML that have predicates involving the partition columns.
If you want to control the location of the table, you also specify the
LOCATION as a path on DBFS.
Tables created with a specified
LOCATION are considered unmanaged by the metastore. Unlike a managed table, where no path is specified, an unmanaged table’s files are not deleted when the table is DROP-ed.
Change the schema¶
When you append data to a Databricks Delta table with an updated schema, Databricks Delta automatically evolves the schema on your behalf. Readers that read from the appended Databricks Delta table see the updated schema instead of the old one.
Because of the way the Spark analyzer checks types, you must recreate any views that reference tables with updated schemas. Streams reading from a Databricks Delta table must be restarted when a schema change occurs.
You can load a Databricks Delta table as a DataFrame by specifying either its table name or path:
The DataFrame returned automatically reads the most recent snapshot of the table for any query. You should never need to run
REFRESH TABLE. Databricks Delta automatically uses partitioning and statistics to read the minimum amount of data when there are applicable predicates in the query.
You can optionally specify partition columns when creating a new Databricks Delta table. Partitioning is used to speed up queries or DML that have predicates involving the partition columns. A common pattern is to partition by date, for example:
Append with DataFrames¶
Using the mode
append you can atomically add new data to an existing Databricks Delta table:
Overwrite data using DataFrames¶
If you want to atomically replace all of the data in a table, you can use
df.write .format("delta") .mode("overwrite") .save("/delta/events")
You can selectively overwrite only the data that matches predicates over partition columns. The following command atomically replaces the month of January with the data in
df.write .format("delta") .mode("overwrite") .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'") .save("/delta/events")
This sample code writes out the data in
df, validates that it all falls within the specified partitions, and performs an atomic replacement.
Unlike the file APIs in Apache Spark, Databricks Delta remembers and enforces the schema of a table. This means that by default overwrites do not replace the schema of an existing table.
UPDATE statement allows you to apply expressions to change the value of columns when a row matches a predicate. For example, you can use
UPDATE to fix a spelling mistake in the
UPDATE events SET eventType = 'click' WHERE eventType = 'clck'
Similar to delete, update operations automatically make use of the partitioning of the table when possible.
MERGE INTO statement allows you to merge a set of updates and insertions into an existing dataset. For example, the following statement takes a stream of updates and merges it into the
events table. When there is already an event present with the same
eventId, Databricks Delta updates the data column using the given expression. When there is no matching event, Databricks Delta adds a new row.
Here’s a worked example:
MERGE INTO events USING updates ON events.eventId = updates.eventId WHEN MATCHED THEN UPDATE SET events.data = updates.data WHEN NOT MATCHED THEN INSERT (date, eventId, data) VALUES (date, eventId, data)
You must specify a value for every column in your table when you perform an
INSERT (for example, when there is no matching row in the existing dataset). However, you do not need to update all values.
When you use
MERGE INTO, the destination table can be large, but the source table must be small. If your workload does not satisfy this requirement, try using separate
Databricks Delta tables allow you to remove data that matches a predicate. For instance, to delete all events from before 2017, you can run the following DML:
DELETE FROM events WHERE date < '2017-01-01'
Delete operations automatically make use of the partitioning of the table when possible. This optimization means that it will be significantly faster to delete data based on partition predicates.
Write from multiple workspaces¶
Writing to the same table from more than one Databricks workspace at the same time is not supported.
Validate new data¶
Databricks Delta automatically validates that the schema of the
DataFrame being written is compatible with the schema of the table. Columns that are present in the table but not in the DataFrame are set to null. This operation fails if there are any extra columns in the DataFrame that are not present in the table. Databricks Delta has DDL to add new columns explicitly and the ability to update the schema automatically. See Change the schema.
If you specify other options such as
partitionBy in combination with append mode, Databricks Delta validates that they match and throws an error for any mismatch. When
partitionBy is not present, appends automatically follow the partitioning of the existing data.
Views on Databricks Delta tables¶
Databricks Delta supports the creation of views on top of Databricks Delta tables just like you might with a data source table. These views integrate with Table Access Control to allow for column and row level security. The core challenge when you operate with views is resolving the schemas. If you alter a Databricks Delta table schema, derivative views must be recreated to account for any additions to the schema.
For instance, if you add a new column to a Databricks Delta table, you must make sure that this column is available in the appropriate views built on top of that base table.