Dask vs Spark
Many Dask users and Coiled customers are looking for a Spark/Databricks replacement. This article discusses the problem that these folks are trying to solve, the relative strengths of Dask/Coiled for large-scale ETL processing, and also the current shortcomings. We focus on the shortcomings of Dask in this regard and describe ongoing technical efforts to address these.
This article is mainly intended to provide context to development teams, rather than serve as a sales pitch.
The bulk of distributed compute time spent in the world seems to go to ETL or SQL-like queries. In Dask terms, these workloads mostly use Dask Bag and Dask DataFrame, with a bit of Dask Delayed and futures. These users care about ingesting large volumes of data, performing basic operations on that data, joining against other datasets, and storing the results in analysis-ready formats like Parquet.
We also see many people interested in advanced machine learning, but they’re often blocked by just being able to look at and process their data at scale. We’re excited to push forward on Dask’s traditional roots of advanced computing, but want to make sure that basic data processing fundamentals are strong first. This seems to be where most pain is focused today.
In these large-scale data processing situations, scale and predictability are far more important than flexibility and creative analyses. Dask has some advantages here, but also some drawbacks today.
We write about Dask advantages often, so we’ll be brief here. Folks seem to prefer Dask for the following reasons:
However, historically Dask developers have avoided attacking the Spark/ETL space head-on. Instead, we’ve focused on all of the domains that Spark really just couldn’t support (arbitrary task scheduling, workflow management, ML, array computing, general-purpose computing, and so on).
As a result, Dask has to close a maturity gap here if we want to support these users at scale. The rest of this article discusses several technical features where Dask falls short, and which need to be improved in order to provide a high-quality ETL-at-scale experience with Spark-level maturity.
Parquet has become the default tabular data storage format today. Python support for Parquet is strong, with two high-quality solutions, Arrow and FastParquet, as well as robust cloud object storage support with fsspec.
However, when operating at the 1-10TB scale or above, these solutions start to fail, at least in the way that Dask uses them. This is for a variety of reasons:
The solution here is composed of several small fixes across several projects (Dask, Pandas, Arrow, s3fs, fsspec, thrift, …). This is being tracked in the meta-issue Processing Large Remote Parquet Datasets.
This is currently being resolved by engineers like James Bourbeau (Coiled), Joris Van den Bossche (Voltron Data), Rick Zamora (NVIDIA), and Martin Durant (Anaconda).
There is also some interesting discussion around data lake management systems (Delta Lake, Iceberg, Hudi) in the next section. These systems provide solutions to some of the problems mentioned above.
Technologies like Delta Lake (Databricks), Apache Iceberg (Netflix, Tabular), and Apache Hudi (Uber) provide management layers on top of data lakes. These technologies solve a variety of issues that arise in larger data engineering pipelines.
Database technologies like MemSQL, Snowflake, and BigQuery allow for huge queries to be run in less than a second. These technologies are great for applications that require low latency, like dashboards.
The Python space has traditionally not engaged deeply here, probably because our technology stack is mostly used for exploratory data analysis on analysis-ready data. However, as Python starts to displace Spark/Databricks for some of these workloads we’re relearning some of the pain points that the Spark community has already experienced. Some specific examples of common pain points:
Write access is harder, but probably also necessary as we walk further down this path. There is some internal conversation within Coiled, Voltron Data, and other companies about which solution to focus on. There is some preference for Iceberg, because its community is more open and because we have a stronger relationship with the folks at Tabular.
When datasets need to be sorted or joined with other very large datasets this requires a full dataset communication. Every input chunk of data needs to contribute a little bit to every output chunk of data. This operation is commonly called a shuffle.
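To make the all-to-all pattern concrete, here is a minimal pure-Python sketch of a hash-partitioned shuffle. It is only an illustration of the idea described above, not Dask's or Spark's implementation. The function name `shuffle`, the row layout, and the `n_outputs` parameter are assumptions made for this example.

```python
from collections import defaultdict


def shuffle(input_partitions, key, n_outputs):
    """Hash-partition rows so that equal keys land in the same output partition.

    Hypothetical helper for illustration; partitions are plain lists of dicts.
    """
    outputs = [[] for _ in range(n_outputs)]
    for partition in input_partitions:
        # Split each input partition into one shard per destination.
        shards = defaultdict(list)
        for row in partition:
            shards[hash(row[key]) % n_outputs].append(row)
        # "Send" each shard to its destination. On a real cluster this step
        # is network traffic between workers.
        for dest, shard in shards.items():
            outputs[dest].extend(shard)
    return outputs


inputs = [
    [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}, {"id": 3, "v": "c"}],
    [{"id": 1, "v": "d"}, {"id": 2, "v": "e"}, {"id": 4, "v": "f"}],
]
outputs = shuffle(inputs, key="id", n_outputs=2)

# Every row with a given id now lives in exactly one output partition,
# so a join or group-by on "id" can proceed partition by partition.
for i, part in enumerate(outputs):
    print(i, sorted({row["id"] for row in part}))
```

Each input partition can produce a shard for each output partition, so the number of transfers grows roughly as inputs times outputs. That growth is what makes shuffles expensive at scale.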
Spark’s shuffle implementation is solid, following a similarly strong tradition from Hadoop/Hive. In fact, shuffle is so core to Spark that most Spark users mistakenly call all worker-worker communications “shuffles”. Spark does this so well that they don’t try to support much else.
Dask supports a variety of other communication patterns, and so we never optimized this pattern much. As a result, Dask’s shuffle algorithm has the following flaws today:
As a result, Dask is able to perform any shuffle, but slowly, especially when memory is scarce. We don’t max out the performance of all available hardware, which is arguably Dask’s job.
This is currently being resolved by Coiled engineers, notably Gabe Joseph, Florian Jetter, and James Bourbeau. We’re building a separate shuffling service to handle these workloads explicitly. This will be a third option alongside the currently available partd and tasks options.
As we get this system to run with increasingly high performance and increasingly strong stability guarantees, we’re also exposing a variety of other performance issues in Dask, Pandas, and even CPython, which we’re working on as well. We hope for an early opt-in release of this in the next month.
Most developers do not write optimal code.
In theory, Dask could look at user code and suggest alternatives. For example, it could read only the relevant columns from a data store, or move a row filter or a join to an earlier or later stage in the pipeline to eliminate unnecessary work. Optimizations like these can easily save an order of magnitude of effort.
```python
# What a user might write
import dask.dataframe as dd

df = dd.read_parquet("...")
df['z'] = df.x + df.y
df = df[df.balance > 0]
```

```python
# What an optimizer could rewrite it to
import dask.dataframe as dd

df = dd.read_parquet("...", columns=["x", "y", "balance"])  # Only read necessary columns
df = df[df.balance > 0]  # filter first
df['z'] = df.x + df.y
```
Historically Dask has focused more on low-level optimizations that are applicable throughout all of the Dask subprojects. Spark on the other hand has developed a specialized query optimizer, much like a traditional database. This gives Spark the ability to avoid tremendous amounts of work in common situations, like above.
We’re currently looking at adding a high-level expression layer to Dask collections (array, bag, dataframe). This will allow future developers to add query optimizations incrementally. This change will affect not only Dask dataframe (the main subject of this article) but will also make space for changes in any Dask collection, notably including Dask array as well as other collections, like Xarray, RAPIDS, Dask-GeoPandas, and more.
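To give a feel for what such a layer enables, here is a toy sketch in plain Python. It does not reflect the actual design being worked on. The class names `ReadParquet`, `Filter`, and `AddColumn` and the two rewrite functions are all hypothetical, invented for this illustration. The sketch assumes that derived column names never collide with stored ones, and that downstream code only needs the columns the query touches.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class ReadParquet:
    path: str
    columns: Optional[tuple] = None  # None means "read every column"


@dataclass(frozen=True)
class Filter:
    child: object
    column: str  # keep rows where this column is > 0


@dataclass(frozen=True)
class AddColumn:
    child: object
    name: str   # new column = left + right
    left: str
    right: str


def push_filter_down(expr):
    """Run a Filter before an AddColumn when it does not need the new column."""
    if isinstance(expr, Filter) and isinstance(expr.child, AddColumn):
        add = expr.child
        if expr.column != add.name:
            filtered = Filter(add.child, expr.column)
            return AddColumn(filtered, add.name, add.left, add.right)
    return expr


def stored_columns(expr):
    """Stored columns the expression reads (derived columns excluded)."""
    used, derived = set(), set()
    node = expr
    while not isinstance(node, ReadParquet):
        if isinstance(node, Filter):
            used.add(node.column)
        else:  # AddColumn
            used.update((node.left, node.right))
            derived.add(node.name)
        node = node.child
    return used - derived


def prune_columns(expr):
    """Rewrite the ReadParquet leaf to load only the columns that are used."""
    needed = tuple(sorted(stored_columns(expr)))

    def rewrite(node):
        if isinstance(node, ReadParquet):
            return replace(node, columns=needed)
        return replace(node, child=rewrite(node.child))

    return rewrite(expr)


# The unoptimized pipeline from above: read, add z = x + y, then filter.
query = Filter(AddColumn(ReadParquet("..."), "z", "x", "y"), "balance")
optimized = prune_columns(push_filter_down(query))
print(optimized)
```

Because the query is held as data rather than as an opaque task graph, each optimization is a small, independent rewrite rule. That is what makes it possible to add optimizations incrementally.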
This work is being performed by Jim Crist-Harif (Coiled), Aaron Meurer (Quansight, also lead SymPy maintainer), and Rick Zamora (NVIDIA).
Many companies today are excited about using RAPIDS, but find it challenging to use. This is due to a few problems:
As a result, GPUs and RAPIDS are something that most companies “are looking forward to” especially given the promised cost reductions, but haven’t actually tried in production.
These problems are especially challenging in data-intensive workloads, much more so than in machine learning situations. In deep learning situations there is so much compute-heavy work to do that computing costs dominate other costs like communication or disk access. However, in data-intensive workloads, these costs are more balanced, and so we must look at accelerated networking and disk hardware at the same time that we add accelerated compute hardware. This is all available today, but configuring and balancing these systems requires special consideration.
On the IT side, products like Coiled are designed to make it easy to achieve a professionally managed IT stack quickly. However, doing this well requires care, and while Coiled offers GPU support, we haven’t yet fully solved the problem of optimally efficient configurations on the cloud, which is an order of magnitude harder than just giving “GPUs on the cloud”. The balance between GPUs, network, disk, and other infrastructure is challenging to get perfect.
On the user side, projects like Afar, made by Erik Welch (Anaconda), make it easy to blend between the user’s local session and the execution environment on the cluster. This is a little bit like Spark’s Livy project, but much finer-grained and more usable. Coiled engineers have been using Afar for a while now (mostly to avoid sending large task graphs over consumer internet connections) and have generally found it a pleasure to use.
We are hopeful that this helps to make GPU clusters feel closer at hand.
Finally, many companies report wanting to connect these computational systems up to other business infrastructure. The Dask-SQL project is being led by Nils Braun (Bosch), and it provides a SQL interface on top of Dask dataframes.
I’ll admit to being torn by this topic. I still strongly recommend that users use a proper database (Postgres, Snowflake, Dremio, Redshift, BigQuery, …) for routine SQL analysis. Databases are just better at this work than Dask will ever be. Normally in Dask we focus on interfacing smoothly with existing database technologies, like ongoing work from James Bourbeau and Florian Jetter (Coiled) on Dask-Snowflake and Naty Clementi (Coiled) on Dask-Mongo and Dask-BigQuery.
Florian Jetter (Coiled) gave a nice counterexample to this, though, which I’ll quote below:
> In my mind, SQL on Dask is mostly to build a bridge between the technical and business world. Most databases are not suited for this kind of workload unless they can read natively a Parquet dataset already. Have you ever tried inserting 1TB / 1bn rows of data into a Postgres database? Of course, this is a space where things like Dremio, Presto, etc. shine but getting this up and running is not easy and/or expensive. Also, especially big companies are hesitant to introduce "yet another technology for the same thing". Even if we will never excel in this space, simply checking the box is valuable.
This is a lot of work across many different projects.
The PyData stack differs from Databricks/Spark in that we’re spread into many different projects. Larger initiatives like “PyData as a Spark replacement” require close collaboration among disparate teams. Fortunately, we’re already really good at this.
This work is mostly being carried out by employees at Coiled, Anaconda, Quansight, Voltron Data, and NVIDIA, working together with the broader community. This only works because of the decade of shared experience we all have with each other, and a strong history of open communication.
Early results in each of these areas individually are good. These teams have been able to show forward progress on the timescale of weeks, with a completion horizon on the timescale of months.
However, efficient computing at scale is a challenging problem. As the larger and more relevant issues are resolved we increase scale, and with that increase of scale previously small and innocuous problems become quite significant. Having Dask operate with Spark-level maturity in this area where Spark was designed to succeed will likely take several months. That being said, the natural advantages of Dask (Python-native, GPUs, flexibility, lower cost) mean that we’ll likely become an attractive choice for a decent chunk of users before full maturity is reached (indeed, we’re there today for many use cases), and there is really good momentum in this space today. A lot of different institutions want to see this work out.
As a closing note, I’ll say that, while the technical changes are being driven by traditional business needs and applications, they have fortunate side effects for the scientific userbase of Dask, and in particular dask array users. As an example …
And so this work follows in the age-old Dask tradition of building infrastructural technology that is paid for by business, but also quite pragmatic for scientific advancement.
Finally, this set of problems requires exposure to intense workloads. It’s hard to deliver high-quality and robust solutions in a vacuum; when building a racecar you need to send it around the track a few thousand times, and then send it around a different track to see what breaks.
If your team has large Spark workloads that it wants to move off of Spark, and is interested in driving a new style of racecar, then please do get in touch. You can fill out the form below and someone on the Coiled team will reach out shortly, or you can try Coiled for yourself.
Thanks for reading!