This post shows you how to compute the memory usage of a Dask DataFrame and how to develop a partitioning strategy based on the distribution of your data.

Dask distributes data in DataFrame partitions, so computations can be run in parallel.  Each partition in a Dask DataFrame is a pandas DataFrame.  This post explains how to measure the amount of data in each Dask partition.  Intelligently distributing the data across partitions is important for performance.

There aren’t hard-and-fast rules on optimal partition sizes.  It depends on the computing power of the nodes in your cluster and the analysis you’re running.

A general rule of thumb is to target 100 MB of data per memory partition in a cluster.  This post shows you how to measure the distribution of data in your cluster so you know when and if you need to repartition.

Here’s what you’ll learn in this post:

1. Calculating memory usage of a small Dask DataFrame
2. Memory usage of a large Dask DataFrame
3. Filtering can cause partition imbalances
4. Assessing when a Dask DataFrame’s memory usage is unevenly distributed
5. Fixing imbalances with repartitioning
6. Other ways to compute memory usage by partition

## Memory usage of small Dask DataFrames

Create a small Dask DataFrame with two partitions.

//import pandas as pd
from dask import dataframe as dd

df = pd.DataFrame(
{"nums": [1, 2, 3, 4, 5, 6], "letters": ["a", "b", "c", "d", "e", "f"]}
)
ddf = dd.from_pandas(df, npartitions=2)//]]>

Print the data in each of the partitions.

//for i in range(ddf.npartitions):
print(ddf.partitions[i].compute())

nums letters
0     1       a
1     2       b
2     3       c
nums letters
3     4       d
4     5       e
5     6       f//]]>

Use the pandas memory_usage method to print the bytes of memory used in each column of the first partition.

//ddf.partitions.memory_usage(deep=True).compute()

Index      128
letters    174
nums        24
dtype: int64//]]>

Print the total memory used by each partition in the DataFrame with the Dask memory_usage_per_partition method.

//ddf.memory_usage_per_partition(deep=True).compute()

0    326
1    330
dtype: int64//]]>

Both of these partitions are tiny because the entire DataFrame only contains six rows of data.

If deep is set to False then the memory usage of the object columns is not counted.

//ddf.memory_usage_per_partition(deep=False).compute()

0    176
1    180
dtype: int64//]]>

Calculating the memory usage of object columns is slow, so you can set deep to False and make the computation run faster.  We care about how much memory all the columns are using, so our examples use deep=True.

## Memory usage of large Dask DataFrame

Let’s calculate the memory for each partition of a 662 million-row dataset.

"s3://coiled-datasets/timeseries/20-years/parquet",
storage_options={"anon": True, 'use_ssl': True}
)

ddf.memory_usage_per_partition(deep=True).compute()

0       57061027
1       57060857
2       57059768
3       57059342
4       57060737
...
1090    57059834
1091    57061111
1092    57061001
1093    57058404
1094    57061989
Length: 1095, dtype: int64//]]>

The DataFrame has 1,095 partitions and each partition has 57 MB of data.

The data is evenly balanced across each partition in the DataFrame.  There aren’t lots of tiny, empty, or huge partitions.  You probably don’t need to repartition this DataFrame because all the memory partitions are reasonably sized and the data is evenly distributed.

Let’s look at an operation that can cause this DataFrame to have data that’s imbalanced across partitions.

## Filtering can cause imbalances

Dask DataFrames often become unbalanced after a large filtering operation, as discussed in the Filtering DataFrames post.

This section demonstrates how a large filtering operation can cause a DataFrame to have partitions that are way smaller than optimal.

Let’s run a filtering operation and then examine the memory per partition:

//filtered_ddf = ddf.loc[ddf["id"] > 1150]

filtered_ddf.memory_usage_per_partition(deep=True).compute()

0         0
1        94
2         0
3         0
4       187
...
1090      0
1091    189
1092      0
1093      0
1094      0
Length: 1095, dtype: int64//]]>

Many partitions in filtered_ddf are empty and the rest are tiny, way smaller than they should be.

