This post shows you how to compute the memory usage of a Dask DataFrame and how to develop a partitioning strategy based on the distribution of your data.
Dask splits data across DataFrame partitions so computations can run in parallel. Each partition in a Dask DataFrame is a pandas DataFrame. This post explains how to measure the amount of data in each partition, because distributing data intelligently across partitions is important for performance.
There aren’t hard-and-fast rules on optimal partition sizes. It depends on the computing power of the nodes in your cluster and the analysis you’re running.
A general rule of thumb is to target 100 MB of data per memory partition in a cluster. This post shows you how to measure the distribution of data in your cluster so you know when and if you need to repartition.
Here’s what you’ll learn in this post: how to measure the memory used by each partition, how filtering can leave you with imbalanced partitions, and how to decide when repartitioning is worth the cost.
Create a small Dask DataFrame with two partitions.
import pandas as pd
from dask import dataframe as dd

df = pd.DataFrame(
    {"nums": [1, 2, 3, 4, 5, 6], "letters": ["a", "b", "c", "d", "e", "f"]}
)
ddf = dd.from_pandas(df, npartitions=2)
Print the data in each of the partitions.
for i in range(ddf.npartitions):
    print(ddf.partitions[i].compute())

   nums letters
0     1       a
1     2       b
2     3       c
   nums letters
3     4       d
4     5       e
5     6       f
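Each computed partition is a plain pandas DataFrame, so every pandas method, including memory_usage, is available on it. Here’s a quick check:

print(type(ddf.partitions[0].compute()))
# <class 'pandas.core.frame.DataFrame'>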
Use the pandas memory_usage method to print the bytes of memory used in each column of the first partition.
ddf.partitions[0].memory_usage(deep=True).compute()

Index      128
letters    174
nums        24
dtype: int64
Print the total memory used by each partition in the DataFrame with the Dask memory_usage_per_partition method.
ddf.memory_usage_per_partition(deep=True).compute()

0    326
1    330
dtype: int64
Both of these partitions are tiny because the entire DataFrame only contains six rows of data.
If deep is set to False, the memory usage of the object columns is not counted.
ddf.memory_usage_per_partition(deep=False).compute()

0    176
1    180
dtype: int64
Calculating the memory usage of object columns is slow, so setting deep to False makes the computation run faster. Since we care about how much memory all the columns are using, the examples in this post use deep=True.
Let’s calculate the memory for each partition of a 662 million-row dataset.
ddf = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet",
    storage_options={"anon": True, "use_ssl": True},
)
ddf.memory_usage_per_partition(deep=True).compute()

0       57061027
1       57060857
2       57059768
3       57059342
4       57060737
          ...
1090    57059834
1091    57061111
1092    57061001
1093    57058404
1094    57061989
Length: 1095, dtype: int64
The DataFrame has 1,095 partitions and each partition holds roughly 57 MB of data.
The data is evenly balanced across each partition in the DataFrame. There aren’t lots of tiny, empty, or huge partitions. You probably don’t need to repartition this DataFrame because all the memory partitions are reasonably sized and the data is evenly distributed.
Let’s look at an operation that can cause this DataFrame to have data that’s imbalanced across partitions.
Dask DataFrames often become unbalanced after a large filtering operation, as discussed in the Filtering DataFrames post.
This section demonstrates how a large filtering operation can cause a DataFrame to have partitions that are way smaller than optimal.
Let’s run a filtering operation and then examine the memory per partition:
filtered_ddf = ddf.loc[ddf["id"] > 1150]
filtered_ddf.memory_usage_per_partition(deep=True).compute()

0         0
1        94
2         0
3         0
4       187
        ...
1090      0
1091    189
1092      0
1093      0
1094      0
Length: 1095, dtype: int64
Many partitions in filtered_ddf are empty and the rest are tiny, way smaller than they should be.
filtered_ddf should be repartitioned to get rid of all the tiny partitions.
Here’s a helper function you can use to measure data imbalance across the partitions in a DataFrame. It uses format_bytes from dask.utils to pretty-print the total memory.
import numpy
from dask.utils import format_bytes

def partition_report(ddf):
    series = ddf.memory_usage_per_partition(deep=True).compute()
    total = series.count()
    print(f"Total number of partitions: {total}")
    total_memory = format_bytes(series.sum())
    print(f"Total DataFrame memory: {total_memory}")
    total = total.astype(numpy.float64)
    lt_1kb = series.where(lambda x: x < 1000).count()
    lt_1kb_percentage = "{:.1%}".format(lt_1kb / total)
    lt_1mb = series.where(lambda x: x < 1000000).count()
    lt_1mb_percentage = "{:.1%}".format(lt_1mb / total)
    gt_1gb = series.where(lambda x: x > 1000000000).count()
    gt_1gb_percentage = "{:.1%}".format(gt_1gb / total)
    print(f"Num partitions < 1 KB: {lt_1kb} ({lt_1kb_percentage})")
    print(f"Num partitions < 1 MB: {lt_1mb} ({lt_1mb_percentage})")
    print(f"Num partitions > 1 GB: {gt_1gb} ({gt_1gb_percentage})")
Let’s run partition_report on filtered_ddf and see the partition sizes.
partition_report(filtered_ddf)

Total number of partitions: 1095
Total DataFrame memory: 101.71 kiB
Num partitions < 1 KB: 1095 (100.0%)
Num partitions < 1 MB: 1095 (100.0%)
Num partitions > 1 GB: 0 (0.0%)
We can see that all the partitions are less than 1 KB. There is only 102 KB of data in the entire DataFrame. This filtered dataset can easily be repartitioned to a small Dask DataFrame or converted to a pandas DataFrame.
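For example, you could collapse the filtered result into a single partition, or pull it into pandas entirely since it fits comfortably in memory. A minimal sketch of both options:

small_ddf = filtered_ddf.repartition(npartitions=1)  # one reasonably sized Dask partition
pdf = filtered_ddf.compute()                         # or bring it all into a pandas DataFrame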
You can repartition your DataFrame when the data is distributed unevenly. See the blog post on repartitioning DataFrames for more information. The repartitioning blog post also quantifies the performance drag of having the data suboptimally partitioned.
Repartitioning is computationally expensive and should not always be performed. The performance drag from a small amount of imbalance can be less than the time it takes to fully repartition a dataset. The optimal strategy depends on your specific dataset, computations, and cluster size.
Let’s look at two other approaches for calculating the memory of each partition of the 662 million-row dataset. You don’t need to know either of these, but they’re fun for language API aficionados.
ddf = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet",
    storage_options={"anon": True, "use_ssl": True},
)
ddf.map_partitions(lambda x: x.memory_usage(deep=True).sum()).compute()

0       57061027
1       57060857
2       57059768
3       57059342
4       57060737
          ...
1090    57059834
1091    57061111
1092    57061001
1093    57058404
1094    57061989
Length: 1095, dtype: int64
This computation takes 124 seconds on a 5-node cluster.
Dask has a sizeof function that estimates the size of each partition and runs faster.
import dask.sizeof

ddf.map_partitions(lambda x: dask.sizeof.sizeof(x)).compute()

0       56822960
1       57125360
2       56822960
3       57246320
4       57306800
          ...
1090    56974160
1091    57004400
1092    57337040
1093    56822960
1094    57004400
Length: 1095, dtype: int64
This takes 92 seconds to run, about 26% less time than the memory_usage approach on the same cluster and dataset.
The sizeof results are an approximation, but as you can see, they’re quite close to the exact values.
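If you want to see how close the estimate is, you can compare sizeof with the exact memory_usage numbers on a single computed partition. A small sketch (part is just a local variable introduced here for illustration):

import dask.sizeof

part = ddf.partitions[0].compute()
print(part.memory_usage(deep=True).sum())  # exact bytes, counting object columns
print(dask.sizeof.sizeof(part))            # sizeof's fast estimate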
This blog post taught you how to measure the memory usage of your DataFrame with memory_usage_per_partition, memory_usage, and sizeof.
Dask makes it easy to see how data is distributed across partitions. This isn’t as easy with other distributed compute engines.
Make sure to play around with the memory sizes and the number of partitions that work best for your query patterns. 100 MB partitions generally work well, but you might need to make tweaks based on your query patterns and instance types.
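When you do need to nudge partitions toward that 100 MB target, repartition can take a target partition size and let Dask pick the number of partitions for you. A sketch, assuming ddf is the DataFrame you want to rebalance:

ddf = ddf.repartition(partition_size="100MB")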
Thanks for reading. If you’re interested in trying out Coiled Cloud, which provides hosted Dask clusters, docker-less managed software, and one-click deployments, you can try it for free today.