TL;DR: unmanaged memory is RAM that the Dask scheduler is not directly aware of. It can cause workers to run out of memory and computations to hang or crash.
In this article we:
At any given moment, the Dask scheduler tracks chunks of data scattered over the workers of the cluster, which are typically the inputs or outputs of Dask tasks. The RAM they occupy is called managed memory.
For example, if we write:
import numpy
from dask.distributed import Client
client = Client()
future = client.submit(numpy.random.random, 1_000_000)
Now one of the workers is holding 8 MB of managed memory, which will be released as soon as the future is dereferenced.
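The 8 MB figure follows from simple arithmetic, sketched here with the standard library only. It assumes that numpy.random.random returns 64-bit floats, which is NumPy's default; the variable names are purely illustrative.

```python
import struct

n_elements = 1_000_000
itemsize = struct.calcsize("d")  # size of a C double (float64): 8 bytes
managed_bytes = n_elements * itemsize

print(f"{managed_bytes / 1e6:.0f} MB")  # prints "8 MB"
```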
Managed memory never accounts for all of the process memory, that is, the RAM occupied by the worker processes as observed by the OS.
The difference between process memory and managed memory is called unmanaged memory and it is the sum of:
In an ideal situation, all of the above are negligible in size. This is not always the case, however, and unmanaged memory can accumulate to unsustainable levels. This can cause workers to run out of memory and cause whole computations to hang or crash. You may have experienced this yourself. For example, when using Dask you may have seen error reports like the following:
Memory use is high but worker has no data to store to disk.
Perhaps some other process is leaking memory?
Process memory: 61.4GiB -- Worker memory limit: 64 GiB
Since distributed 2021.04.1, the Dask dashboard breaks down the memory usage of each worker and of the cluster total:
In the graph we can see:
Unmanaged recent is the portion of unmanaged memory that has appeared over the last 30 seconds. The hope is that most of it is due to garbage collection lag or the heap of running tasks and will disappear soon. If it doesn’t, it transitions into unmanaged memory. Unmanaged recent is tracked separately so that heuristics, namely rebalance(), can ignore it for increased stability.
By construction, process memory = managed + unmanaged + unmanaged recent.
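To make the identity concrete, here is a minimal sketch of how such a breakdown could be computed from periodic measurements. The function and class names are hypothetical, and the rule used to separate the two unmanaged buckets (take the minimum unmanaged value seen during the last 30 seconds as the long-lived part) is an assumption made for illustration, not something this article confirms about distributed's internals.

```python
from typing import List, NamedTuple, Tuple


class MemoryBreakdown(NamedTuple):
    managed: int
    unmanaged: int         # unmanaged memory older than the window
    unmanaged_recent: int  # unmanaged memory that appeared within the window


def breakdown(
    process: int,
    managed: int,
    history: List[Tuple[float, int]],
    now: float,
    window: float = 30.0,
) -> MemoryBreakdown:
    """Split process memory into managed, unmanaged, and unmanaged recent.

    history holds earlier (timestamp, unmanaged_bytes) samples.
    ASSUMPTION: memory counts as long-lived only if it was present in every
    sample taken during the window, hence the min() below.
    """
    total_unmanaged = max(process - managed, 0)
    in_window = [u for t, u in history if now - t <= window]
    old = min(in_window + [total_unmanaged])
    return MemoryBreakdown(managed, old, total_unmanaged - old)


# Hypothetical samples, in bytes, taken at t = 0, 10, and 20 seconds.
result = breakdown(
    process=1000,
    managed=400,
    history=[(0.0, 100), (10.0, 150), (20.0, 500)],
    now=25.0,
)
print(result)
```

Whatever rule is used to split the two unmanaged buckets, the three fields always add up to the process memory passed in, which is the invariant stated above.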
The same information is available in tabular format on the Workers tab:
In the screenshots above, we see large amounts of unmanaged recent memory - which should hopefully help us identify what event caused it.
If you see a large amount of unmanaged memory, it would be a mistake to blindly assume it is a leak. As listed earlier, there are several other frequent causes:
How much RAM is the execution of your tasks taking? Do large amounts of unmanaged memory appear only when running a task and disappear immediately afterwards? If so, you should consider breaking your data into smaller chunks.
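As a rough sketch of why smaller chunks help, the snippet below estimates the footprint of a single chunk. The helper name and the example shapes are hypothetical, and the 8-byte item size assumes float64 data. A task typically needs at least its input and output chunks in RAM at once, so the real peak is a multiple of this number.

```python
from math import prod


def chunk_nbytes(chunk_shape, itemsize=8):
    """Bytes held by one chunk; itemsize=8 assumes float64 elements."""
    return prod(chunk_shape) * itemsize


big = chunk_nbytes((10_000, 10_000))  # 800_000_000 bytes per chunk
small = chunk_nbytes((2_500, 2_500))  # 50_000_000 bytes per chunk
print(big // small)                   # prints 16
```

With dask.array, the chunk shape is set through the chunks= argument at creation time or changed afterwards with rechunk().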
Do you get a substantial reduction in memory usage if you manually trigger the garbage collector?
import gc
client.run(gc.collect)  # collect garbage on all workers
In CPython (unlike, for example, PyPy and Java) the garbage collector is only involved in the case of circular references; all other objects are freed through reference counting as soon as they are dereferenced. If calling gc.collect() on your workers makes your unmanaged memory drop, you should investigate your data for circular references and/or tweak the gc settings on the workers through a Worker Plugin [https://distributed.dask.org/en/latest/plugins.html#worker-plugins].
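The following standard-library sketch illustrates the point about circular references. The Node class and the function name are hypothetical; automatic collection is switched off during the demonstration only to keep the result deterministic.

```python
import gc
import weakref


class Node:
    """Hypothetical object that can point at another Node."""

    def __init__(self):
        self.other = None


def cycle_survives_until_gc():
    """Return (alive before gc.collect(), alive after gc.collect())."""
    was_enabled = gc.isenabled()
    gc.disable()  # avoid an automatic collection in the middle of the demo
    try:
        a, b = Node(), Node()
        a.other, b.other = b, a  # circular reference: a -> b -> a
        probe = weakref.ref(a)   # lets us observe a without keeping it alive
        del a, b                 # refcounts stay above zero because of the cycle
        alive_before = probe() is not None
        gc.collect()             # the cycle detector reclaims both objects
        alive_after = probe() is not None
    finally:
        if was_enabled:
            gc.enable()
    return alive_before, alive_after


print(cycle_survives_until_gc())  # prints (True, False)
```

On a worker, objects caught in such cycles keep occupying process memory after the scheduler has forgotten about them, until the next collection runs.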
Another important, and not widely known, cause of unmanaged memory on Linux and macOS derives from the fact that libc’s malloc()/free() manage a user-space memory pool, so free() won’t necessarily release memory back to the OS. This is particularly likely to affect you if you deal with large numbers of small Python objects: basically, all non-NumPy data as well as small NumPy chunks (which underlie both dask.array and dask.dataframe). What constitutes a “small” NumPy chunk varies by OS, distribution, and possibly the amount of installed RAM; the problem has been empirically observed on chunks of 1 MiB or smaller.
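To see if trimming helps, define the function below and run it on all workers with client.run(trim_memory) (Linux workers only):
import ctypes
def trim_memory() -> int:
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)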
Here’s an example of what the dashboard can look like before and after running malloc_trim:
If you observe a reduction in unmanaged memory like in the above images, read https://distributed.dask.org/en/latest/worker.html#memtrim for how you can robustly tackle the problem in production.
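The trim_memory function above loads libc.so.6 directly, which fails on workers that are not running Linux with glibc. For a quick experiment on a mixed cluster, a more defensive variant could look like the sketch below. The name try_trim_memory and the choice to return None when trimming is unavailable are assumptions made for illustration, not part of distributed.

```python
import ctypes
import sys
from typing import Optional


def try_trim_memory() -> Optional[int]:
    """Call glibc's malloc_trim(0) if possible; return None otherwise.

    malloc_trim returns 1 if memory was released to the OS and 0 if not.
    """
    if not sys.platform.startswith("linux"):
        return None
    try:
        libc = ctypes.CDLL("libc.so.6")
        return libc.malloc_trim(0)
    except (OSError, AttributeError):
        # No library under that name, or no malloc_trim symbol in it
        # (for example on a libc other than glibc).
        return None


print(try_trim_memory())
```

Passing it to client.run(try_trim_memory) returns one result per worker, which makes it easy to see where trimming actually released memory.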
Starting from distributed 2021.06.0, rebalance() [https://distributed.dask.org/en/latest/api.html#distributed.Client.rebalance] takes unmanaged memory into consideration, so if, for example, you have a memory leak on a single worker, that worker will hold less managed data than its peers after rebalancing. This is something to keep in mind if you both use rebalance() and are affected by trimming issues (as described in the previous section).
In the future, we will revise other heuristics that measure worker memory, namely those underlying scatter() [https://distributed.dask.org/en/latest/locality.html#data-scatter] and replicate() [https://distributed.dask.org/en/latest/api.html#distributed.Client.replicate], to ensure that they consider unmanaged memory.
Recent developments in Dask have made memory usage on the cluster a lot more transparent to the user. Unexplained memory usage is not necessarily caused by a memory leak, and hopefully this article has added some tools to your kit for when you need to debug it.