filtered_ddf should be repartitioned to get rid of all the tiny partitions.

## Assessing imbalance in Dask DataFrame partitions

Here’s a helper method to help you measure data imbalance across partitions in a DataFrame.

//import numpy

def partition_report(ddf):
series = ddf.memory_usage_per_partition(deep=True).compute()
total = series.count()
print(f"Total number of partitions: {total}")
total_memory = format_bytes(series.sum())
print(f"Total DataFrame memory: {total_memory}")
total = total.astype(numpy.float64)
lt_1kb = series.where(lambda x : x < 1000).count()
lt_1kb_percentage = '{:.1%}'.format(lt_1kb/total)
lt_1mb = series.where(lambda x : x < 1000000).count()
lt_1mb_percentage = '{:.1%}'.format(lt_1mb/total)
gt_1gb = series.where(lambda x : x > 1000000000).count()
gt_1gb_percentage = '{:.1%}'.format(gt_1gb/total)
print(f"Num partitions < 1 KB: {lt_1kb} ({lt_1kb_percentage})")
print(f"Num partitions < 1 MB: {lt_1mb} ({lt_1mb_percentage})")
print(f"Num partitions > 1 GB: {gt_1gb} ({gt_1gb_percentage})")//]]>

Let’s run partition_report on filtered_ddf and see the partition sizes.

//partition_report(filtered_ddf)

Total number of partitions: 1095
Total DataFrame memory: 101.71 kiB
Num partitions < 1 KB: 1095 (100.0%)
Num partitions < 1 MB: 1095 (100.0%)
Num partitions > 1 GB: 0 (0.0%)//]]>

We can see that all the partitions are less than 1 KB.  There is only 102 KB of data in the entire DataFrame.  This filtered dataset can easily be repartitioned to a small Dask DataFrame or converted to a pandas DataFrame.

## Fixing imbalances with repartitioning

You can repartition your DataFrame when the data is distributed unevenly. See the blog post on repartitioning DataFrames for more information.  The repartitioning blog post also quantifies the performance drag of having the data suboptimally partitioned.

Repartitioning is computationally expensive and should not always be performed.  The performance drag from a small amount of imbalance can be less than the time it takes to fully repartition a dataset.  The optimal strategy depends on your specific dataset, computations, and cluster size.

## Other approaches: sizeof and memory_usage

Let’s look at two approaches for calculating the memory for each partition of a 662 million row dataset.  You don’t need to know either of these, but they’re fun for language API aficionados to know about.

"s3://coiled-datasets/timeseries/20-years/parquet",
storage_options={"anon": True, 'use_ssl': True}
)

ddf.map_partitions(lambda x: x.memory_usage(deep=True).sum()).compute()

0       57061027
1       57060857
2       57059768
3       57059342
4       57060737
...
1090    57059834
1091    57061111
1092    57061001
1093    57058404
1094    57061989
Length: 1095, dtype: int64//]]>

This computation takes 124 seconds on a 5 node cluster.

Dask has a sizeof function that estimates the size of each partition and runs faster.

0       56822960
1       57125360
2       56822960
3       57246320
4       57306800
...
1090    56974160
1091    57004400
1092    57337040
1093    56822960
1094    57004400
Length: 1095, dtype: int64//]]>

This takes 92 seconds to run, which is 21% faster than on the same dataset.

The sizeof results are an approximation, but they’re pretty close as you can see.

## Conclusion

This blog post taught you how to measure the memory usage in your DataFrame with memory_usage_per_partition, memory_usage and sizeof.

Dask makes it easy for you to compute how data is stored in different partitions.  This is not as easy with other distributed compute engines.

Make sure to play around with the memory sizes and the number of partitions that work best for your query patterns.  100 MB partitions generally work well, but you might need to make tweaks based on your query patterns and instance types.

Thanks for reading. If you’re interested in trying out Coiled Cloud, which provides hosted Dask clusters, docker-less managed software, and one-click deployments, you can do so for free today when you click below.