Use Delta Lake with Unity Catalog

Unity Catalog (Preview) is a fine-grained governance solution for data and AI on the Lakehouse. Delta Lake operations and features can be used in Unity Catalog.

You can use Delta managed tables within Unity Catalog metastores. You can also register Delta-formatted external tables with the metastore. Additionally, you can create Unity Catalog views based on these managed and external tables.

Unity Catalog object model overview

The Unity Catalog object model contains the following securable objects: metastores, catalogs, schemas, tables, and views.
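
For example, from a notebook attached to a Unity Catalog-enabled cluster you can explore this hierarchy with SHOW commands. The following is a minimal Python sketch; the main catalog and default schema shown here are the same ones used in the examples later in this article.

# List the catalogs in the metastore, the schemas in a catalog,
# and the tables and views in a schema that you have access to.
display(spark.sql("SHOW CATALOGS"))
display(spark.sql("SHOW SCHEMAS IN main"))
display(spark.sql("SHOW TABLES IN main.default"))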

Use three-level namespace notation with Unity Catalog

The combination of a catalog, a schema, and a table or view is known as Unity Catalog’s three-level namespace. Delta Lake supports this three-level namespace notation for referencing Delta data in Unity Catalog.

To reference Delta Lake data in Unity Catalog, you specify a catalog name, a schema name, and a table or view name, using the syntax <catalog>.<schema>.<table/view>.

Instead of including the catalog and schema in every reference, you can declare them in advance with the following statements:

  • Specify USE CATALOG <catalog> to declare the catalog, and then shorten references to <schema>.<table/view>.

  • Specify USE CATALOG <catalog> and then USE SCHEMA <schema> to declare the catalog and schema, and then shorten references to <table/view>.

  • Specify USE <catalog>.<schema> to declare the catalog and schema, and then shorten references to <table/view>.

For example, consider a three-level namespace in Unity Catalog consisting of a catalog named main, a schema named default within that catalog, and a table named diamonds within that schema. The following Python, R, Scala, and SQL examples show the syntax options for referring to this combination of catalog, schema, and table:

Python

# Reference <catalog>.<schema>.<table/view>.
display(spark.sql("SELECT * FROM main.default.diamonds LIMIT 2"))

# Or, specify <catalog>, and reference <schema>.<table/view>.
spark.sql("USE CATALOG main")
display(spark.sql("SELECT * FROM default.diamonds LIMIT 2"))

# Or, specify <catalog> and then <schema>, and reference <table/view>.
spark.sql("USE CATALOG main")
spark.sql("USE SCHEMA default")
display(spark.sql("SELECT * FROM diamonds LIMIT 2"))

# Or, specify <catalog>.<schema>, and reference <table/view>.
spark.sql("USE main.default")
display(spark.sql("SELECT * FROM diamonds LIMIT 2"))

# Or, use the DataFrame API. You can also specify USE CATALOG or
# USE to declare the catalog or catalog and schema in advance.
display(spark.table("main.default.diamonds").limit(2))

R

library(SparkR)

# Reference <catalog>.<schema>.<table/view>.
display(sql("SELECT * FROM main.default.diamonds LIMIT 2"))

# Or, specify <catalog>, and reference <schema>.<table/view>.
sql("USE CATALOG main")
display(sql("SELECT * FROM default.diamonds LIMIT 2"))

# Or, specify <catalog> and then <schema>, and reference <table/view>.
sql("USE CATALOG main")
sql("USE SCHEMA default")
display(sql("SELECT * FROM diamonds LIMIT 2"))

# Or, specify <catalog>.<schema>, and reference <table/view>.
sql("USE main.default")
display(sql("SELECT * FROM diamonds LIMIT 2"))

# Or, use the DataFrame API. You can also specify USE CATALOG or
# USE to declare the catalog or catalog and schema in advance.
df = tableToDF("main.default.diamonds")
display(limit(df, 2))

Scala

// Reference <catalog>.<schema>.<table/view>.
display(spark.sql("SELECT * FROM main.default.diamonds LIMIT 2"))

// Or, specify <catalog>, and reference <schema>.<table/view>.
spark.sql("USE CATALOG main")
display(spark.sql("SELECT * FROM default.diamonds LIMIT 2"))

// Or, specify <catalog> and then <schema>, and reference <table/view>.
spark.sql("USE CATALOG main")
spark.sql("USE SCHEMA default")
display(spark.sql("SELECT * FROM diamonds LIMIT 2"))

// Or, specify <catalog>.<schema>, and reference <table/view>.
spark.sql("USE main.default")
display(spark.sql("SELECT * FROM diamonds LIMIT 2"))

// Or, use the DataFrame API. You can also specify USE CATALOG or
// USE to declare the catalog or catalog and schema in advance.
display(spark.table("main.default.diamonds").limit(2))

SQL

-- Reference <catalog>.<schema>.<table/view>.
SELECT * FROM main.default.diamonds LIMIT 2

-- Or, specify <catalog>, and reference <schema>.<table/view>.
USE CATALOG main;
SELECT * FROM default.diamonds LIMIT 2

-- Or, specify <catalog> and then <schema>, and reference <table/view>.
USE CATALOG main;
USE SCHEMA default;
SELECT * FROM diamonds LIMIT 2

-- Or, specify <catalog>.<schema>, and reference <table/view>.
USE main.default;
SELECT * FROM diamonds LIMIT 2
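
To confirm which catalog and schema are currently in use after a USE statement, you can query Spark’s built-in context functions. The following is a minimal Python sketch; current_catalog() and current_database() are standard Spark SQL functions, although availability can depend on your Databricks Runtime version.

# Declare the catalog and schema, then confirm which ones are active.
spark.sql("USE main.default")
display(spark.sql("SELECT current_catalog(), current_database()"))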

See also Query data.

Available Delta Lake operations and features for Unity Catalog

Most Delta Lake operations and features are enabled for Delta managed tables within Unity Catalog metastores and for Delta-formatted external tables registered with Unity Catalog metastores. Delta Lake operations and features that are not supported in Unity Catalog are listed in Limitations of Delta Lake for Unity Catalog.

Delta Lake create, insert, read, and delete examples for Unity Catalog

The following examples show the basics of how to create and delete Delta tables and views in Unity Catalog, how to insert data into tables, and how to read data from Delta tables and views in Unity Catalog. You can run these examples from a notebook within a Databricks workspace. The workspace must be enabled for Unity Catalog, and the notebook must be attached to a cluster that is compatible with Unity Catalog. See Create a compute resource.

Create Delta tables and views in Unity Catalog

The following examples show how to create Delta tables and views in Unity Catalog.

Create a Delta managed table in Unity Catalog

The following example creates a Delta managed table in Unity Catalog. This example assumes that, within your Databricks workspace’s Unity Catalog metastore, you have access to an existing catalog named main and an existing schema named default within that catalog. To get access, see Data permissions.

spark.sql("USE main.default")

spark.sql("CREATE TABLE IF NOT EXISTS employees ("
  "  employeeID INT,"
  "  firstName STRING,"
  "  lastName STRING,"
  "  departmentID INT"
  ") PARTITIONED BY (departmentID)")

R

library(SparkR)

sql("USE main.default")

sql(paste("CREATE TABLE IF NOT EXISTS employees (",
  "  employeeID INT,",
  "  firstName STRING,",
  "  lastName STRING,",
  "  departmentID INT",
  ") PARTITIONED BY (departmentID)",
  sep = ""))
spark.sql("USE main.default")

spark.sql("CREATE TABLE IF NOT EXISTS employees (" +
  "  employeeID INT," +
  "  firstName STRING," +
  "  lastName STRING," +
  "  departmentID INT" +
  ") PARTITIONED BY (departmentID)")

SQL

USE main.default;

CREATE TABLE IF NOT EXISTS employees (
  employeeID INT,
  firstName STRING,
  lastName STRING,
  departmentID INT
) PARTITIONED BY (departmentID)

See also Create your first table and Create a managed table.

Create a Delta external table registered with Unity Catalog

The following example creates a Delta external table and registers it with Unity Catalog. This example assumes that, within your Databricks workspace’s Unity Catalog metastore, you have access to an existing catalog named main and an existing schema named default within that catalog.

This example specifies a fictitious external location and assumes that you have access to that location within your Databricks workspace’s Unity Catalog metastore. To register an actual external location with Unity Catalog, see Manage external locations and storage credentials. To get access, see Data permissions.

spark.sql("USE main.default")

spark.sql("CREATE TABLE IF NOT EXISTS extloc_employees ("
  "  employeeID INT,"
  "  firstName STRING,"
  "  lastName STRING,"
  "  departmentID INT"
  ") "
  "USING DELTA "
  "PARTITIONED BY (departmentID) "
  "LOCATION 's3://my-bucket/extloc_employees'")

R

library(SparkR)

sql("USE main.default")

sql(paste("CREATE TABLE IF NOT EXISTS extloc_employees (",
  "  employeeID INT,",
  "  firstName STRING,",
  "  lastName STRING,",
  "  departmentID INT",
  ") ",
  "USING DELTA ",
  "PARTITIONED BY (departmentID) ",
  "LOCATION 's3://my-bucket/extloc_employees'",
  sep = ""))
spark.sql("USE main.default")

spark.sql("CREATE TABLE IF NOT EXISTS extloc_employees (" +
  "  employeeID INT," +
  "  firstName STRING," +
  "  lastName STRING," +
  "  departmentID INT" +
  ") " +
  "USING DELTA " +
  "PARTITIONED BY (departmentID) " +
  "LOCATION 's3://my-bucket/extloc_employees'")

SQL

USE main.default;

CREATE TABLE IF NOT EXISTS extloc_employees (
  employeeID INT,
  firstName STRING,
  lastName STRING,
  departmentID INT
)
USING DELTA
PARTITIONED BY (departmentID)
LOCATION 's3://my-bucket/extloc_employees'

See also Create an external table.

Create a view in Unity Catalog

The following example creates a view in Unity Catalog. This example assumes that, within your Databricks workspace’s Unity Catalog metastore, you have access to an existing catalog named main, an existing schema named default within that catalog, and an existing managed table named employees within that schema. To get access, see Data permissions.

spark.sql("USE main.default")

spark.sql(("CREATE OR REPLACE VIEW employee_ids_depts AS "
  "SELECT employeeID, departmentID FROM employees"))

R

library(SparkR)

sql("USE main.default")

sql(paste("CREATE OR REPLACE VIEW employee_ids_depts AS ",
  "SELECT employeeID, departmentID FROM employees",
  sep = ""))
spark.sql("USE main.default")

spark.sql("CREATE OR REPLACE VIEW employee_ids_depts AS " +
  "SELECT employeeID, departmentID FROM employees")

SQL

USE main.default;

CREATE OR REPLACE VIEW employee_ids_depts AS
  SELECT employeeID, departmentID FROM employees;

See also Create views.

Insert data into a table in Unity Catalog

The following example inserts data into a table in Unity Catalog. This example assumes that, within your Databricks workspace’s Unity Catalog metastore, you have access to an existing catalog named main, an existing schema named default within that catalog, and an existing managed table named employees within that schema. To get access, see Data permissions.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df = spark.createDataFrame( \
  data = [
    (1, 'John',     'Doe',   1), \
    (2, 'Mary',     'Smith', 2), \
    (3, 'Scott',    'Doe',   2), \
    (4, 'Sandy',    'Smith', 3), \
    (5, 'Jennifer', 'Jones', 2), \
    (6, 'Marty',    'Smith', 2) \
  ], \
  schema = StructType([ \
    StructField("employeeID",   IntegerType(), True), \
    StructField("firstName",    StringType(),  True), \
    StructField("lastName",     StringType(),  True), \
    StructField("departmentID", IntegerType(), True) \
  ]) \
)

df.write.insertInto( \
  tableName = "main.default.employees" \
)

R

library(SparkR)

df = createDataFrame(
  data = list(
    list("employeeID" = 1L, "firstName" = "John",     "lastName" = "Doe",   "departmentID" = 1L),
    list("employeeID" = 2L, "firstName" = "Mary",     "lastName" = "Smith", "departmentID" = 2L),
    list("employeeID" = 3L, "firstName" = "Scott",    "lastName" = "Doe",   "departmentID" = 2L),
    list("employeeID" = 4L, "firstName" = "Sandy",    "lastName" = "Smith", "departmentID" = 3L),
    list("employeeID" = 5L, "firstName" = "Jennifer", "lastName" = "Jones", "departmentID" = 2L),
    list("employeeID" = 6L, "firstName" = "Marty",    "lastName" = "Smith", "departmentID" = 2L)
  ),
  schema = structType(
    structField("employeeID",   "integer", TRUE),
    structField("firstName",    "string",  TRUE),
    structField("lastName",     "string",  TRUE),
    structField("departmentID", "integer", TRUE)
  )
)

insertInto(
  x         = df,
  tableName = "main.default.employees"
)

Scala

import spark.implicits._

val df = Seq(
  (1, "John",     "Doe",   1),
  (2, "Mary",     "Smith", 2),
  (3, "Scott",    "Doe",   2),
  (4, "Sandy",    "Smith", 3),
  (5, "Jennifer", "Jones", 2),
  (6, "Marty",    "Smith", 2)
).toDF(
  "employeeID",
  "firstName",
  "lastName",
  "departmentID"
)

df.write.insertInto("main.default.employees")

SQL

USE main.default;

INSERT INTO employees VALUES
  (1, "John", "Doe", 1),
  (2, "Mary", "Smith", 2),
  (3, "Scott", "Doe", 2),
  (4, "Sandy", "Smith", 3),
  (5, "Jennifer", "Jones", 2),
  (6, "Marty", "Smith", 2)

Read from Delta tables and views in Unity Catalog

The following examples show how to read data from Delta tables and views in Unity Catalog. These examples assume that, within your Databricks workspace’s Unity Catalog metastore, you have access to an existing catalog named main, an existing schema named default within that catalog, and an existing managed table named employees and a view named employee_ids_depts within that schema. To get access, see Data permissions.

Read from a Delta table in Unity Catalog

Python

from pyspark.sql.functions import col

df = spark.table("main.default.employees")

df = df.groupBy(df.departmentID).count()
df = df.select(col("count").alias("employeeCount"), "departmentID")
df = df.orderBy(df.departmentID, ascending = False)

display(df)

R

library(SparkR)

df = tableToDF("main.default.employees")

dfCount = count(groupBy(df, "departmentID"))

dfSelect = select(
  x   = dfCount,
  col = list(
    alias(dfCount$count, "employeeCount"),
    "departmentID"
  )
)

dfOrder = orderBy(
  x          = dfSelect,
  col        = "departmentID",
  decreasing = TRUE
)

display(dfOrder)

Scala

import org.apache.spark.sql.functions._
import spark.implicits._

val df = spark.table("main.default.employees")
  .groupBy($"departmentID")
  .agg(count($"departmentID").as("employeeCount"))
  .select($"employeeCount", $"departmentID")
  .orderBy($"employeeCount".desc)

display(df)

SQL

USE main.default;

SELECT COUNT(*) AS employeeCount, departmentID
FROM employees
GROUP BY departmentID
ORDER BY employeeCount DESC

See also Query data.

Read from a view of a Delta table in Unity Catalog

Python

from pyspark.sql.functions import col

df = spark.table("main.default.employee_ids_depts")

df = df.groupBy(df.departmentID).count()
df = df.select(col("count").alias("employeeCount"), "departmentID")
df = df.orderBy(df.departmentID, ascending = False)

display(df)

R

library(SparkR)

df = tableToDF("main.default.employee_ids_depts")

dfCount = count(groupBy(df, "departmentID"))

dfSelect = select(
  x   = dfCount,
  col = list(
    alias(dfCount$count, "employeeCount"),
    "departmentID"
  )
)

dfOrder = orderBy(
  x          = dfSelect,
  col        = "departmentID",
  decreasing = TRUE
)

display(dfOrder)

Scala

import org.apache.spark.sql.functions._
import spark.implicits._

val df = spark.table("main.default.employee_ids_depts")
  .groupBy($"departmentID")
  .agg(count($"departmentID").as("employeeCount"))
  .select($"employeeCount", $"departmentID")
  .orderBy($"employeeCount".desc)

display(df)

SQL

USE main.default;

SELECT COUNT(*) AS employeeCount, departmentID
FROM employee_ids_depts
GROUP BY departmentID
ORDER BY employeeCount DESC

See also Query data.

Delete Delta tables and views in Unity Catalog

The following examples show how to delete Delta tables and views from Unity Catalog. These examples assume that, within your Databricks workspace’s Unity Catalog metastore, you have access to an existing catalog named main, an existing schema named default within that catalog, and an existing managed table named employees and a view named employee_ids_depts within that schema. To get access, see Data permissions.

Delete a Delta table from Unity Catalog

spark.sql("DROP TABLE main.default.employees")
library(SparkR)

sql("DROP TABLE main.default.employees")
spark.sql("DROP TABLE main.default.employees")
DROP TABLE main.default.employees;

Delete a view of a Delta table in Unity Catalog

spark.sql("DROP VIEW main.default.employee_ids_depts")
library(SparkR)

sql("DROP VIEW main.default.employee_ids_depts")
spark.sql("DROP VIEW main.default.employee_ids_depts")
DROP VIEW main.default.employee_ids_depts;

Example notebook

The following example notebook shows how to use PySpark with Unity Catalog to create and grant access to a catalog, a schema (database), and a managed table. This notebook also shows how to use PySpark with Unity Catalog and the Pandas API on Spark.

Create and manage a Unity Catalog table with Python

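A minimal Python sketch of that flow might look like the following. It assumes you have the privileges to create catalogs and to grant access; the catalog, schema, table, and group names used here (quickstart_catalog, quickstart_schema, quickstart_table, and data-consumers) are hypothetical placeholders.

import pyspark.pandas as ps

# Create a catalog and a schema, and make them the defaults for this session.
spark.sql("CREATE CATALOG IF NOT EXISTS quickstart_catalog")
spark.sql("USE CATALOG quickstart_catalog")
spark.sql("CREATE SCHEMA IF NOT EXISTS quickstart_schema")
spark.sql("USE SCHEMA quickstart_schema")

# Create a managed table and insert a couple of rows.
spark.sql("CREATE TABLE IF NOT EXISTS quickstart_table (columnA INT, columnB STRING)")
spark.sql("INSERT INTO quickstart_table VALUES (1, 'one'), (2, 'two')")

# Grant a group access to the catalog, schema, and table.
spark.sql("GRANT USE CATALOG ON CATALOG quickstart_catalog TO `data-consumers`")
spark.sql("GRANT USE SCHEMA ON SCHEMA quickstart_schema TO `data-consumers`")
spark.sql("GRANT SELECT ON TABLE quickstart_table TO `data-consumers`")

# Read the same table with the Pandas API on Spark.
psdf = ps.read_table("quickstart_catalog.quickstart_schema.quickstart_table")
display(psdf.head(2))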

Limitations of Delta Lake for Unity Catalog

Using Delta Lake with Unity Catalog has the following limitations:

  • Shallow clones are not supported when using Unity Catalog as the source or target of the clone.

  • Bucketing is not supported for Unity Catalog tables. Commands that try to create a bucketed table in Unity Catalog throw an exception.

  • Overwrite mode for DataFrame write operations into Unity Catalog is supported only for Delta tables, not for other file formats. The user must have the CREATE privilege on the parent schema and must be the owner of the existing object. For an example of the supported case, see the sketch after this list.
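
For example, overwriting the Delta managed table from the earlier examples works because it is a Delta table. The following is a minimal Python sketch; it assumes that the main.default.employees table exists, that you have the CREATE privilege on main.default, and that you own the table.

# Build a small replacement dataset with the same schema as employees.
df = spark.createDataFrame(
  [(7, "Alex", "Garcia", 1)],
  schema = "employeeID INT, firstName STRING, lastName STRING, departmentID INT"
)

# Overwrite succeeds here because employees is a Delta table; the same write
# to a table in another file format in Unity Catalog is not supported.
df.write.mode("overwrite").saveAsTable("main.default.employees")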

See Unity Catalog limitations (Public Preview).