Setting a Dask DataFrame index

January 4, 2022

This post demonstrates how to change a DataFrame index with set_index and explains when you should perform this operation.

Setting an index can make downstream queries faster, but usually requires an expensive data shuffle, so it should be used strategically.

This post demonstrates the set_index syntax on small datasets and then shows some query benchmarks on larger datasets.

What is an index?

Indexes are used by normal pandas DataFrames, Dask DataFrames, and many databases in general.  

Indexes let you efficiently find rows that have a certain value without having to scan each row. In plain pandas, this means that after a set_index("col"), df.loc["foo"] is faster than df[df.col == "foo"] was before. The loc lookup uses an efficient data structure that only has to check a couple of rows to figure out where "foo" is, whereas df[df.col == "foo"] has to scan every single row to see which ones match.
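Here is a minimal sketch of the two lookup styles in plain pandas. The column names and values are made up for illustration; the point is only that both forms return the same rows, while the second one goes through the index.

```python
import pandas as pd

# Hypothetical sample data; "col" and "value" are illustrative names only.
df = pd.DataFrame({"col": ["foo", "bar", "baz", "foo"], "value": [1, 2, 3, 4]})

# Boolean mask: compares every row of "col" against "foo".
scanned = df[df.col == "foo"]

# Index lookup: after set_index, .loc looks the label up in the index.
# Passing a list of labels keeps the result a DataFrame.
indexed = df.set_index("col")
looked_up = indexed.loc[["foo"]]

print(scanned["value"].tolist())    # [1, 4]
print(looked_up["value"].tolist())  # [1, 4]
```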

The thing is, computers are really really fast at scanning memory, so when you’re running pandas computations on one machine, index optimizations aren’t as important. 

But scanning memory across many distributed machines is not fast. So index optimizations that you don't notice much with pandas make an enormous difference with Dask.

A Dask DataFrame is composed of many pandas DataFrames, potentially living on different machines, each one of which we call a partition. See the Dask docs for an illustration and in-depth explanation.

The Dask client has its own version of an index for the distributed DataFrame as a whole, called divisions. divisions is like an index for the indexes—it tracks which partition will contain a given value (just like pandas's index tracks which row will contain a given value). When there are millions of rows spread across hundreds of machines, it's too much to track every single row. We just want to get in the right ballpark—which machine will hold this value?—and then tell that machine to go find the row itself.

So divisions is just a simple list giving the lower and upper bounds of values that each partition contains. Using this, Dask does a quick binary search locally to figure out which partition contains a given value.
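The lookup described above can be sketched in a few lines of pure Python. This is not Dask's actual implementation, just an illustration of the idea under the assumption that partition i covers the half-open range between two neighboring division values and that the last partition also includes its upper bound. The function name find_partition and the division values are hypothetical.

```python
from bisect import bisect_right

def find_partition(divisions, value):
    """Return the index of the partition whose bounds contain value.

    divisions has npartitions + 1 entries. Partition i covers
    divisions[i] <= value < divisions[i + 1], except the last
    partition, which also includes its upper bound.
    """
    if not divisions[0] <= value <= divisions[-1]:
        raise KeyError(value)
    # bisect_right counts how many boundaries are <= value.
    i = bisect_right(divisions, value) - 1
    # A value equal to the final boundary belongs to the last partition.
    return min(i, len(divisions) - 2)

# Hypothetical divisions describing three partitions.
divisions = (0, 100, 200, 300)

print(find_partition(divisions, 0))    # 0
print(find_partition(divisions, 150))  # 1
print(find_partition(divisions, 300))  # 2
```

The search only touches the short divisions list, never the data itself, which is why it can run locally on the client.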

Just like with a pandas index, having known divisions lets us change a search that would scan every row (df[df.col == "foo"]) to one that quickly goes straight to the right place (df.loc["foo"]). It's just that scanning every row is much, much slower in a distributed context with Dask than on one machine with plain pandas.

When to set an index

Setting an index in Dask is a lot slower than setting an index in pandas.  You shouldn’t always set indexes in Dask like you do in pandas. 

In pandas, you can get away with calling set_index even when it’s not necessary. Sometimes, you’ll see pandas code like set_index("foo").reset_index() in the wild, which sorts the data for no reason. In pandas, you can get away with this, because it's cheap. In Dask it's very, very not cheap. So you really should understand what set_index does and why you're doing it before you use it.

set_index syntax

Let’s create a small DataFrame and set one of the columns as the index to gain some intuition about how Dask leverages an index.

Create a pandas DataFrame with two columns of data, and a 2-partition Dask DataFrame from it.

import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame(
    {"col1": ["aa", "dd", "cc", "bb", "aa"], "col2": ["a1", "d", "c", "b", "a2"]}
)
ddf = dd.from_pandas(df, npartitions=2)

Print the DataFrame and see that it has one index column that was created by default by pandas and two columns with data.

print(ddf.compute())

  col1 col2
0   aa   a1
1   dd    d
2   cc    c
3   bb    b
4   aa   a2

Take a look at the divisions of ddf.

ddf.divisions

(0, 3, 4)

These three division values mark the boundaries of ddf's two partitions. Let's create a print_partitions() helper function and print ddf to better illustrate the divisions.

def print_partitions(ddf):
    # Compute and print each partition separately.
    for i in range(ddf.npartitions):
        print(ddf.partitions[i].compute())

print_partitions(ddf)

  col1 col2
0   aa   a1
1   dd    d
2   cc    c
  col1 col2
3   bb    b
4   aa   a2

The first partition contains rows with index values from 0 to 2. The second partition contains rows with index values 3 and 4.

Create a new DataFrame with col1 as the index and print the result.

ddf2 = ddf.set_index("col1", divisions=["aa", "cc", "dd"])

print(ddf2.compute())

col1  col2
aa      a1
aa      a2
bb       b
cc       c
dd       d

Let’s confirm that the divisions of ddf2 are set as we specified.

ddf2.divisions

('aa', 'cc', 'dd')

Let’s look at how the data is distributed in ddf2 by partition.

print_partitions(ddf2)

col1  col2
aa      a1
aa      a2
bb       b

col1  col2
cc       c
dd       d

Notice that the set_index method sorted the DataFrame based on col1.  Take note of three important changes after set_index is run:

  • The divisions of ddf2 are now set as specified
  • The data is repartitioned, as per our division specifications. bb has moved to the first partition, and dd to the second, because that’s what our divisions specified.
  • Repeated values are all in the same partition. That is, both aa's are in the first partition.
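To make the repartitioning concrete, here is a pure-Python sketch that places the same five rows into partitions using the divisions we passed in. It only mimics the outcome shown above and is not how Dask performs the shuffle internally. The helper name assign_partitions is hypothetical, and it assumes the last partition includes its upper bound.

```python
from bisect import bisect_right

def assign_partitions(rows, divisions):
    """Group (key, value) rows into partitions bounded by divisions."""
    npartitions = len(divisions) - 1
    partitions = [[] for _ in range(npartitions)]
    for key, value in rows:
        # Count the boundaries <= key; clamp so the final boundary
        # value lands in the last partition.
        i = min(bisect_right(divisions, key) - 1, npartitions - 1)
        partitions[i].append((key, value))
    # Sort each partition by key (stable, so ties keep their order).
    return [sorted(p, key=lambda row: row[0]) for p in partitions]

rows = [("aa", "a1"), ("dd", "d"), ("cc", "c"), ("bb", "b"), ("aa", "a2")]
result = assign_partitions(rows, ["aa", "cc", "dd"])

for part in result:
    print(part)
# [('aa', 'a1'), ('aa', 'a2'), ('bb', 'b')]
# [('cc', 'c'), ('dd', 'd')]
```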

Sorting a Dask DataFrame can be a slow computation. See this blog post for more detail.

You will only want to run set_index when you can leverage it to make downstream queries faster.

Let’s look at a big dataset and see how much faster some queries execute when an index is set.

Reading DataFrames with index

Let’s grab a few rows of data from a 662 million row DataFrame when filtering on the index.  Then, let’s run the same query when the index isn’t set to quantify the performance speedup offered by the index.

Read in the DataFrame using a 5 node Coiled cluster and verify that the timestamp is the index.

import coiled
import dask.dataframe as dd
from dask.distributed import Client

# Start a 5-worker Coiled cluster and connect a client to it.
cluster = coiled.Cluster(name="demo-cluster", n_workers=5)
client = Client(cluster)

ddf = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet",
    storage_options={"anon": True, "use_ssl": True},
    engine="pyarrow",
)

ddf.head()

Let’s take a look at what Dask knows about the divisions of this DataFrame.

ddf.known_divisions

True

You can print out all the divisions too if you’d like to inspect them.

ddf.divisions

(Timestamp('2000-01-01 00:00:00'),
Timestamp('2000-01-08 00:00:00'),
Timestamp('2000-01-15 00:00:00'),
…

This DataFrame has 1,096 division values, which bound 1,095 partitions.

Here's a query that grabs three records from the DataFrame. It takes 2 seconds to execute.

len(ddf.loc["2000-01-01 00:00:02":"2000-01-01 00:00:04"])

Read in the same dataset without an index, so we can see how much slower the same query runs on a regular column that’s not the index.

ddf2 = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet",
    storage_options={"anon": True, "use_ssl": True},
    engine="pyarrow",
    index=False,
)

len(
    ddf2.loc[
        (ddf2["timestamp"] >= "2000-01-01 00:00:02")
        & (ddf2["timestamp"] <= "2000-01-01 00:00:04")
    ]
)

That query takes 115 seconds to execute because it had to load and scan all of the data looking for those values, instead of jumping straight to the correct partition.  The query without the index is 58 times slower in this example.
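To see why the indexed query can skip almost all of the data, here is a rough sketch of how a range of index values maps onto partitions. It uses only the first three division values printed earlier (the real list is much longer), and the helper partitions_for_range is a hypothetical name, not a Dask function. It assumes the requested range lies within the divisions.

```python
from bisect import bisect_right
from datetime import datetime

def partitions_for_range(divisions, start, stop):
    """Return indexes of the partitions that can hold values in [start, stop].

    Assumes divisions[0] <= start <= stop <= divisions[-1].
    """
    last = len(divisions) - 2
    # Locate the partition for each endpoint; the final boundary value
    # belongs to the last partition, hence the clamp.
    first_hit = min(bisect_right(divisions, start) - 1, last)
    last_hit = min(bisect_right(divisions, stop) - 1, last)
    return list(range(first_hit, last_hit + 1))

# Truncated, illustrative list: just the first three division values.
divisions = [datetime(2000, 1, 1), datetime(2000, 1, 8), datetime(2000, 1, 15)]

hits = partitions_for_range(
    divisions,
    datetime(2000, 1, 1, 0, 0, 2),
    datetime(2000, 1, 1, 0, 0, 4),
)
print(hits)  # [0]
```

Only one partition needs to be read for this query, whereas the filter on a regular column has no divisions to consult and must look at every partition.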

Filtering on an index value is clearly a lot quicker than filtering on non-index columns.

How long does it take to set the index?

Let’s look at a query to compute the number of unique names in the data when the id equals 1001.  We’ll see if setting an index can improve the performance of the query.

Start by reading in the data and running a query to establish a baseline.

ddf3 = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet",
    storage_options={"anon": True, "use_ssl": True},
    engine="pyarrow",
    index=False,
)

ddf3.loc[ddf3["id"] == 1001].name.nunique().compute()

This query takes 75 seconds to execute.

ddf3 has 1,095 partitions, which you can see by running ddf3.npartitions.

Setting an index before running the query

Let’s set an index and run the same query to see if that’ll make the query run faster.

dask_computed_divisions = ddf3.set_index("id").divisions
unique_divisions = list(dict.fromkeys(list(dask_computed_divisions)))
len(unique_divisions)  # 149
ddf3u = ddf3.set_index("id", divisions=unique_divisions)
ddf3u.npartitions  # 148
ddf3u.loc[[1001]].name.nunique().compute()

This query takes 130 seconds to run.  Setting the index before running the query is slower than simply running the query, because setting the index requires a data sort, which is expensive.

This example just shows one way you can create the divisions that are used when the index is set.  You can create the divisions however you’d like.  Choosing the optimal divisions to make downstream queries faster is challenging.
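The list(dict.fromkeys(...)) idiom used above removes repeated division values while keeping the remaining ones in their original order. Here is a small standalone illustration; the numbers are hypothetical and are not the divisions Dask computed for this dataset.

```python
# Hypothetical division values containing repeats.
computed = (0, 1000, 1000, 1000, 1250, 2000)

# dict keys are unique and keep insertion order, so this drops the
# repeats while leaving the boundaries in sorted order.
unique_divisions = list(dict.fromkeys(computed))

print(unique_divisions)           # [0, 1000, 1250, 2000]
print(len(unique_divisions) - 1)  # 3
```

The second number is the partition count you would get from these divisions: one fewer than the number of boundaries.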

It’s not surprising that this particular index does not improve the overall query time: we’re sorting all of the values, but we only needed to extract just one. It’s faster to just scan the un-sorted data and find the single value we’re interested in.

Let's look at some more examples to gather some intuition and then look at when setting an index is more likely to improve query times.

Setting an index without specifying divisions

Let's look at setting an index without explicitly setting divisions and running the same nunique query.

ddf3a = ddf3.set_index("id")  # 134 seconds

ddf3a.loc[[1001]].name.nunique().compute()  # 118 seconds

This takes a total of 252 seconds to compute and is slower than the original query.  In this case, setting an index doesn’t help our query run faster.

Notice that ddf3.set_index("id") does not specify divisions, so Dask needs to go figure them out. To do this, it has to load and compute all of ddf3 immediately to look at the distribution of its values. Then, when you later call .compute(), it’ll load and compute ddf3 a second time. This is slow in general, and particularly bad if the DataFrame already has lots of operations applied to it—all those operations also have to run twice.

When divisions are passed, Dask doesn’t need to compute the whole DataFrame to figure them out, which is obviously a lot faster.

Setting an index and specifying divisions poorly

Now let’s try to manually set the divisions and run the same query.

ddf3b = ddf3.set_index("id", divisions=[800, 900, 1000, 1100, 1200])  # 0.3 seconds

ddf3b.loc[[1001]].name.nunique().compute()  # didn't work

This didn’t work because we’re taking a DataFrame that used to have 1,095 partitions and trying to smash all that data into just 4 partitions. This runs out of memory. Clearly, using bad divisions is worse than the extra time for Dask to compute them for you.
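A quick back-of-the-envelope calculation shows the scale of the problem. This sketch assumes, purely for illustration, that rows are spread evenly across partitions; rows_per_partition is a hypothetical helper.

```python
total_rows = 662_000_000  # row count quoted earlier in this post

def rows_per_partition(total_rows, divisions):
    """Average rows per partition if the data were spread evenly."""
    npartitions = len(divisions) - 1
    return total_rows // npartitions

# Five boundaries define four partitions.
manual = rows_per_partition(total_rows, [800, 900, 1000, 1100, 1200])
# The DataFrame as originally read has 1,095 partitions.
original = total_rows // 1095

print(manual)    # 165500000
print(original)  # 604566
```

Each of the four partitions would need to hold a few hundred times more rows than one of the original partitions did.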

Not setting the divisions well can cause problems. You want to pick divisions that:

  1. Separate the data into approximately equally-sized groups. If some values are more common than others, adjust partitions accordingly.
  2. Result in reasonably-sized partitions—around 100MB is typically a good target, which often means row counts around 100,000 per partition.
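One way to act on these two guidelines, sketched in pure Python: estimate a partition count from the total data size, then pick boundaries at evenly spaced quantiles of a sample of the column. The helper names, the 50 GiB figure, and the randomly generated sample are all assumptions made for illustration. This is not how Dask computes divisions itself.

```python
import math
import random

def target_npartitions(total_bytes, target_bytes=100 * 1024**2):
    """Number of partitions needed to keep each one near target_bytes."""
    return max(1, math.ceil(total_bytes / target_bytes))

def divisions_from_sample(sample, npartitions):
    """Pick boundaries that split a sample into roughly equal-sized groups."""
    ordered = sorted(sample)
    n = len(ordered)
    # Take the value at each i/npartitions quantile; i == 0 is the
    # minimum and i == npartitions is the maximum.
    cuts = [ordered[min(i * n // npartitions, n - 1)] for i in range(npartitions + 1)]
    # Repeated boundaries can appear when one value is very common; drop them.
    return list(dict.fromkeys(cuts))

random.seed(0)
# Hypothetical stand-in for a sample of the column that will become the index.
sample = [random.randint(800, 1200) for _ in range(10_000)]

divisions = divisions_from_sample(sample, npartitions=8)
print(divisions)
print(target_npartitions(50 * 1024**3))  # 512
```

Because the boundaries follow the sample's quantiles, common values get narrower ranges and rare values get wider ones, which is what keeps the groups roughly equal in size.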

To pick good divisions, you must use your knowledge of the dataset. What range of values is there for the column? What sort of general distribution do they follow—a normal bell curve, a continuous distribution? Are there known outliers?

Another strategy is to let Dask compute the divisions once, then copy-paste them to reuse later:

dask_computed_divisions = ddf3.set_index("id").divisions
unique_divisions = list(dict.fromkeys(list(dask_computed_divisions)))
print(repr(unique_divisions))
# ^ copy this and reuse

This is especially helpful if you’ll be rerunning a script or notebook on the same (or similar) data many times.

However, you shouldn't set divisions if the data you’re processing is very unpredictable. In that case, it’s better to spend the extra time and let Dask re-compute good divisions each time.

High level index strategies

Since set_index is an expensive operation, you should only run the computation when it’ll help subsequent computations run faster.

Here are the main operations that'll run faster when an index is set:

  • Filtering on the index with .loc
  • Merging on the index
  • Grouping by the index

By all means, set indexes whenever it’ll make your analysis faster.  Just don’t run these expensive computations unnecessarily.

For example, you shouldn't always set_index before you .loc. If you just need to pull a value out once, it's not worth the cost of a whole shuffle. But if you need to pull lots of values out, then it is. Same with a merge: if you're just merging a DataFrame to another, don't set_index first (the merge will do this internally anyway). But if you're merging the same DataFrame multiple times, then the set_index is worth it.

As a rule of thumb, you should only set_index if you'll do a merge, groupby(df.index), or .loc on the re-indexed DataFrame more than once.
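The rule of thumb can be expressed as a simple break-even calculation. The sketch below assumes the indexed DataFrame is kept around (persisted or written out) so the set_index cost is paid only once. The function name is hypothetical, and the example inputs are borrowed loosely from different experiments in this post, so treat the result as illustrative only.

```python
import math

def break_even_queries(setup_seconds, scan_seconds, indexed_seconds):
    """Smallest number of queries for which indexing first is faster overall."""
    saved_per_query = scan_seconds - indexed_seconds
    if saved_per_query <= 0:
        return None  # the index never pays for itself
    # Indexing wins once setup + n * indexed < n * scan,
    # i.e. once n > setup / saved_per_query.
    return math.floor(setup_seconds / saved_per_query) + 1

# Illustrative inputs: 134 s to set an index, 115 s for a full-scan
# filter, 2 s for a filter on the index.
print(break_even_queries(134, 115, 2))  # 2
```

With these numbers the index pays for itself from the second query onward, which lines up with the "more than once" guideline.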

You may also want to re-index your data before writing it to storage in a partitioned format like Parquet. That way, when you read the data later, it’s already partitioned the way you want, and you don’t have to re-index it every time.

Conclusion

set_index changes the index of a DataFrame, sorts the underlying data, and ensures the data is partitioned in a manner consistent with the divisions.  Divisions allow you to cheaply look up values without having to process every partition to search for them.

Sorting is an expensive operation when data is distributed, but it can be worth it if you’re performing multiple operations that run faster on data with an index.

You may also want to sort data before persisting it on disk, so Dask can read it with an index and you don’t need to perform the sort later.

Keep this powerful performance optimization in mind when you’re performing analyses with Dask.

