Best practices for performance efficiency

This article covers best practices for performance efficiency, organized by architectural principles listed in the following sections.

Vertical scaling, horizontal scaling, and linear scalability

Before discussing the best practices, let’s first look at a few distributed computing concepts: vertical scaling, horizontal scaling, and linear scalability.

  • Vertical scaling: Adding or removing resources from a single computer, typically CPUs, memory, or GPUs. Usually, this means stopping the workload, moving it to a bigger machine, and restarting it. Vertical scaling has limits: there might not be a bigger machine, or the price of the next bigger machine might be prohibitive.

  • Horizontal scaling: Adding or removing nodes from a distributed system. When the limits of vertical scaling are reached, horizontal scaling is the solution: distributed computing uses systems with several machines (called clusters) to run the workloads. It is essential to understand that for this to be possible, the workloads must be prepared for parallel execution, as supported by the engines of the Databricks lakehouse, Apache Spark and Photon. This makes it possible to combine multiple reasonably priced machines into a larger computing system. If more compute resources are needed, horizontal scaling adds nodes to the cluster and removes them when they are no longer needed. While technically there is no limit (and the Spark engine handles the complex part of distributing the load), large numbers of nodes do increase management complexity.

  • Linear scalability: When you add more resources to a system, the relationship between throughput and the resources used is linear. This is only possible if the parallel tasks are independent. If they are not, intermediate results from one set of nodes are needed on another set of nodes for further computation; this data exchange, which means transporting results over the network between nodes, takes considerable time. In general, distributed computing has some overhead for managing the distribution and exchange of data. As a result, small data set workloads that can be analyzed on a single node may even be slower when run on a distributed system. The Databricks Data Intelligence Platform provides flexible computing (single node and distributed) to meet the unique needs of your workloads.

Use serverless architectures

Use serverless compute

With serverless compute on the Databricks Data Intelligence Platform, the compute layer runs in the customer’s Databricks account. Workspace admins can create serverless SQL warehouses that enable instant compute and are managed by Databricks. A serverless SQL warehouse uses compute clusters hosted in the customer’s Databricks account. Use them with Databricks SQL queries just as you would with the original Databricks SQL warehouses. Serverless compute comes with a very fast starting time for SQL warehouses (around 10 seconds or less), and the infrastructure is managed by Databricks.

This leads to improved productivity:

  • Cloud administrators no longer have to manage complex cloud environments, for example by adjusting quotas, creating and maintaining networking assets, and joining billing sources.

  • Users benefit from near-zero waiting times for cluster start and improved concurrency on their queries.

  • Cloud administrators can refocus their time on higher-value projects instead of managing low-level cloud components.

Design workloads for performance

Understand your data ingestion and access patterns

From a performance perspective, data access patterns, such as “aggregations versus point access” or “scan versus search”, behave differently depending on the data size. Large files are more efficient for scan queries, and smaller files are better for searches, since you have to read less data to find the specific row(s).

For ingestion patterns, it’s common to use DML statements. DML statements perform best when the data is clustered and the affected section of data can simply be isolated. It is important to keep the data clustered and isolatable during ingestion: consider keeping a natural time sort order and applying as many filters as possible to the ingest target table. For append-only and overwrite ingestion workloads, there isn’t much to consider, as these are relatively cheap operations.

The ingestion and access patterns often point to an obvious data layout and clustering. If they do not, decide which pattern is more important to your business and bias the data layout toward solving that goal better.

Use parallel computation where it is beneficial

Time to value is an important dimension when working with data. While many use cases can easily be implemented on a single machine (small data, few and simple computation steps), there are often use cases that:

  • Need to process large data sets.

  • Have long running times due to complicated algorithms.

  • Must be repeated hundreds or thousands of times.

The cluster environment of the Databricks platform is well suited to distributing these workloads efficiently. It automatically parallelizes SQL queries across all nodes of a cluster, and it provides libraries for Python and Scala to do the same. Under the hood, the Apache Spark and Photon engines analyze the queries, determine the optimal way to execute them in parallel, and manage the distributed execution in a resilient way.

In the same way as batch tasks, Structured Streaming distributes streaming jobs across the cluster for best performance.

One of the easiest ways to use parallel computing is Delta Live Tables. You declare the tasks and dependencies of a job in SQL or Python, and Delta Live Tables handles the execution planning, efficient infrastructure setup, job execution, and monitoring.
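As a minimal Python sketch (table and column names such as raw_events, event_type, and ingest_ts are hypothetical placeholders), a Delta Live Tables dataset is just a decorated function that returns a DataFrame; the framework derives the dependency graph and parallelizes the execution:

    # Minimal Delta Live Tables pipeline step (Python).
    # "raw_events", "event_type", and "ingest_ts" are hypothetical names.
    import dlt
    from pyspark.sql import functions as F

    @dlt.table(comment="Cleaned events derived from the raw ingest table.")
    def cleaned_events():
        # Only the transformation is declared; Delta Live Tables plans the
        # execution, provisions the infrastructure, and monitors the run.
        return (
            dlt.read("raw_events")
            .where(F.col("event_type").isNotNull())
            .withColumn("ingest_date", F.to_date("ingest_ts"))
        )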

For data scientists, pandas is a Python package that provides easy-to-use data structures and data analysis tools for the Python programming language. However, pandas does not scale out to big data. The pandas API on Spark fills this gap by providing pandas-equivalent APIs that work on Apache Spark.
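As a minimal sketch (the file path and column names are hypothetical), the same pandas-style code runs distributed across the cluster when the pyspark.pandas module is used:

    # pandas-style syntax, executed in a distributed way by Spark.
    # The path and column names are hypothetical placeholders.
    import pyspark.pandas as ps

    df = ps.read_parquet("/mnt/data/events")
    top_countries = df.groupby("country")["amount"].sum().sort_values(ascending=False)
    print(top_countries.head(10))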

Additionally, the platform comes with MLlib, a library of parallelized machine learning algorithms, and supports multi-GPU and distributed deep learning compute out of the box, for example through HorovodRunner. See HorovodRunner: distributed deep learning with Horovod. Other libraries included with the platform help distribute massively repeated tasks to all cluster nodes, cutting time to value in a near-linear fashion; for example, Hyperopt for parallel hyperparameter optimization in ML.
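As a hedged sketch of Hyperopt with SparkTrials (the objective function below is a toy placeholder rather than real model training), each trial is evaluated on a different worker:

    # Distribute hyperparameter trials across the cluster with SparkTrials.
    # The objective is a toy function standing in for real model training.
    from hyperopt import SparkTrials, fmin, hp, tpe

    def objective(params):
        return (params["x"] - 3) ** 2  # loss to minimize

    best = fmin(
        fn=objective,
        space={"x": hp.uniform("x", -10, 10)},
        algo=tpe.suggest,
        max_evals=64,
        trials=SparkTrials(parallelism=8),  # up to 8 trials run concurrently
    )
    print(best)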

Analyze the whole chain of execution

Most pipelines or consumption patterns use a chain of systems. For example, with BI tools, performance is impacted by several factors:

  • The BI tool itself.

  • The connector that connects the BI tool and the SQL engine.

  • The SQL engine where the BI tool sends the query.

For best-in-class performance, the whole chain needs to be taken into account, and each link selected and tuned accordingly.

Prefer larger clusters

Note

Serverless compute manages clusters automatically, so this does not apply to serverless compute.

Plan for larger clusters, especially when the workload scales linearly. In that case, using a large cluster for a workload is not more expensive than using a smaller one; it’s just faster. The key is that you’re renting the cluster for the length of the workload. So, if you spin up a two-worker cluster and it takes an hour, you’re paying for those two workers for the full hour. Similarly, if you spin up a four-worker cluster and it takes only half an hour (this is where linear scalability comes into play), the costs are the same: two worker-hours in both cases. If costs are the primary driver with a very flexible SLA, an autoscaling cluster is almost always the cheapest, but not necessarily the fastest.

Use native Spark operations

User-defined functions (UDFs) are a great way to extend the functionality of Spark SQL. However, don’t use Python or Scala UDFs if a native function exists, for the following reasons:

  • Transferring data between Python and Spark requires serialization, which drastically slows down queries.

  • It takes more effort to implement and test functionality that already exists in the platform.

If a native function is missing and the functionality must be implemented as a Python UDF, use pandas UDFs. Apache Arrow ensures data moves efficiently back and forth between Spark and Python.
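If a pandas UDF is needed, a minimal sketch looks like the following (the column name temp_f and the conversion are illustrative; spark is the SparkSession provided by a Databricks notebook):

    # Vectorized (pandas) UDF: operates on whole pandas Series, and Apache
    # Arrow handles the data exchange between the JVM and Python efficiently.
    import pandas as pd
    from pyspark.sql.functions import pandas_udf

    @pandas_udf("double")
    def fahrenheit_to_celsius(temp_f: pd.Series) -> pd.Series:
        return (temp_f - 32) * 5.0 / 9.0

    # Illustrative usage; "spark" is the notebook's SparkSession.
    df = spark.createDataFrame([(32.0,), (98.6,)], ["temp_f"])
    df.withColumn("temp_c", fahrenheit_to_celsius("temp_f")).show()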

Use Photon

Photon is the engine on Databricks that provides fast query performance at low cost for everything from data ingestion, ETL, streaming, and data science to interactive queries, directly on your data lake. Photon is compatible with Apache Spark APIs, so getting started is as easy as turning it on: no code changes and no lock-in.

Photon is part of a high-performance runtime that runs your existing SQL and DataFrame API calls faster and reduces your total cost per workload. Photon is used by default in Databricks SQL warehouses.

Understand your hardware and workload type

Note

Serverless compute manages clusters automatically, so this does not apply to serverless compute.

Not all cloud VMs are created equal. The different families of machines offered by cloud providers are all different enough to matter. There are obvious differences, such as RAM and cores, and more subtle ones, such as processor type and generation, network bandwidth guarantees, and local high-speed storage versus local disk versus remote disk. There are also differences in the “spot” markets. Understand these before deciding on the best VM type for your workload.

Use caching

There are two types of caching available in Databricks: disk caching and Spark caching. Here are the characteristics of each type:

  • Use disk cache

    The disk cache (formerly known as “Delta cache”) stores copies of remote data on the local disks (for example, SSD) of the virtual machines. It can improve the performance of a wide range of queries but cannot be used to store the results of arbitrary subqueries. The disk cache automatically detects when data files are created or deleted and updates its content accordingly. The recommended (and easiest) way to use disk caching is to choose a worker type with SSD volumes when you configure your cluster. Such workers are enabled and configured for disk caching.

  • Avoid Spark caching

    The Spark cache (using .persist() and .unpersist()) can store the result of any subquery and data stored in formats other than Parquet (such as CSV, JSON, and ORC). However, if used in the wrong places in a query, it can eat up all the memory and even slow down queries substantially. As a rule of thumb, avoid Spark caching unless you know exactly what the impact will be (a minimal sketch follows after this list). See Spark caching.

  • Query result cache

    Per-cluster caching of query results for all queries through SQL warehouses. To benefit from query result caching, focus on deterministic queries that, for example, don’t use predicates such as = NOW(). When a query is deterministic and the underlying data is in Delta format and unchanged, SQL warehouses return the result directly from the query result cache.

  • Databricks SQL UI caching

    Per-user caching of all query and legacy dashboard results in the Databricks SQL UI.

  • Prewarm clusters

    Note

    Serverless compute manages clusters automatically, so this does not apply to serverless compute.

    Independent of query and data format, the first query on a cluster will always be slower than subsequent queries, because the various subsystems have to start up and read the data they need. Take this into account for performance benchmarking. It is also possible to attach a cluster to a ready-to-use pool. See Pool configuration reference.
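When Spark caching is justified, keep it tightly scoped, as in this hedged sketch (table and column names are hypothetical; spark is the notebook’s SparkSession):

    # Persist an intermediate result only because it is reused several times,
    # and release it as soon as it is no longer needed. Names are hypothetical.
    from pyspark.sql import functions as F

    enriched = spark.table("sales").join(spark.table("customers"), "customer_id")
    enriched.persist()

    by_region = enriched.groupBy("region").agg(F.sum("amount").alias("revenue"))
    by_segment = enriched.groupBy("segment").agg(F.countDistinct("customer_id").alias("customers"))
    by_region.show()
    by_segment.show()

    enriched.unpersist()  # free the memory once the reuse is over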

Use compaction

Delta Lake on Databricks can improve the speed of read queries from a table. One way to improve this speed is to coalesce small files into larger ones. You trigger compaction by running the OPTIMIZE command. See Compact data files with optimize on Delta Lake.
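As a minimal sketch, compaction can be triggered from a notebook with the OPTIMIZE command; the table name below is a hypothetical placeholder, and spark refers to the SparkSession provided by a Databricks notebook:

    # Trigger compaction of small files into larger ones.
    # "my_catalog.my_schema.events" is a hypothetical table name.
    result = spark.sql("OPTIMIZE my_catalog.my_schema.events")
    display(result)  # shows metrics about the files that were added and removed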

You can also compact small files automatically using Auto Optimize. See Consider file size tuning.

Use data skipping

Data skipping: Data skipping information is collected automatically when you write data into a Delta table (by default, Delta Lake on Databricks collects statistics on the first 32 columns defined in your table schema). Delta Lake on Databricks takes advantage of this information (minimum and maximum values) at query time to provide faster queries. See Data skipping for Delta Lake.

For best results, apply Z-ordering, a technique for co-locating related information in the same set of files. This co-locality is automatically used by the Delta Lake data-skipping algorithms on Databricks and dramatically reduces the amount of data Delta Lake on Databricks needs to read.
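As a hedged sketch, Z-ordering is applied with the OPTIMIZE command; the table and column names below are hypothetical and should be replaced with columns that are frequently used in filters:

    # Co-locate rows by frequently filtered columns so data skipping can
    # prune more files at query time. Names are hypothetical placeholders.
    spark.sql("OPTIMIZE my_catalog.my_schema.events ZORDER BY (event_date, customer_id)")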

Dynamic file pruning: Dynamic file pruning (DFP) can significantly improve the performance of many queries on Delta tables. DFP is especially efficient for non-partitioned tables or joins on non-partitioned columns.

Avoid over-partitioning

In the past, partitioning was the most common way to skip data. However, partitioning is static and manifests as a file system hierarchy. There is no easy way to change partitions if the access patterns change over time. Often, partitioning leads to over-partitioning: too many partitions with files that are too small, which results in bad query performance. See Partitions.

Today, a much better choice than partitioning is Z-ordering.

Consider file size tuning

The term auto optimize is sometimes used to describe functionality controlled by the settings delta.autoCompact and delta.optimizeWrite. This term has been retired in favor of describing each setting individually. See Configure Delta Lake to control data file size.

These settings are particularly useful in the following scenarios:

  • Streaming use cases where latency in the order of minutes is acceptable.

  • MERGE INTO is the preferred method of writing into Delta Lake.

  • CREATE TABLE AS SELECT or INSERT INTO are commonly used operations.
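As a hedged sketch, both behaviors can be enabled for a single table through Delta table properties. The table name below is a placeholder, and the property names reflect my reading of the Delta Lake documentation; verify them against the documentation for your Databricks Runtime version:

    # Enable automatic compaction and optimized writes for one table.
    # "my_catalog.my_schema.events" is a hypothetical table name.
    spark.sql("""
        ALTER TABLE my_catalog.my_schema.events SET TBLPROPERTIES (
            'delta.autoOptimize.autoCompact' = 'true',
            'delta.autoOptimize.optimizeWrite' = 'true'
        )
    """)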

Optimize join performance

  • Consider range join optimization. See Range join optimization.

    A range join occurs when two relations are joined using a point-in-interval or interval-overlap condition. The range join optimization support in Databricks Runtime can bring orders-of-magnitude improvements in query performance but requires careful manual tuning; a hedged sketch of a range join hint follows after this list.

  • Consider skew join optimization.

    Data skew is a condition in which a table’s data is unevenly distributed among partitions in the cluster. Data skew can severely degrade the performance of queries, especially those with joins. Joins between big tables require shuffling data, and the skew can lead to an extreme imbalance of work in the cluster. Data skew is likely affecting a query if the query appears to be stuck finishing only a few tasks. To ameliorate skew, Delta Lake on Databricks SQL accepts skew hints in queries. With the information from a skew hint, Databricks Runtime can construct a better query plan that does not suffer from data skew.
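For the range join optimization mentioned above, a minimal sketch of a manually tuned range join hint is shown below; the table names, column names, and the bin size of 60 are hypothetical and must be tuned to the typical interval length in your data:

    # Hint the optimizer to use a range join with a bin size of 60
    # (for example, 60 seconds). All names here are hypothetical.
    spark.sql("""
        SELECT /*+ RANGE_JOIN(points, 60) */ points.device_id, ranges.zone_id
        FROM points
        JOIN ranges
          ON points.event_ts >= ranges.start_ts AND points.event_ts < ranges.end_ts
    """)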

Run ANALYZE TABLE to collect table statistics

Run ANALYZE TABLE to collect statistics on the entire table for the query planner. See ANALYZE TABLE.

ANALYZE TABLE mytable COMPUTE STATISTICS FOR ALL COLUMNS;

This information is persisted in the metastore and helps the query optimizer by:

  • Choosing the proper join type.

  • Selecting the correct build side in a hash-join.

  • Calibrating the join order in a multi-way join.

ANALYZE TABLE should be run alongside OPTIMIZE on a daily basis and is recommended for tables smaller than 5 TB. The only caveat is that ANALYZE TABLE is not incremental.

Run performance testing in the scope of development

Test on data representative of production data

Run performance testing on production data (read-only) or similar data. When using similar data, characteristics such as volume, file layout, and data skew should resemble the production data, since these have a significant impact on performance.

Take prewarming of resources into account

The first query on a new cluster is slower than all the others:

  • In general, cluster resources need to be initialized on multiple layers.

  • When caching is part of the setup, the first run ensures that the data is in the cache, which speeds up subsequent jobs.

Prewarming resources, that is, running specific queries to initialize resources and fill caches (for example, after a cluster restart), can significantly increase the performance of the first queries. So, to understand the behavior in the different scenarios, test the performance of the first execution (with and without prewarming) and of subsequent executions.
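As a hedged sketch, one way to prewarm the disk cache after a cluster start is the CACHE SELECT statement; the table and column names below are hypothetical placeholders:

    # Preload frequently queried columns into the disk cache so that the
    # first dashboard or interactive queries do not pay the cold-read cost.
    spark.sql("CACHE SELECT event_date, customer_id, amount FROM my_catalog.my_schema.events")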

Tip

Interactive workloads like dashboard refreshes can significantly benefit from prewarming. However, this does not apply to job clusters, where the load by design is executed only once.

Identify bottlenecks

Bottlenecks are areas in your workload that might worsen the overall performance when the load in production increases. Identifying these at design time and testing against higher workloads will help to keep the workloads stable in production.