Over the last few weeks, the Coiled team has been experimenting with a new approach to DataFrame shuffling in Dask. It's not ready for release yet, but it does show a promising path forward for significantly improving performance, and we'd love it if you tried it out!
This post goes into a lot of technical detail. If you just want to try out the new implementation, skip to the end for instructions.
As discussed in https://coiled.io/blog/dask-as-a-spark-replacement/, shuffling is a key part of most ETL workloads (namely the "transform" part, where the dataset is split up one way and needs to be reorganized along different lines). merge, set_index, groupby().apply(), and other common operations are all (sometimes) backed by shuffling.
In principle, shuffling is very simple: you look at a row in a DataFrame, figure out which output partition number it belongs to, figure out which worker should hold that partition in the end, and send the data along:
Oddly though, this operation that sounds like a simple for-loop is awkward to express as a task graph like Dask uses. Every output partition depends on every input partition, so the graph becomes N² in size. Even with reasonable amounts of input data, this can crash the Dask scheduler.
And when it does work, the scheduler is still the bottleneck, trying to micromanage the transfer of these many sub-fragments of data. Since there's no heavy number-crunching involved, the limiting factor should be IO: sending data over the network, and to disk if shuffling a larger-than-memory dataset. However, Dask's current implementation isn't efficient enough to reach these hardware limits.
So we tested a rather heretical idea to make Dask's shuffling better: just bypass a lot of Dask. We wrote an extension that attaches to every worker and uses the worker's networking APIs to send data directly to its peers without scheduler intervention. We still used a task graph, letting us lean on the centralized scheduler for some synchronization that's otherwise tricky to handle in a distributed system, but effectively hid the low-level details from the scheduler, keeping the graph small (O(N) instead of O(N²)).
We confirmed two wholly unsurprising things:
This is great to confirm! Because it tells us:
The second won't be a radical change, or even something any but the most advanced end-users would ever do. Rather, it's a practical recognition that Dask's goal is to make scalable parallel computing broadly accessible to Python users, and task graphs are not the only way to do that. Graphs serve Dask very well, but like every tool, they're not perfect for every situation. In select situations that are both very important and very poorly suited to task graphs, we can help users a lot by not clinging too tightly to existing abstractions.
With that philosophizing out of the way, we'll talk about an assortment of interesting challenges we ran into with this new system.
Though the shuffling algorithm is simple in principle, doing it well takes some Goldilocks-like balancing of resources (memory, network, disk, concurrency). Both network and disk perform better in batches, rather than lots of tiny tiny sends/writes. So we added logic to accumulate pieces of the input partitions until we have enough to send to a given worker and to accumulate those received pieces until we have enough to write to disk. Buffering like this while not running out of memory is a balancing act. In our final implementation, we hope to make this both simpler and more self-tuning.
But it turns out the whole thing may have been a premature optimization since network and disk are still far from being the bottleneck.
We profiled the shuffle using py-spy, which gave a detailed view of what was happening from our Python code all the way down to the lowest-level C code within the CPython standard library. This quickly showed we were spending 70% of our time waiting on the GIL (Global Interpreter Lock)!
Since we're writing a multi-threaded program, this is concerning in general—it means we won't be able to get much parallelism out of it. Our worker threads (which split and re-group DataFrames as they're sent or received) call pd.concat a lot, which internally calls np.concatenate, which, it turns out, doesn't release the GIL. So this is one problem: we can't parallelize our CPU work as we'd hoped to.
But the bigger problem is that this is slowing down our networking enormously. We're using non-blocking sockets, so the calls to send and receive are supposed to be nearly instantaneous and—as the name suggests—non-blocking. But because these calls actually have to block on re-acquiring the GIL, they might take 10x longer than they need to, gumming up the asyncio event loop (the cardinal sin of asynchronous programming). In the end, this prevents the next send or receive from running as soon as it should have, and so on, such that we complete these sends and receives far less frequently than we want to.
This is a well-known problem in Python that's been debated since 2010 (bpo-7946) known as the "convoy effect". For an excellent in-depth explanation of this, see this blog post. But the premise is something like this:
There are hundreds of people in a house who are sharing one bathroom. Most of them want to shower (slow). Some of them just want to brush their teeth (fast). But the tooth-brushers are very considerate, and recognize that while they're brushing their teeth, they don't actually need to be in the bathroom—they could step outside, then come back in when they're done to spit in the sink. But what happens is that when they step out, usually a showerer immediately jumps in and hogs the bathroom for a long time while singing show tunes, leaving the tooth-brusher awkwardly waiting around through a rendition of the entire soundtrack to Hamilton. Overall, it means the tooth-brushing takes far longer than it needs to, and if the tooth-brushers had just been a little less considerate, they could have finished much faster and nobody would have complained much.
As much as this sounds bleak (a known issue for over a decade! in CPython! labeled as wontfix!), I'm actually very excited and optimistic about this. We've understood the root problem, and it means we can stop worrying about optimizing other stuff for now, since it won't have much of an effect. And I think there are opportunities for both an "avoid it" solution (work with NumPy devs to release the GIL during concat? avoid pandas and NumPy altogether, and use Arrow/polars/etc. instead?) and a "fix it" solution (don't release the GIL with non-blocking sockets, either in CPython or in an alternative library like uvloop?). And either approach will benefit not just shuffling, but Dask as a whole, and likely the entire Python community.
As a side note, this particularly shows how valuable good profiling tools like py-spy are. If we'd just looked at the Python call stack (say, through Dask's built-in profiler), we might have seen read and write took most of the time and concluded that we were network bound, and the network was just slow. But by being able to look deeper, we discovered that most of that supposed networking time was actually spent doing nothing at all.
Besides the GIL issue, we found concatenating many pandas DataFrames was slower than we expected. In conversations with pandas developers, they've identified a couple of issues that will hopefully make their way into performance improvements in future pandas versions.
Another interesting problem was that Dask workers would run out of memory because they loaded more input DataFrames upfront than they needed to, instead of loading and immediately starting to transfer them to other workers. This turns out to be a rather core consequence of the way distributed scheduling is designed in Dask, and is an issue that's been bothering many Dask users for a while, and has been proposed to fix (but the fix is a complex change).
You can read about the problem in more detail in this issue, but the premise is that the scheduler tells workers about all of the root tasks upfront, but doesn't tell them about downstream tasks until it hears that the corresponding root tasks have completed. In the time gap between a root task completing and the scheduler telling the worker, there's something else to use that root data for, the worker goes off and runs extra root tasks since that's all it knows about.
This is an important issue, and we're excited to solve it. However, for this particular case, we realized there was a simple workaround: the "fusion" step of graph optimization. By fusing the "load the data" root task to its single "transfer the data" dependent task (producing one "load and transfer the data" root task), it didn't matter if workers ran too many of those tasks initially. Thanks to recent work on High-Level Graphs, with a slight tweak to the graph, this fusion step happens automatically and will fuse across most DataFrame operations.
We're glad this workaround was easy, since we expect that delivering effective shuffling can be done a bit faster than solving this underlying issue, and it will help lots of users. But addressing this root task overproduction issue is very high on the priority list after that.
If you've been struggling with large set_index or groupby().apply() operations failing, we'd love to get some early feedback on this proof-of-concept PR. Keep in mind that this is experimental and has some very sharp edges; see the warnings listed at https://github.com/dask/dask/pull/8223. Nonetheless, it does work on straightforward cases, so if it works for you—or it doesn't—we'd love to hear the feedback as comments on that PR (or new issues, if you encounter specific problems).
Here's how you can try out the new shuffle implementation on Coiled.
First, create a local virtual environment and install the versions from git:
//$ conda create -n shuffle -c conda-forge dask coiled jupyterlab dask-labextension
$ conda activate shuffle
(shuffle) $ pip install -U --no-deps git+https://github.com/gjoseph92/dask@shuffle_service git+https://github.com/dask/distributed//]]>
Then, create a Coiled software environment:
Then spin up a cluster using this software environment:
cluster = coiled.Cluster(
client = distributed.Client(cluster)
Now make some example data, and try re-indexing it.
Be sure to open the cluster dashboard at https://cloud.coiled.io to see what's going on.
//df = dask.datasets.timeseries("2000-01-01", "2005-01-01")
dfz = df.assign(z=df.x + df.y)
Try removing the shuffle="service" to see how much of an improvement this makes!
Here's a before-and-after of the current standard shuffle versus this new shuffle implementation. The most obvious difference is memory: workers are running out of memory with the old shuffle, but barely using any with the new. You can also see there are almost 10x fewer tasks with the new shuffle, which greatly relieves pressure on the scheduler.
Thanks for reading! And if you’re interested in trying out Coiled Cloud, which provides hosted Dask clusters, docker-less managed software, and one-click deployments, you can do so for free today when you click below.