Under the Hood with Dask’s Cluster Autoscaling

tl;dr: Autoscaling can save money and improve performance by “right-sizing” your Dask clusters as workloads change. Dask does this by collecting detailed statistics about the work you’re running and scaling to a size that lets all your pending work finish quickly (i.e., within a timeframe you can choose).

What is autoscaling?

Autoscaling—called dynamic allocation in some ecosystems such as Apache Spark—is the ability for a distributed compute system to scale up (run on more nodes) and down (run on fewer nodes) over time, with the cluster size adjusted automatically to match the needs of the workload.

Coiled handles the creation and management of Dask clusters in the cloud. It automagically synchronizes software environments between the local client and the remote cluster, and it lets you enable autoscaling with the coiled.Cluster.adapt() method.

Why autoscale?

Distributed computation always comes with a “tax”: there is additional complexity as compared with local, single-machine computation. Autoscaling provides a “tax break”—a benefit that we get in that distributed world.

Autoscaling is also a selling point for cloud computing:

  • Automatic scale up allows the same application code to use enormous amounts of compute resources when necessary to solve hard problems fast or to support unpredictable user growth
  • Automatic scale down ensures that costs are minimized and utilization of expensive resources is maximized

Thus, autoscaling is a key pillar of the cloud value proposition: while cloud resources such as CPU cycles or storage are almost always more expensive than the equivalent “on-prem” resources, in the cloud you have effectively unlimited scale while paying only for what you use, mitigating any cost differential.

An alternative perspective is that—whether in the cloud or on-prem—you may want each application to make conservative use of resources so that other applications within the same cluster fabric can claim those resources.

Basically, if you’ve sized your workload well and it’s consistent over time, you may never need scaling. But if your workload varies—perhaps across customer volume, regions, time periods, or analytics requests—autoscaling may save you time, money, and headaches.

Dask’s autoscaling support and APIs

Dask’s distributed scheduler (the state-of-the-art scheduler recommended for almost all use cases today, and available in the distributed package) includes support for autoscaling, referred to as “adaptive” scaling.

Which clustering systems support autoscaling?

Dask’s distributed scheduler does the hard work (explained below) of figuring out how many nodes to bring online or to retire, as well as which specific nodes to involve and when to do so. However, at the end of the day, the cluster manager—abstracted with the Dask Cluster class—has to be able to acquire and manage more nodes, containers, or processes for the Dask workers.

That means most Cluster implementations (KubeCluster, HelmCluster, YarnCluster, LocalCluster, FargateCluster, and others) support adaptive scaling. Check your Cluster implementation to make sure it supports the adaptive APIs.

Dask adaptive scaling APIs

For manually scaling (resizing) your Dask cluster, the Cluster.scale(...) method adjusts cluster size to whatever number you explicitly pass in. But that leaves the sizing decision to you, when you might prefer to lean on Dask’s profiling to make that decision for you.

The core API for automatic scaling is Cluster.adapt(...), which lets you optionally set minimum and maximum values for Dask workers, cores, and memory.
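To see the two APIs side by side, here is a minimal sketch that starts a small in-process LocalCluster, resizes it by hand, and then hands control to adaptive scaling. The function name `scaling_demo`, the worker counts, and the toy workload are illustrative choices, not anything prescribed by Dask.

```python
from dask.distributed import Client, LocalCluster


def scaling_demo() -> int:
    """Resize a tiny local cluster manually, then let Dask adapt it."""
    with LocalCluster(
        n_workers=1,
        threads_per_worker=1,
        processes=False,         # keep everything in one process for a quick demo
        dashboard_address=None,  # the dashboard is not needed here
    ) as cluster, Client(cluster) as client:
        # Manual scaling: we choose the size ourselves.
        cluster.scale(2)
        client.wait_for_workers(2, timeout=30)

        # Adaptive scaling: Dask chooses a size within the bounds we give it.
        cluster.adapt(minimum=1, maximum=4)

        # A toy workload: add 1 to each number in 0..99 and total the results.
        futures = client.map(lambda x: x + 1, range(100))
        return sum(client.gather(futures))
```

Calling `scaling_demo()` returns the total of the mapped results. On a real deployment you would swap LocalCluster for your own Cluster class and pick bounds that match your budget.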

A few other important arguments can be provided—such as how often to check whether the cluster should be resized—and these are documented under the Adaptive class. Some of these settings are also accessible via the Dask config settings under the distributed.adaptive key.
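As a rough illustration of the config route, the sketch below temporarily overrides three keys under distributed.adaptive. The specific values are arbitrary examples, and it is worth checking the key names against the configuration reference for your installed version.

```python
import dask

# Example overrides; the values are arbitrary and only for illustration.
overrides = {
    "distributed.adaptive.interval": "2s",          # how often to re-check the cluster size
    "distributed.adaptive.target-duration": "10s",  # how quickly pending work should finish
    "distributed.adaptive.wait-count": 5,           # scale-down damping (cycles to wait)
}

# dask.config.set works as a context manager, so the overrides are scoped.
with dask.config.set(overrides):
    wait_count = dask.config.get("distributed.adaptive.wait-count")
    target_duration = dask.config.get("distributed.adaptive.target-duration")
```

The same settings can usually be passed directly as keyword arguments, e.g. `cluster.adapt(interval="2s", target_duration="10s", wait_count=5)`, which keeps the choice next to the code that creates the cluster.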

How Dask’s autoscaler (cluster adaptive sizing) works

Ok, now that you know how and why to get started with Dask adaptive scaling, let’s take a deeper dive to see how it works under the hood!

In addition to user-supplied constraints (like min and max workers), the key ingredient is the extensive statistics collected by the Dask scheduler’s dynamic statistical profiler. As discussed in our last “Under the Hood” blog covering Map Joins, Dask’s scheduler collects statistics on runtime and resource usage for every single task that runs on your cluster. 

These data can be aggregated by task type (often the Python function name, or the “human-readable” initial part of the task key—this is also the label that appears throughout the dashboards) to provide estimates of each kind of operation. 
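A small sketch can make the grouping idea concrete. It uses `dask.utils.key_split` to recover the human-readable prefix of a task key and then averages some made-up runtimes per prefix. The keys and timings are hypothetical, and the plain mean is a simplification; the scheduler’s own bookkeeping may weight observations differently.

```python
from collections import defaultdict

from dask.utils import key_split

# Hypothetical (task key, measured runtime in seconds) observations.
observations = [
    ("load_csv-0a1b2c3d", 2.0),
    ("load_csv-4e5f6a7b", 4.0),
    ("clean-9c8d7e6f", 0.5),
]


def average_runtime_by_prefix(observations):
    """Group runtimes by task-key prefix and return the mean per prefix."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for key, seconds in observations:
        prefix = key_split(key)  # e.g. "load_csv-0a1b2c3d" -> "load_csv"
        totals[prefix] += seconds
        counts[prefix] += 1
    return {prefix: totals[prefix] / counts[prefix] for prefix in totals}


averages = average_runtime_by_prefix(observations)
```

With per-prefix estimates like these, a task that has not run yet can still be given an expected duration based on others of the same kind.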

The scheduler also accounts for the resources available (e.g., memory) on the various worker nodes. 

At this point, the scheduler can look at all of the pending work (every single task currently queued for execution) and total up an expectation of required time given the current cluster size.

This “expected time” can then be divided by the configurable target duration (defaulting to 5 seconds) to get the proportional scale up (or down) needed to handle the pending work. 
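The division described above is simple enough to write down. This is a simplified sketch rather than Dask’s actual code: the function name `cpu_target` is ours, and rounding up to a whole number is an assumption made so the result can be read as a count.

```python
import math


def cpu_target(pending_seconds: float, target_duration: float = 5.0) -> int:
    """Estimate the size needed to finish the pending work within target_duration.

    pending_seconds: total expected runtime of all queued tasks.
    target_duration: how quickly we want that work done (5 seconds by default).
    """
    return math.ceil(pending_seconds / target_duration)
```

For example, 120 seconds of queued work with the default 5-second target gives `cpu_target(120)`, i.e. 24, while only 12 seconds of work gives 3.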

Before requesting nodes, a couple of adjustments can be made to that target size. If the workload appears memory constrained and the target size is less than twice the current number of workers, the target is raised to twice the current worker count. 

Finally, the target is adjusted to respect the configured minimum and maximum values (if any).
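The two adjustments can be sketched as a small pure-Python function. The name `adjust_target` and the boolean `memory_constrained` flag are illustrative assumptions; how the scheduler decides that a workload is memory constrained is not modeled here.

```python
import math


def adjust_target(
    target: int,
    current_workers: int,
    memory_constrained: bool,
    minimum: float = 0,
    maximum: float = math.inf,
) -> int:
    """Apply the memory adjustment, then clamp to the configured bounds."""
    # If memory looks tight, aim for at least double the current workers.
    if memory_constrained and target < 2 * current_workers:
        target = 2 * current_workers
    # Respect the user-supplied minimum and maximum.
    return int(max(minimum, min(maximum, target)))
```

For instance, with 4 current workers and a raw target of 3, a memory-constrained workload is pushed to 8, and a configured maximum of 6 would then bring that back down to 6.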

Now the cluster is asked to adjust to a new target worker count, and the speed of scaling is limited only by the cluster manager’s ability to allocate new workers.

When scaling down, Dask takes a few extra steps to dampen oscillation, where a fast cluster shrinks into a slower one that immediately needs to scale back up. Data resident on the nodes chosen for removal is first migrated to other nodes (so that it won’t need to be recomputed), and Dask waits for a few autoscale cycles (specified in `wait_count` and defaulting to 3) before acting, which adds explicit damping.
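The damping idea can be illustrated with a tiny counter. This is a deliberately simplified sketch: the class name `ScaleDownDamper` is hypothetical, and it tracks one cluster-wide streak, whereas a real implementation may keep this bookkeeping per worker.

```python
class ScaleDownDamper:
    """Only approve a scale-down after it has been suggested wait_count times in a row."""

    def __init__(self, wait_count: int = 3):
        self.wait_count = wait_count
        self.streak = 0  # consecutive cycles that suggested scaling down

    def should_scale_down(self, wants_fewer_workers: bool) -> bool:
        if not wants_fewer_workers:
            # Any cycle that does not suggest shrinking resets the streak.
            self.streak = 0
            return False
        self.streak += 1
        return self.streak >= self.wait_count


damper = ScaleDownDamper(wait_count=3)
decisions = [
    damper.should_scale_down(suggestion)
    for suggestion in [True, True, False, True, True, True]
]
```

In this run, the early suggestions are discarded when the third cycle disagrees, and a scale-down is only approved once three consecutive cycles agree.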

Even more detail on Dask’s distributed scheduler

You can read (or step through) the scheduler logic in the Scheduler.adaptive_target method.

Moreover, Adaptive can be subclassed to provide custom heuristics, for example to respond to the availability of other custom resources in the cluster, like GPUs.
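As a hedged sketch of what such a subclass might look like, the example below overrides the `target` coroutine so that the requested size never exceeds a GPU budget. The helper `available_gpus` is entirely hypothetical (a stand-in for whatever your platform exposes), the one-GPU-per-worker rule is an assumption, and the exact hooks on Adaptive are worth confirming against your installed version of distributed.

```python
from distributed.deploy import Adaptive


def available_gpus() -> int:
    """Hypothetical helper: total GPUs this cluster is allowed to use."""
    return 2  # placeholder value for illustration only


class GpuAwareAdaptive(Adaptive):
    """Illustrative heuristic: never ask for more workers than there are GPUs."""

    async def target(self):
        # Start from the scheduler's own recommendation...
        wanted = await super().target()
        # ...then cap it, assuming each worker needs exactly one GPU.
        return min(wanted, available_gpus())
```

A subclass like this would typically be passed in when enabling adaptive scaling, for example `cluster.adapt(Adaptive=GpuAwareAdaptive, minimum=0, maximum=8)`.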

Dask adaptive scaling best practices

Because the autoscaling logic can be quite aggressive at scaling up and down, it’s important to specify min and max constraints. You don’t want to blow through your cloud budget or annoy coworkers sharing the same Kubernetes cluster.

If you have several workloads to run in succession which may benefit from autoscaling, you may hit optimal sizing sooner if you use the same cluster (since the scheduler will retain statistics from previous work) versus killing the cluster and spinning up a new one for each workload. While the scheduler has to run all the time, the scale-down of workers should mitigate the cost of keeping the cluster active.

Last, we recommend tuning and sizing your cluster first on a known workload (data + logic) and only then considering autoscaling. In any case, do not use autoscaling as a way to ignore or avoid tuning your worker size, partitions, etc. Autoscaling is designed to solve a different set of problems, and using it in lieu of tuning will lead to difficulties (and often excess cost and/or poor utilization).

Scalable data science and machine learning with Dask

Thanks for reading! As always, feel free to let us know what you think about this post. Plus, we’re always looking for more feedback on Coiled Cloud, which provides hosted Dask clusters, docker-less managed software, and zero-click deployments. You can try it out for free.
