Dask vs Spark | Dask as a Spark Replacement

October 4, 2021

What’s broken and how we’re fixing it

Dask vs Spark

Many Dask users and Coiled customers are looking for a Spark/Databricks replacement.  This article discusses the problem these folks are trying to solve, the relative strengths of Dask/Coiled for large-scale ETL processing, and the current shortcomings.  We focus on those shortcomings and describe ongoing technical efforts to address them.

This article is mainly intended to provide context to development teams, rather than serve as a sales pitch.

Motivation: ETL and SQL-like queries dominate pain today

Most of the distributed compute time in the world seems to be spent on ETL or SQL-like queries.  In Dask terms these workloads mostly use Dask bag and Dask dataframe, with a bit of Dask delayed and futures.  These users care about ingesting large volumes of data, performing basic operations on that data, joining it against other datasets, and storing the results in analysis-ready formats like Parquet.
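
As a rough illustration of the shape of these workloads (the bucket paths and column names below are made up, not from any particular customer), such a pipeline in Dask dataframe looks something like this:

import dask.dataframe as dd

# Ingest raw data (hypothetical paths and columns)
events = dd.read_parquet("s3://my-bucket/raw/events/")
users = dd.read_parquet("s3://my-bucket/raw/users/")

# Basic cleaning and derived columns
events = events[events.status == "ok"]
events["revenue"] = events.price * events.quantity

# Join against another dataset
joined = events.merge(users, on="user_id", how="left")

# Store the result in an analysis-ready format
joined.to_parquet("s3://my-bucket/curated/events/")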

We also see many people interested in advanced machine learning, but they’re often blocked simply on being able to look at and process their data at scale.  We’re excited to push forward on Dask’s traditional roots of advanced computing, but want to make sure that the basic data processing fundamentals are strong first.  This seems to be where most pain is focused today.

In these large-scale data processing situations, scale and predictability are far more important than flexibility and creative analyses.  Dask has some advantages here, but also some drawbacks today.

Dask vs Spark: Dask advantages

We write about Dask advantages often, so we’ll be brief here.  Folks seem to prefer Dask for the following reasons:

  1. They like Python and the PyData stack
  2. They’re cost-sensitive and have seen good savings when comparing Dask against Spark
  3. They feel locked-in by Databricks, mostly in terms of user experience and flexibility
  4. They want increased flexibility from all of the other parts of Dask, particularly the general-purpose programming APIs, delayed and futures, and the ML integrations
  5. They want to reduce costs with adaptive scaling
  6. They want to reduce costs and increase speed with GPUs

Dask vs Spark: Dask disadvantages

However, historically Dask developers have avoided attacking the Spark/ETL space head-on.  Instead, we’ve focused on all of the domains that Spark just couldn’t support (arbitrary task scheduling, workflow management, ML, array computing, general-purpose computing, and so on).

As a result, Dask has to close a maturity gap here if we want to support these users at scale.  The rest of this article discusses several technical features where Dask falls short, and which need to be improved in order to provide a high-quality ETL-at-scale experience with Spark-level maturity.

Parquet

Parquet has become the default tabular data storage format today.  Python support for Parquet is strong, with two high-quality solutions, Arrow and FastParquet, as well as robust cloud object storage support with fsspec.

Problem

However, when operating at the 1-10TB scale or above, these solutions start to fail, at least in the way that Dask uses them.  This is for a variety of reasons:

  1. The size of metadata grows beyond serialization limits
  2. Intermittent S3 failures become common
  3. ETL workloads quickly require dataset/table mutations (e.g. append, update) which exposes us to an entire zoo of consistency and performance problems that the current Parquet interface cannot deal with (more on this later)
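
To make this concrete, a read at this scale looks something like the sketch below.  The path is hypothetical, and the keyword arguments shown are the kinds of workarounds users reach for today rather than a recommended recipe:

import dask.dataframe as dd

# Hypothetical multi-terabyte dataset with tens of thousands of files on S3
df = dd.read_parquet(
    "s3://my-bucket/events/",
    engine="pyarrow",
    gather_statistics=False,          # avoid collecting per-file statistics, which
                                      # becomes expensive at high file counts
    storage_options={"anon": False},  # credential and connection settings are passed
                                      # through fsspec to s3fs here
)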

Solution

The solution here is composed of several small fixes across several projects (Dask, Pandas, Arrow, s3fs, fsspec, thrift, …).  This is being tracked in the meta-issue Processing Large Remote Parquet Datasets.

This is currently being resolved by engineers like James Bourbeau (Coiled), Joris Van den Bossche (Voltron Data), Rick Zamora (NVIDIA), and Martin Durant (Anaconda).

There is also some interesting discussion of data lake management systems (Delta Lake, Iceberg, Hudi) in the next section; these systems provide solutions to some of the problems mentioned above.

Data Lake formats

Technologies like Delta Lake (Databricks), Apache Iceberg (Netflix, Tabular), and Apache Hudi (Uber) provide management layers on top of data lakes.  These technologies solve a variety of issues that arise in larger data engineering pipelines.

Database technologies like MemSQL, Snowflake, and BigQuery allow for huge queries to be run in less than a second.  These technologies are great for applications that require low latency, like dashboards.

Problem

The Python space has traditionally not engaged deeply here, probably because our technology stack is mostly used for exploratory data analysis on analysis-ready data.  However, as Python starts to displace Spark/Databricks for some of these workloads we’re relearning some of the pain points that the Spark community has already experienced.  Some specific examples of common pain points:

  • While someone is writing to the data lake, reads don’t work
  • Data lakes may have tens or hundreds of thousands of files.  Performing an S3 file listing operation before reading the data can be really expensive.
  • If you’re in the middle of a write and the cluster dies, then you’ll have a bunch of partially written files and all subsequent reads won’t work
  • Multiple people might write to the data lake at the same time
  • A few rows of data need to get deleted from the data lake for GDPR compliance, but that’s actually really hard because Parquet files are immutable
  • Tens of thousands of tiny files need to get compacted into bigger files to make data reads faster (see the sketch after this list)
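
As a small example of that last point, the workaround available today is to read the fragmented table and rewrite it with fewer, larger partitions (the paths and sizes below are made up):

import dask.dataframe as dd

# Compact a fragmented dataset by rewriting it with larger partitions
df = dd.read_parquet("s3://my-bucket/table/")
df = df.repartition(partition_size="256MB")   # coalesce many tiny partitions
df.to_parquet("s3://my-bucket/table-compacted/")

Note that this rewrites the whole table with no transactional guarantees, which is exactly the gap that Delta Lake, Iceberg, and Hudi fill.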

Solution

Read support for Delta/Iceberg/Hudi isn’t hard to implement (indeed, here is a recent community PR by Gurunath adding Delta Lake read support to Dask dataframe from scratch).

Write access is harder, but probably also necessary as we walk further down this path.  There is some internal conversation within Coiled, Voltron Data, and other companies about which solution to focus on, with some preference for Iceberg due to its more open community and our stronger relationship with the folks at Tabular.

Robust shuffle

Sorting a dataset, or joining it against another very large dataset, requires full-dataset communication: every input chunk of data needs to contribute a little bit to every output chunk of data.  This operation is commonly called a shuffle.

Spark’s shuffle implementation is solid, following a similarly strong tradition from Hadoop/Hive.  In fact, shuffle is so core to Spark that most Spark users mistakenly call all worker-worker communications “shuffles”.  Spark does this so well that they don’t try to support much else.  

Problem

Dask supports a variety of other communication patterns, and so we never optimized this pattern much.  As a result, Dask’s shuffle algorithm has the following flaws today:

  • At high partition count, it generates very many tasks, the handling of which can become more expensive than the computation itself
  • Spilling data to disk is not finely controlled, resulting in suboptimal use of hardware when datasets are larger than memory (full-dataset shuffles require all data to be accessible at once)
  • Network communication during shuffles is not well optimized.

As a result, Dask is able to perform any shuffle, but slowly, especially when memory is scarce.  We don’t max out the performance of all available hardware, which is arguably Dask’s job.

Solution

This is currently being resolved by Coiled engineers, notably Gabe Joseph, Florian Jetter, and James Bourbeau.  We’re building a separate shuffling service to handle these workloads explicitly.  This will be a third option alongside the currently available partd and tasks options.
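
For orientation, here is how the shuffle backend is selected today (the dataset path is hypothetical); the new service would presumably appear as another value for the same keyword, though its final name isn’t settled, so none is shown:

import dask.dataframe as dd

df = dd.read_parquet("s3://my-bucket/events/")

# "disk" uses the partd-based implementation; "tasks" builds the shuffle out of
# ordinary Dask tasks.  Both trigger a full-dataset exchange.
df = df.set_index("user_id", shuffle="tasks")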

As we get this system to run with increasingly high levels of performance and increasingly high stability guarantees we’re also exposing a variety of other performance issues in Dask, Pandas, and even CPython which we’re working on as well.  We hope for an early opt-in release of this in the next month.  


High-level query optimization

Most developers do not write optimal code.  

Problem

In theory, Dask could inspect user code and rewrite it into a more efficient equivalent.  This can mean, for example, reading only the relevant columns from a data store, moving a row filter or a join to an earlier or later stage in the pipeline to eliminate unnecessary work, or any number of other optimizations that can easily save an order of magnitude of effort.

Example

# Before
import dask.dataframe as dd
df = dd.read_parquet("...")
df['z'] = df.x + df.y
df = df[df.balance > 0]
df.z.sum().compute()

# After
import dask.dataframe as dd
df = dd.read_parquet("...", columns=["x", "y", "balance"])  # Only read necessary columns
df = df[df.balance > 0]  # filter first
df['z'] = df.x + df.y
df.z.sum().compute()

Historically Dask has focused more on low-level optimizations that are applicable throughout all of the Dask subprojects.  Spark, on the other hand, has developed a specialized query optimizer, much like a traditional database.  This gives Spark the ability to avoid tremendous amounts of work in common situations, like the one above.

Solution

We’re currently looking at adding a high-level expression layer to Dask collections (array, bag, dataframe).  This will allow future developers to add query optimizations incrementally.  The change will affect not only Dask dataframe (the main subject of this article) but will also make space for improvements in other collections, notably Dask array, and downstream projects like Xarray, RAPIDS, and Dask-GeoPandas.

This work is being performed by Jim Crist-Harif (Coiled), Aaron Meurer (Quansight, also lead SymPy maintainer), and Rick Zamora (NVIDIA).

GPU accessibility

Problem

Many companies today are excited about using RAPIDS, but find it challenging to use.  This is due to a few problems:

  1. Setting up a GPU stack in the cloud (or anywhere) is hard
  2. Setting up a GPU stack in the cloud with proper hardware configuration for real performance, especially in ETL situations, is very hard
  3. Local experimentation is difficult if users don’t have a local GPU

    And notably, no Apple hardware today, and very little other consumer hardware, comes with NVIDIA GPUs by default

As a result, GPUs and RAPIDS are something that most companies “are looking forward to” especially given the promised cost reductions, but haven’t actually tried in production.

These problems are especially challenging in data-intensive workloads, much more so than in machine learning situations.  In deep learning situations there is so much compute-heavy work to do that computing costs dominate other costs like communication or disk access.  However, in data-intensive workloads, these costs are more balanced, and so we must look at accelerated networking and disk hardware at the same time that we add accelerated compute hardware.  This is all available today, but configuring and balancing these systems requires special consideration.
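
Notably, once that hardware and driver stack exists, the code itself barely changes.  The sketch below assumes a cluster whose workers already have NVIDIA GPUs, CUDA drivers, and RAPIDS installed, which is exactly the setup burden described above (the dataset path and column names are made up):

import dask_cudf

# Same dataframe API as dask.dataframe, but partitions are cuDF frames in GPU memory
df = dask_cudf.read_parquet("s3://my-bucket/events/")
result = df.groupby("user_id")["revenue"].mean().compute()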

Solution

On the IT side, products like Coiled are designed to make it easy to achieve a professionally managed IT stack quickly.  However, doing this well requires care, and while Coiled offers GPU support, we haven’t yet fully solved the problem of optimally efficient configurations on the cloud, which is an order of magnitude harder than just providing “GPUs on the cloud”.  The balance between GPUs, network, disk, and other infrastructure is challenging to get perfect.

On the user side, projects like Afar, made by Erik Welch (Anaconda), make it easy to blend between the user’s local session and the execution environment on the cluster.  This is a little bit like Spark’s Livy project, but much finer-grained and more usable.  Coiled engineers have been using Afar for a while now (mostly to avoid sending large task graphs over consumer internet connections) and have generally found it a pleasure to use.

We are hopeful that this helps to make GPU clusters feel closer at hand.

SQL API

Finally, many companies report wanting to connect these computational systems up to other business infrastructure.  The Dask-SQL project is being led by Nils Braun (Bosch), and it provides a SQL interface on top of Dask dataframes.
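
For context, here is roughly what that interface looks like, following the dask-sql documentation (the table and column names below are made up):

import dask.dataframe as dd
from dask_sql import Context

df = dd.read_parquet("s3://my-bucket/events/")

c = Context()
c.create_table("events", df)   # register the Dask dataframe as a SQL table
result = c.sql("SELECT user_id, SUM(revenue) AS revenue FROM events GROUP BY user_id")
result.compute()               # the result is itself a lazy Dask dataframe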

I’ll admit to being torn by this topic.  I still strongly recommend that users use a proper database (Postgres, Snowflake, Dremio, Redshift, BigQuery, …) for routine SQL analysis.  Databases are just better at this work than Dask will ever be.  Normally in Dask we focus on interfacing smoothly with existing database technologies, like ongoing work from James Bourbeau and Florian Jetter (Coiled) on Dask-Snowflake and Naty Clementi (Coiled) on Dask-Mongo and Dask-BigQuery.

Florian Jetter (Coiled) gave a nice counterexample to this, though; I’m going to quote him below:

> In my mind, SQL on Dask is mostly to build a bridge between the technical and business world. Most databases are not suited for this kind of workload unless they can read natively a Parquet dataset already. Have you ever tried inserting 1TB / 1bn rows of data into a Postgres database? Of course, this is a space where things like Dremio, Presto, etc. shine but getting this up and running is not easy and/or expensive. Also, especially big companies are hesitant to introduce "yet another technology for the same thing". Even if we will never excel in this space, simply checking the box is valuable.

Dask vs Spark: Community and teamwork

This is a lot of work across many different projects.  

The PyData stack differs from Databricks/Spark in that we’re spread into many different projects.  Larger initiatives like “PyData as a Spark replacement” require close collaboration among disparate teams.  Fortunately, we’re already really good at this.

This work is mostly being carried out by employees at Coiled, Anaconda, Quansight, Voltron Data, and NVIDIA, working together with the broader open-source community.  This only works because of the decade of shared experience we all have with each other, and a strong history of open communication.

Dask vs Spark: Early results and timelines

Early results in each of these areas individually are good.  These teams have been able to show forward progress on the timescale of weeks, with a completion horizon on the timescale of months.

However, efficient computing at scale is a challenging problem.  As the larger and more relevant issues are resolved we increase scale, and with that increase previously small and innocuous problems become quite significant.  Having Dask operate with Spark-level maturity in this area, where Spark was designed to succeed, will likely take several months.  That being said, the natural advantages of Dask (Python-native, GPUs, flexibility, lower cost) mean that we’ll likely become an attractive choice for a decent chunk of users before full maturity is reached (indeed, we’re there today for many use cases), and there is really good momentum in this space today.  A lot of different institutions want to see this work out.

Hat tip to science 

As a closing note, I’ll say that, while these technical changes are being driven by traditional business needs and applications, they have fortunate side effects for the scientific userbase of Dask, and in particular for dask array users.  A few examples:

  1. Parquet: The changes around Parquet at scale push us to handle cloud object stores more intelligently, which also benefits other scientific formats, like Zarr and TileDB
  2. Shuffle: The patterns used to develop the shuffle system could be repeated for other similarly challenging operations in dask array, like map_overlap, rechunk, and tensordot (sketched after this list)
  3. High-level query optimization: High-level expressions will allow for more automatic rechunking schemes in dask array, and better overall analysis before execution
  4. GPUs: the changes here are entirely agnostic to dataframes, and should be useful throughout the ecosystem
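
To make the second point concrete, these are the dask array operations that share the all-to-all communication flavor of a dataframe shuffle; a minimal sketch with made-up shapes:

import dask.array as da

x = da.random.random((100_000, 100_000), chunks=(10_000, 10_000))

# Rechunking: many input chunks each contribute pieces to many output chunks,
# the same communication pattern as a dataframe shuffle
y = x.rechunk((100_000, 1_000))

# map_overlap: neighboring chunks exchange a halo of `depth` elements
z = x.map_overlap(lambda block: block * 2, depth=1)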

And so this work follows in the age-old Dask tradition of building infrastructural technology that is paid for by business, but also quite pragmatic for scientific advancement.

Need for Partners / Customers

Finally, this set of problems requires exposure to intense workloads.  It’s hard to deliver high-quality and robust solutions in a vacuum; when building a racecar you need to send it around the track a few thousand times, and then send it around a different track to see what breaks.

If your team has large Spark workloads that it’s looking to move off of and is interested in driving a new style of racecar, then please do get in touch with the Coiled team, or try Coiled for yourself.


Thanks for reading!
