Map Joins and Dask’s Dynamic Optimizer

TL;DR: Map (Broadcast) Joins is a technique for improving join performance in data systems. Dask, like other systems, allows you to manually trigger a map join. But, thanks to optimizations built into Dask, you don't have to. Dask will automatically implement this functionality for you. Even better, Dask's approach is not specific to joins: the join optimization comes as a free benefit of a dynamic, workload-aware optimizer which helps Dask perform well on a huge variety of tasks.


Lots of people interested in Dask have questions about Dask internals and the answers are difficult to find. To help, we'll be writing a series of posts exposing the underbelly.

In a recent Dask workshop, participants asked about executing Map Joins with Dask. If you're not familiar with that technique, we'll explain it shortly. But the question makes sense for users coming from systems like Hadoop/Hive, Apache Spark, or Presto and wanting to apply their expertise in query optimization to work with Dask Dataframes. 

Although there is an "API answer" to the question, it's more interesting (and labor-saving) to see how Dask automatically discovers this optimization. Not only can we keep our code simple and make the system more accessible to users who might not have expertise in optimization, we learn about the benefits of Dask's optimizer for a variety of other workloads.

What Is a Map Join?

In distributed, large-data systems, it is common to perform joins where one side of the join is truly large (TB or bigger), while the other side is not so large (several GB or smaller). This occurs frequently with star-schema or snowflake-schema data warehouse designs, in which a large fact table like "transactions" is joined against many smaller dimension tables which might be “products” or “retail locations.”

The trick to performing these joins quickly is to avoid a fully distributed join. Instead, the small table is loaded on every worker, along with one or more pieces (partitions) of the large table. The worker can then iterate the records in the partitions of the large table, and perform a quick local lookup into the small table to see whether there are rows matching the join condition. In Hadoop/Hive, this is called a "Map Side Join" because, once the smaller table is local, the lookup is a map operation rather than one involving a shuffle or reduce. Apache Spark and Presto call this a Broadcast Join because the smaller table is supplied to every worker via a "broadcast" mechanism. Since the quick local lookup is usually done via a hash table on the join key(s), the procedure is sometimes called a "Broadcast Hash Join."

The Map Join approach can provide many-orders-of-magnitude speedup over the general distributed join (which typically involves a shuffle into chunks indexed by hash or value ranges) so it's beloved by all big data engineers. No wonder, then, that these engineers would ask about doing this sort of join with Dask.

What Would this Scenario Look Like with Dask?

Since Dask dataframes are made up of Pandas dataframe partitions, this situation appears any time one side of a join is a Dask dataframe that is too large to process in memory or on a single node, while the other side of the join can be treated as a Pandas dataframe -- i.e., it fits in memory. (You can think of a Pandas dataframe as a single-partition Dask dataframe if that is helpful.) 

So the goal is to load that Pandas dataframe on every Dask worker, and then join that worker’s Dask dataframe partitions (from the large table side) against the smaller now-local Pandas dataframe. The final result -- effectively the union of those local joins -- is the Dask dataframe result.

Now, how do we make this happen? Or, more precisely, how do we let Dask's Dynamic Optimizer make this happen for us?

Background on Optimizers

Within the table-oriented data engineering world, there are two main flavors of optimization, and systems can employ both types.

The logical, heuristic, or structural optimizers look at the structure of a query and apply universal rules that are known to improve efficiency without affecting the result. Examples include the Apache Spark Catalyst optimizer and the Apache Calcite Rule-Based Optimizer.

Cost-based, or statistical optimizers have cost models for query implementation and typically track distributional statistics about tables and columns in the datastore to plug into those cost models. This deeper knowledge comes at a price, though: computing statistics is expensive and may not make sense for jobs and datasets that are only touched once or twice. Examples with some support for cost-based optimization include Hive, Spark, and Impala.

When we move beyond "table-oriented" processing, things get more complex, since there are many more possible execution patterns and fewer safe assumptions. We'll see in the following section how Dask implements optimizations at the task-graph and function level.

Dask's Dynamic Optimizer

Since Dask is designed to handle arbitrary distributed workloads -- not just table-oriented, relational-style queries -- Dask takes an approach that is different from table-only systems.

The idea of a relational query is not a thing ("reified") within Dask. Instead, Dask implements two levels of related but more general optimizations.

First, Dask performs structural optimizations at the task-graph level (where a Dask task is just a Python function). One example of task-graph optimization is task fusion: combining smaller tasks into larger ones, to achieve a better granularity of tasks.

Second, Dask has a dynamic implementation of a cost-based optimizer, which tracks numerous statistics on tasks (functions) as well as the cluster itself (e.g., bandwidth between pairs of workers) in real-time. Gating thresholds interact with task statistics to produce a variety of emergent optimizations on arbitrary workloads. One of those happens to be the Map Join.

Background on Dask-SQL

Although Dask itself does not have a "relational-query-aware" optimizer, a new companion project, dask-sql, makes this additional level of optimization available.

Dask-SQL leverages Apache Calcite to translate SQL statements into a form corresponding to Dask operators. In the process, it leverages Calcite's mature optimizer to get improvements to query structure before the work even gets submitted to Dask's scheduler.

So how exactly do we get a Map Join for free from Dask's Dynamic Optimizer? It basically happens in two parts:

If we just ask Dask to run a join between our large Dask dataframe and our small one, Dask will start the job scheduling pieces of the join dependent on the small dataframe (since Dask is just a Python framework, and Pandas dataframes are just Python objects, albeit with native backing storage and execution "helpers").

That is, because Dask looks at our request as a set of related Python functions to run -- and not as a special sort of distributed table job -- the basic map-style join implementation is set up.

But another key piece of the map join implementation is the "broadcast" piece: we would ideally not want to send a complete copy of the small table from a single process (typically the client process where user code is running) to every single worker. We'd like a smart replication approach that distributes the load.

This is where the second step of Dask's optimization comes in: Dask includes thresholds and a backoff pattern for repeated operations. If, for example, 10, or 100, or 1000 workers ask for the same chunk of data from the same supplier, the number of requests allowed through will be capped, and other requests will have to wait and retry. By the time they do so, the needed data will be available on other workers, at which point the requestor will be more likely (due to random selection) to request the data from a less heavily loaded worker. Thus, over a period of time, Dask workers form an ad hoc peer-to-peer distribution system for the target payload, spreading it more efficiently.

Over the course of a large join, it achieves the "O(n)" performance we look for with a Map Join.

In this screenshot, we join a Pandas dataframe (we could, equivalently, join a 1-partition Dask dataframe) against a larger 4-partition Dask dataframe. The visualized graph shows that Dask will not shuffle but will provide the entirety of the small dataframe to each merge task by default.

The Takeaway: It's Bigger than Just Map Joins

As cool and as useful as this is for performing joins, the real power in the Dynamic Optimizer is the fact that Dask did not (and does not) know about the big-C Concept of a relational join. That concept, while helpful in this one case, would limit the general usefulness of the optimizer. 

Instead, by implementing optimizations as a set of general decision processes applied to scheduling tasks and distributing data, and supplying these decision processes with real-time statistical information, Dask offers hands-free (well, API-free) optimization that can improve many kinds of large-scale Python workloads.

Want more out of your data science?

Thanks for reading! As always, feel free to let us know what you think about this post. Plus, we're always looking for more feedback on Coiled Cloud, which provides hosted Dask clusters, docker-less managed software, and zero-click deployments. You can try it out for free by clicking below.

Level up your Dask using Coiled

Coiled makes it easy to scale Dask maturely in the cloud