Matthew Powers

Developer Advocate

December 9, 2021

This post explains how to materialize in-memory results with Dask compute. It explains the limitations of compute and how to minimize Dask DataFrame compute calls.

Dask DataFrames are composed of a collection of underlying pandas DataFrames (partitions). compute() concatenates all the Dask DataFrame partitions into a single pandas DataFrame.

When the Dask DataFrame contains data that’s split across multiple nodes in a cluster, then compute() may run slowly. It can also cause out of memory errors if the data isn’t small enough to fit in the memory of a single machine.

Dask was created to solve the memory issues of using pandas on a single machine. When you run compute(), you’ll face all the normal memory limitations of pandas.

Let’s look at some examples and see when it’s best to use Dask DataFrame compute() in your analyses.

compute() converts a Dask DataFrame to a pandas DataFrame. Let’s demonstrate with a small example.

Create a two-column Dask DataFrame and then convert it to a pandas DataFrame.

//import dask.dataframe as dd

import pandas as pd

df = pd.DataFrame({"col1": ["a", "b", "c", "d"], "col2": [1, 2, 3, 4]})

ddf = dd.from_pandas(df, npartitions=2)

ddf.compute()

col1 col2

0 a 1

1 b 2

2 c 3

3 d 4//]]>

Verify that Dask DataFrame compute() returns a pandas DataFrame.

//type(ddf.compute())

pandas.core.frame.DataFrame//]]>

Dask DataFrames are composed of many underlying pandas DataFrames, each of which is called a partition. It’s no problem calling Dask compute when the data is small enough to get collected in a single pandas DataFrame, but this will break down whenever the data is too big to fit in the memory of a single machine.

Let’s run compute() on a large DataFrame in a cluster environment and take a look at the error message.

Create a 5 node Dask cluster and read in a 663 million row dataset into a Dask DataFrame. This data is stored in a public bucket and you can use Coiled for free if you’d like to run these computations yourself.

//import coiled

import dask

import dask.dataframe as dd

cluster = coiled.Cluster(name="demo-cluster", n_workers=5)

client = dask.distributed.Client(cluster)

ddf = dd.read_parquet(

"s3://coiled-datasets/timeseries/20-years/parquet",

storage_options={"anon": True, "use_ssl": True},

engine="pyarrow",

)//]]>

Perform a filtering operation on the Dask DataFrame and then collect the result into a single pandas DataFrame with Dask compute.

//res = ddf.loc[ddf["id"] > 1150]

res.compute()//]]>

This works because res only has 1,103 rows of data. It’s easy to collect such a small dataset into a single pandas DataFrame.

The computation will error out if we try to collect the entire 663 million row dataset into a pandas DataFrame.

//ddf.compute()//]]>

This dataset has 58 GB of data, which is too much to fit on a single machine.

Here’s a diagram that visually demonstrates how compute works, to give you some additional intuition.

Suppose you have a 3 node cluster with 4 partitions. You run a Dask DataFrame compute() operation to collect all of the data in a single Pandas DataFrame.

This diagram clearly illustrates why the compute() operation can cause out-of-memory exceptions. Data that fits when it’s split across multiple machines in a cluster won’t necessarily fit in a single pandas DataFrame on only one machine.

Compute can be an expensive operation, so you want to minimize Dask DataFrame compute calls whenever possible.

Let’s look at the following code snippet that takes 63 seconds to run on a 5 node Dask cluster.

//%%time

id_min = ddf.id.min().compute()

id_max = ddf.id.max().compute()

//]]>

We can refactor this code to only run compute once and it’ll run in 33 seconds.

//%%time

id_min, id_max = dask.compute(ddf.id.min(), ddf.id.max())

//]]>

Fewer compute calls will always run faster than more compute invocations.

You can call Dask compute if you’ve performed a large filtering operation or another operation that decreases the size of the overall dataset. If your data comfortably fits in memory, you don’t need to use Dask. Just stick with pandas if your data is small enough.

You will also call compute when you’d like to force Dask to execute the computations and return a result. Dask executes computations lazily by default. It’ll avoid running expensive computations until you run a method like compute that forces computations to be executed.

You can also call Dask compute on Dask Arrays when you’d like to convert them to NumPy arrays.

Let’s demonstrate how to create a Dask Array and then convert it back to a NumPy array with compute.

//import numpy as np

arr1 = np.array([[1, 2], [3, 4]])

darr1 = dask.array.from_array(arr1)//]]>

darr1 is a Dask Array. Now let’s convert it to a NumPy array and confirm the resulting type is what we’d expect.

//darr1.compute()

type(darr1.compute()) # numpy.ndarray//]]>

Compute has the same concept when run on Dask arrays. It takes NumPy array data that’s been split to multiple computers and collects it all to a single machine. This operation will blow up if the array data is too large to fit on a single machine.

You’ll often use compute when programming with Dask to return final results.

There are times when you’ll want to use compute to switch the analysis to NumPy or pandas, but that’ll probably be less common. Most Dask users have already dealt with scaling problems for technologies that are only meant to fit in memory on one machine and are more interested in writing scalable code with Dask.

Use the tactics outlined in this post to minimize the number of times you need to invoke compute.

Thanks for reading! And if you’re interested in trying out Coiled Cloud, which provides hosted Dask clusters, docker-less managed software, and one-click deployments, you can do so for free today when you click below.