Update jobs when you upgrade legacy workspaces to Unity Catalog

When you upgrade legacy workspaces to Unity Catalog, you might need to update existing jobs to reference upgraded tables and file paths. The following list describes typical scenarios and suggested solutions for updating your jobs.

Job is using a notebook that has references to custom libraries either through an init script, or cluster-defined libraries.

A custom library is a non-publicly-available pip package or JAR that embeds Apache Spark or SQL read or write operations in its code.

Modify the custom library to ensure that:

  • Database and table names use three-level namespaces (catalog.schema.table).
  • Mount points are not used in code.

Job is using a notebook that is reading from or writing to a Hive metastore table.

  • Evaluate setting the default catalog in the job cluster Spark config: spark.databricks.sql.initial.catalog.name my_catalog
  • Evaluate whether the workspace default catalog can be set to something other than hive_metastore so that the job code does not have to change.
  • Otherwise, change the job code to replace two-level table references with the appropriate three-level references.
  • If the job is using pure SQL, consider adding a USE CATALOG statement (both code-level options are sketched below).
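
For example, a minimal sketch of the two code-level options (catalog, schema, and table names are hypothetical):

# Option 1: rename two-level table references to three-level references.
df = spark.table("my_catalog.sales.orders")

# Option 2: set the default catalog once per session so existing two-level references resolve against it.
spark.sql("USE CATALOG my_catalog")
df = spark.table("sales.orders")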

Job is using a notebook that is reading from or writing to paths that are subfolders of tables. This is not possible in Unity Catalog.

  • Change code to read from the table with a predicate on the partition column.
  • Change code to write to the table with an appropriate partition overwrite option, such as Delta Lake's replaceWhere (see the sketch below).
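
A minimal sketch, assuming a Delta table my_catalog.my_schema.events partitioned by event_date (all names are hypothetical):

# Read: filter on the partition column instead of loading the partition's subfolder path.
df = spark.table("my_catalog.my_schema.events").where("event_date = '2024-01-01'")

# Write: overwrite only the matching partition instead of writing to its subfolder.
(df.write
  .mode("overwrite")
  .option("replaceWhere", "event_date = '2024-01-01'")
  .saveAsTable("my_catalog.my_schema.events"))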

Job is using a notebook that is reading from or writing to mount paths that are tables registered in Unity Catalog.

  • Change code to refer to the table by its correct three-level name.
  • If the table is not registered in Unity Catalog, and will not be, the code still needs to be modified to read from or write to a volume path instead of a mount path (see the sketch below).
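
A minimal sketch of both cases (paths and table names are hypothetical):

# Before: reading a registered table through its legacy mount path.
df = spark.read.format("delta").load("/mnt/raw/sales/orders")

# After: reference the registered table by its three-level name.
df = spark.table("my_catalog.sales.orders")

# If the data is not registered as a table, use a Unity Catalog volume path instead of a mount path.
df = spark.read.parquet("/Volumes/my_catalog/sales/raw_files/orders/")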

Job is using a notebook that reads or writes files, not tables, via mount paths.

Change code to read from and write to a Unity Catalog volume location instead (see the sketch below).
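
A minimal sketch (paths are hypothetical):

# Before: file access through a mount path.
files = dbutils.fs.ls("/mnt/landing/exports/")

# After: file access through a Unity Catalog volume path.
files = dbutils.fs.ls("/Volumes/my_catalog/my_schema/landing/exports/")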

Job is a streaming job that uses applyInPandasWithState.

Not currently supported on Unity Catalog. Consider rewriting the job if possible; otherwise, do not migrate it until support is available.

Job is a streaming job that uses continuous processing mode.

Continuous processing mode is experimental in Spark and is not supported in Unity Catalog. Refactor the job to use standard micro-batch Structured Streaming. If this is not possible, consider keeping the job running against the Hive metastore.

Job is a streaming job that uses checkpoint directories.

  • Stop the job.
  • Change the notebook code to use a Unity Catalog volume path for the checkpoint (see the sketch below). The job owner must have read and write access on that path.
  • Move the existing checkpoint directory to the new volume location.
  • Restart the job.
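
A minimal sketch of a streaming write whose checkpoint lives in a volume (all names are hypothetical):

(spark.readStream
  .table("my_catalog.my_schema.events")
  .writeStream
  .option("checkpointLocation", "/Volumes/my_catalog/my_schema/checkpoints/events_job")
  .toTable("my_catalog.my_schema.events_clean"))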

Job has a cluster definition below Databricks Runtime 11.3.

  • Change job cluster definition to Databricks Runtime 11.3 or above.
  • Change job cluster definition to use dedicated or standard access mode.

Job has notebooks that interact with storage or tables.

The service principal that the job runs as must be granted read and write access to the requisite Unity Catalog resources, such as volumes, tables, and external locations (see the GRANT sketch below).
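
A minimal sketch of such grants, run as a user with sufficient privileges (securable names and the service principal are hypothetical):

spark.sql("GRANT SELECT, MODIFY ON TABLE my_catalog.my_schema.my_table TO `my-service-principal-application-id`")
spark.sql("GRANT READ VOLUME, WRITE VOLUME ON VOLUME my_catalog.my_schema.my_volume TO `my-service-principal-application-id`")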

Job is a Lakeflow Declarative Pipelines pipeline.

  • Change the job cluster to Databricks Runtime 13.1 or above.
  • Stop the Lakeflow Declarative Pipelines job.
  • Move the data to a Unity Catalog managed table (see the sketch below).
  • Change the Lakeflow Declarative Pipelines job definition to use the new Unity Catalog managed table.
  • Restart the Lakeflow Declarative Pipelines job.
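
One way to copy the existing data into a Unity Catalog managed table is a CREATE TABLE AS SELECT from the legacy Hive metastore table (all names are hypothetical):

spark.sql("""
  CREATE TABLE my_catalog.my_schema.events
  AS SELECT * FROM hive_metastore.default.events
""")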

Job has notebooks that use non-storage cloud services, such as AWS Kinesis, and the configuration used to connect uses an instance profile.

  • Modify the code to use Unity Catalog service credentials, which govern access to non-storage cloud services by generating temporary credentials that the cloud SDKs can use.

Job uses Scala

  • If below Databricks Runtime 13.3, run on dedicated compute.
  • Standard clusters are supported on Databricks Runtime 13.3 and above.

Job has notebooks which use Scala UDFs

  • If below Databricks Runtime 13.3, run on dedicated compute.
  • Standard clusters are supported on Databricks Runtime 14.2 and above.

Job has tasks which use the Databricks Runtime for Machine Learning (MLR)

Run on dedicated compute.

Job has cluster configuration that relies on global init scripts.

  • Use Databricks Runtime 13.3 or above for full support.
  • Modify to use cluster-scoped init scripts or use cluster policies. Scripts, files, and packages must be installed on Unity Catalog volumes in order to run.

Job has cluster configuration or notebooks that use jars/Maven, Spark extensions, or custom data sources (from Spark).

  • Use Databricks Runtime 13.3 or above.
  • Use cluster policies to install libraries.

Job has notebooks which use PySpark UDFs.

Use Databricks Runtime 13.2 or above.

Job has notebooks that have Python code that makes network calls.

Use Databricks Runtime 12.2 or above.

Job has notebooks that use Pandas UDFs (scalar).

Use Databricks Runtime 13.2 or above.

Job will use Unity Catalog Volumes.

Use Databricks Runtime 13.3 or above.

Job has notebooks that use spark.catalog.X (tableExists, listTables, setDefaultCatalog) and runs using a shared cluster

  • Use Databricks Runtime 14.2 or above.

  • If a Databricks Runtime upgrade is not possible, use the following workarounds:

    Instead of tableExists, use the following code:

    # SQL workaround
    def tableExistsSql(tablename):
        try:
            spark.sql(f"DESCRIBE TABLE {tablename};")
        except Exception as e:
            return False
        return True

    tableExistsSql("jakob.jakob.my_table")

    Instead of listTables, use SHOW TABLES (which also allows restricting to a database or using pattern matching):

    spark.sql("SHOW TABLES")

    For setDefaultCatalog, run:

    spark.sql("USE CATALOG <catalog_name>")

Job has notebooks that use the internal dbutils API: Command Context and runs using a shared cluster.

Workloads that access the command context, for example to retrieve a job ID, using

dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()

should use .safeToJson() instead of .toJson(). This returns a subset of the command context information that can be securely shared on a shared cluster.

Requires Databricks Runtime 13.3 LTS or above.

Job has notebooks that use PySpark: spark.udf.registerJavaFunction and runs using a shared cluster

  • Use Databricks Runtime 14.3 LTS or above
  • For notebooks and jobs, use a %scala cell to register the Scala UDF using spark.udf.register. As Python and Scala share the execution context, the Scala UDF will be available from Python as well.
  • For customers using IDEs (using Databricks Connect v2), the only option is to rewrite the UDF as a Unity Catalog Python UDF. In the future, we plan to extend support for Unity Catalog UDFs to support Scala.

Job has notebooks that use RDDs: sc.parallelize and spark.read.json() to convert a JSON object into a DataFrame, and runs using a shared cluster

  • Use json.loads instead

Example -

Before:

json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"
json_list = []
json_list.append(json_content1)
json_list.append(json_content2)
df = spark.read.json(sc.parallelize(json_list))
display(df)

After:

from pyspark.sql import Row
import json
# A sample JSON string, for example the body of an API response
json_data_str = '{"json_col1": "hello", "json_col2": 32}'
json_data = [json.loads(json_data_str)]
# Convert dictionaries to Row objects
rows = [Row(**json_dict) for json_dict in json_data]
# Create DataFrame from list of Row objects
df = spark.createDataFrame(rows)
df.display()

Job has notebooks that use RDDs: empty DataFrame via sc.emptyRDD() and runs using a shared cluster

  • Use spark.createDataFrame with an empty collection and an explicit schema instead of sc.emptyRDD().

Example -

Before:

val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(sc.emptyRDD[Row], schema)

After (Scala):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(new java.util.ArrayList[Row](), schema)

After (Python):

from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("k", StringType(), True)])
spark.createDataFrame([], schema)

Job has notebooks that use RDDs: mapPartitions (expensive initialization logic + cheaper operations per row) and runs using a shared cluster

  • Reason -

    Unity Catalog shared clusters use Spark Connect for communication between the Python / Scala programs and the Spark server, so RDDs are no longer accessible.

    Before:

    A typical use case for RDDs is to execute expensive initialization logic only once per partition and then perform cheaper operations per row, for example calling an external service or initializing encryption logic.

    After:

    Rewrite the RDD operations using the DataFrame API and PySpark-native Arrow UDFs, as in the sketch below.
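
    A minimal sketch using mapInPandas, which also runs the initialization once per partition (the column names and init function are hypothetical):

    import pandas as pd
    from typing import Iterator

    def make_client():
        # Stand-in for expensive initialization (external service client, encryption setup, ...)
        return {"key": "dummy"}

    def enrich(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
        client = make_client()  # runs once per partition
        for pdf in batches:
            pdf["enriched"] = pdf["value"].astype(str) + "-" + client["key"]
            yield pdf

    df = spark.range(10).withColumnRenamed("id", "value")
    df.mapInPandas(enrich, schema="value long, enriched string").show()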

Job has notebooks that use SparkContext (sc) and sqlContext and runs using a shared cluster

  • Reason -

    SparkContext (sc) and sqlContext are not available by design due to the Unity Catalog shared cluster architecture and Spark Connect.

    How to Solve:

    Use the spark variable to interact with the SparkSession instance (see the example below).

    Limitations:

    The Spark JVM cannot be accessed directly from the Python / Scala REPL, only via Spark commands. This means that sc._jvm commands fail by design.

    The following sc commands are not supported: emptyRDD, range, init_batched_serializer, parallelize, pickleFile, textFile, wholeTextFiles, binaryFiles, binaryRecords, sequenceFile, newAPIHadoopFile, newAPIHadoopRDD, hadoopFile, hadoopRDD, union, runJob, setSystemProperty, uiWebUrl, stop, setJobGroup, setLocalProperty, getConf
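
    A minimal example of going through the SparkSession instead of sc / sqlContext:

    # spark is the SparkSession provided by the notebook
    df = spark.createDataFrame([(1, "a"), (2, "b")], "id INT, label STRING")
    spark.sql("SELECT count(*) AS n FROM range(10)").show()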

Job has notebooks that use Spark Conf - sparkContext.getConf and runs using a shared cluster

  • Reason -

    sparkContext, df.sparkContext, sc.sparkContext and similar APIs are not available by design.

    How to Solve:

    Use spark.conf instead (see the example below).
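
    A minimal example:

    # Read a Spark configuration value through the SparkSession instead of sparkContext.getConf()
    shuffle_partitions = spark.conf.get("spark.sql.shuffle.partitions")
    print(shuffle_partitions)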

Job has notebooks that use SparkContext - setJobDescription() and runs using a shared cluster

  • Reason -

    sc.setJobDescription("String") is not available by design due to the Unity Catalog shared cluster architecture and Spark Connect.

    How to Solve:

    Use tags instead if possible (see the PySpark docs).

    spark.addTag() can attach a tag, and getTags() and interruptTag(tag) can be used to act on the presence or absence of a tag, as in the sketch below.

    Requires Databricks Runtime 14.1 or above.
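
    A minimal sketch (the tag name is hypothetical):

    spark.addTag("nightly-etl")
    print(spark.getTags())  # contains 'nightly-etl'
    # ... run Spark actions; another process can call spark.interruptTag("nightly-etl") to cancel them
    spark.removeTag("nightly-etl")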

Job has notebooks that set Spark log levels using commands, such as sc.setLogLevel("INFO"), and runs using a shared cluster

  • Reason -

    On single-user and no-isolation clusters, it is possible to access the Spark context to dynamically set the log level across the driver and executors. On shared clusters, this method is not accessible from the Spark context, and in Databricks Runtime 14 and above the Spark context is no longer available.

    How to Solve:

    To control the log level without providing a log4j.conf, set spark.log.level to DEBUG, WARN, INFO, or ERROR as a Spark configuration value in the cluster settings.

Job has notebooks that use deeply nested expressions / queries and runs using a shared cluster

  • Reason -

    RecursionError / Protobuf maximum nesting level exceeded (for deeply nested expressions / queries)

    When recursively creating deeply nested DataFrames and expressions using the PySpark DataFrame API, one of the following errors might occur:

    • Python exception: RecursionError: maximum recursion depth exceeded
    • SparkConnectGrpcException: Protobuf maximum nesting level exceeded

    How to Solve:

    To circumvent the problem, identify deeply nested code paths and rewrite them using linear expressions, subqueries, or temporary views.

    For example, instead of recursively calling df.withColumn, call df.withColumns(dict), as in the sketch below.
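
    A minimal sketch (the column names are hypothetical):

    from pyspark.sql import functions as F

    df = spark.range(5)

    # Before: an iterative loop of withColumn calls builds a deeply nested plan.
    # for i in range(200):
    #     df = df.withColumn(f"c{i}", F.lit(i))

    # After: a single withColumns call with a dict keeps the expression tree flat.
    df = df.withColumns({f"c{i}": F.lit(i) for i in range(200)})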

Job has notebooks that use input_file_name() in the code and runs using a shared cluster

  • Reason -

    input_file_name() is not supported in Unity Catalog for shared clusters.

    How to Solve:

    Read the file metadata from the _metadata column instead; the following works for spark.read (a full sketch follows below).

    To get the file name:

    .withColumn("RECORD_FILE_NAME", col("_metadata.file_name"))

    To get the whole file path:

    .withColumn("RECORD_FILE_PATH", col("_metadata.file_path"))
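
    A minimal sketch reading from a hypothetical volume path:

    from pyspark.sql.functions import col

    df = (spark.read.format("csv")
          .option("header", "true")
          .load("/Volumes/my_catalog/my_schema/landing/")
          .withColumn("RECORD_FILE_NAME", col("_metadata.file_name"))
          .withColumn("RECORD_FILE_PATH", col("_metadata.file_path")))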

Job has notebooks that perform data operations on the DBFS filesystem and runs using a shared cluster

  • Reason -

    On a shared cluster, DBFS access through the FUSE mount (/dbfs) cannot reach the filesystem, so these operations fail with a file-not-found error.

    Example:

    Here are some examples where DBFS access from a shared cluster fails:

    # Python, via the /dbfs FUSE path
    with open('/dbfs/test/sample_file.csv', 'r') as file:
        ...

    # Shell (%sh), via the /dbfs FUSE path
    ls -ltr /dbfs/test
    cat /dbfs/test/sample_file.csv

    How to Solve:

    Do one of the following:

    • Use a Databricks Unity Catalog volume instead of DBFS (preferred; see the sketch below).
    • Update the code to use dbutils.fs or Spark APIs, which go through the direct-to-storage access path and can be granted access to DBFS from a shared cluster.
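
    A minimal sketch of both alternatives (paths are hypothetical):

    # Preferred: read the file from a Unity Catalog volume instead of DBFS.
    with open("/Volumes/my_catalog/my_schema/test/sample_file.csv", "r") as f:
        print(f.read())

    # Or access DBFS through dbutils / Spark APIs instead of the /dbfs FUSE path.
    print(dbutils.fs.head("dbfs:/test/sample_file.csv"))
    df = spark.read.option("header", "true").csv("dbfs:/test/sample_file.csv")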