This post explains the benefits of the Parquet file format and why it’s usually better than CSVs for most Dask analyses.
File formats like CSV and JSON have the following issues: they don’t store the schema alongside the data, so it has to be inferred on every read; they’re row-based, so you can’t read a subset of columns without scanning the whole file; and they don’t store row-group metadata, so predicate pushdown filtering isn’t possible.
The rest of this blog explains the CSV file format limitations and how Parquet offers improvements on all fronts. It also shows how Parquet query pushdown can provide massive performance gains and why query pushdown isn’t possible with CSV files.
The concepts covered in this post apply to popular libraries like pandas, Dask, and Spark.
Parquet files store metadata, including the schema of the dataset, in the file footer.
Each column in a dataset has a name and a type. Look at the following example data:
first_name,age
Linda,34
Paula,65
Grace,12
The first_name column contains string values and the age column contains integer values.
When a library like pandas or Dask reads a Parquet file, it can simply fetch the schema from the file footer to figure out the column names and types.
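Here’s a minimal sketch of that round trip using pandas and pyarrow (the library pandas and Dask typically use under the hood for Parquet); the file name and tiny DataFrame mirror the example above and are just for illustration:

import pandas as pd
import pyarrow.parquet as pq

# Write a small dataset; the schema is captured in the file footer automatically
df = pd.DataFrame({"first_name": ["Linda", "Paula", "Grace"], "age": [34, 65, 12]})
df.to_parquet("people.parquet")

# Read just the footer metadata, without scanning any of the data
print(pq.read_schema("people.parquet"))
# Prints the column names and types, e.g. first_name: string, age: int64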
When Dask reads a CSV file, it has to infer the schema instead: it reads a sample of bytes from the start of the file, guesses each column’s type from that sample, and assumes the rest of the file follows the same pattern. This is extra work on every read, and the guess can be wrong if rows later in the file don’t look like the sampled rows.
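Because the types are guessed from a sample, Dask users often end up spelling them out explicitly; here’s a small sketch with a hypothetical CSV path and columns:

import dask.dataframe as dd

# Hypothetical paths and columns; without an explicit dtype, Dask samples the
# start of the files and may guess a type that a later partition violates
ddf = dd.read_csv(
    "s3://my-bucket/people/*.csv",
    dtype={"first_name": "object", "age": "int64"},
)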
The Parquet schema will always properly match the data that’s in the file. You’ll never have a Parquet file with a column that claims to be a string and actually contains integers. Parquet will error out if you try to write a file that’s misconfigured.
When you write a dataset to Parquet, the schema is automatically added. You don’t need to perform any extra steps.
Parquet lets you read specific columns from a dataset without reading the entire file. This is called column pruning and can be a massive performance improvement.
Parquet is a columnar file format, unlike CSV which is a row-based file format. Column pruning is only possible for columnar file formats.
Suppose you have a DataFrame with several columns, only some of which are needed for a given query; the timeseries dataset used below has an integer id column and a string name column, among others.
Here’s how to read a CSV file (column pruning isn’t possible):
import dask.dataframe as dd

ddf = dd.read_csv(
    "s3://coiled-datasets/timeseries/20-years/csv/*.part",
    storage_options={"anon": True, "use_ssl": True},
)
Here’s how to read a Parquet file without leveraging column pruning:
ddf = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet",
    storage_options={"anon": True, "use_ssl": True},
)
Here’s how to read a single column of a Parquet file with column pruning:
ddf = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet",
    storage_options={"anon": True, "use_ssl": True},
    columns=["name"],
)
You need to manually specify the columns argument to take advantage of column pruning with Dask.
This section shows the query speeds when column pruning is applied to a 662 million row dataset.
All computations are run on a 5 node Dask cluster that’s hosted by Coiled. Here’s a link to the notebook.
We’ll run a query that counts the distinct names in the dataset against CSV files, Parquet files, and Parquet files that are read with column pruning. Here’s the query that’ll be run:
<pre class="language-python"><code class="language-pythong">ddf["id"].nunique().compute()</code></pre>
Here are the benchmarking results:
File format                           Query time (seconds)
CSV files                             131
Parquet files                         32.4
Parquet files with column pruning     29.8
Column pruning provides a modest speed boost in this example, but it can be much greater on other datasets and for other queries. Simply using the Parquet file format, even without column pruning, also provides a nice query speed-up compared to CSV files.
The benefits of column pruning increase as more columns are skipped. It’s especially beneficial when columns that take a lot of memory, like strings, can be skipped. You can expect a big performance boost if you have a dataset with 100 columns and can skip 90 columns with column pruning.
Query pushdown is when computations happen at the “database layer” instead of the “execution engine layer”. In this case, the database layer is Parquet files in a filesystem, and the execution engine is Dask.
Parquet allows for predicate pushdown filtering, a form of query pushdown, because the file footer stores row-group-level metadata for each column in the file.
The row group metadata contains min/max values for each row group in the Parquet file, which Dask can use to skip entire portions of the data file, depending on the query. Parquet predicate pushdown filtering is best illustrated with an example.
Suppose you have a dataset with first_name and age columns and would like to get a count of everyone named Astrid that’s between 80 and 90 years old. Further, suppose your data file has three row groups with the following min/max values.
row_group    first_name min    first_name max    age min    age max
0            camila            luisa             3          95
1            anita             carla             13         103
2            anthony           matt              4          24
From the Parquet metadata, we can deduce that row group 1 is the only part of the file that’s relevant for our query.
Row group 0 isn’t relevant because the minimum first_name is “higher” than Astrid (alphabetically speaking).
Row group 2 isn’t relevant because the maximum age is lower than the min age threshold in the query.
We can skip row groups 0 and 2 in our query and only run the filtering logic on row group 1. As with column pruning, the more data you can skip, the bigger the performance benefit.
Parquet predicate pushdown filtering can be used in conjunction with column pruning, of course. You can skip both columns and row groups.
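If you want to see these row group statistics for yourself, pyarrow exposes them from the footer metadata; here’s a minimal sketch, assuming a hypothetical local file called people.parquet:

import pyarrow.parquet as pq

# Hypothetical file; print the min/max statistics stored for every column chunk
# in every row group, which is exactly what predicate pushdown relies on
parquet_file = pq.ParquetFile("people.parquet")
metadata = parquet_file.metadata

for i in range(metadata.num_row_groups):
    row_group = metadata.row_group(i)
    for j in range(row_group.num_columns):
        column = row_group.column(j)
        stats = column.statistics
        if stats is not None and stats.has_min_max:
            print(i, column.path_in_schema, stats.min, stats.max)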
Let’s run some benchmarks on the same 662 million row timeseries dataset with a 5 node Dask cluster. All the computations you’re about to see are in this notebook.
Predicate pushdown filtering is not possible with CSV files because there are no row groups with metadata. Here’s the syntax for applying predicate pushdowns when reading a Parquet file.
ddf = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet",
    storage_options={"anon": True, "use_ssl": True},
    filters=[[("id", ">", 1170)]],
)
Predicate pushdowns can be applied in conjunction with column pruning, like so.
ddf = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet",
    storage_options={"anon": True, "use_ssl": True},
    filters=[[("id", ">", 1170)]],
    columns=["id"],
)
Here are the benchmarking results for this particular query:
File format                              Run time (seconds)
CSV                                      189
Parquet                                  95
Parquet with predicate pushdown          3.7
Predicate pushdown and column pruning    2.2
There are only a small number of rows that satisfy our predicate filtering criteria, so we can skip a lot of row groups and get a massive performance boost by leveraging predicate pushdown filtering for this particular query.
Predicate pushdown filters return row groups that contain at least one value that satisfies the predicate. They’re likely to return rows that don’t meet the predicate as well.
Let’s look at an example.
ddf = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet",
    storage_options={"anon": True, "use_ssl": True},
    filters=[[("id", ">", 1170)]],
)

ddf.head()
So you still need to run the “regular Dask filtering” query after running the predicate pushdown filtering. Take a look at the full code snippet.
ddf = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet",
    storage_options={"anon": True, "use_ssl": True},
    filters=[[("id", ">", 1170)]],
)

len(ddf[ddf.id > 1170])
len(ddf[ddf.id > 1170]) returns 65, which is the correct result.
len(ddf) returns 38,707,200, which is far from the correct result.
Sorting data on columns that’ll be used for predicate pushdown filters and creating the right number of row groups per file are imperative for leveraging Parquet predicate pushdown filters to the fullest extent.
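As a rough sketch of what that can look like with pandas, assuming a hypothetical DataFrame with an id column you plan to filter on (pandas forwards keyword arguments like row_group_size to pyarrow’s writer):

import numpy as np
import pandas as pd

# Hypothetical data; sort on the column you plan to filter on so each row group
# covers a narrow range of ids
df = pd.DataFrame({
    "id": np.random.randint(0, 10_000, size=1_000_000),
    "value": np.random.rand(1_000_000),
})
df = df.sort_values("id")

# Control how many rows land in each row group of the written file
df.to_parquet("people.parquet", row_group_size=100_000)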
Parquet files are immutable, which is unfamiliar to analysts who are used to mutable file formats like CSVs.
For example, you can open a CSV file, add a column of data, and save it, thereby mutating the original file.
If you want to add a column of data to a Parquet file, then you need to read it into a processing engine like Dask and write out an entirely new file.
This also applies to deleting rows of data. Suppose you have a Parquet file with 100,000 rows and would like to delete one row of data. You perform this “delete” by reading the entire file into Dask, filtering out the row of data you no longer want, and writing a new 99,999 row file.
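Here’s roughly what that “delete” looks like with Dask; the bucket paths and the id value are hypothetical:

import dask.dataframe as dd

# "Deleting" a row from Parquet really means writing a new, filtered copy
ddf = dd.read_parquet("s3://my-bucket/people/")
ddf = ddf[ddf.id != 12345]                    # drop the unwanted row(s)
ddf.to_parquet("s3://my-bucket/people-v2/")   # write an entirely new dataset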
In practice, the immutable nature of Parquet files is less important than you’d think. Most files for production data analyses are stored in cloud-based object stores like AWS S3. Cloud object store files are immutable, regardless of file type. CSVs are mutable locally, but not mutable once they’re uploaded to S3.
Immutable files save you from a lot of nasty bugs. Data files should be immutable, and changing data can cause unexpected downstream consequences. Data isn’t usually versioned, so data mutations can’t simply be rolled back.
CSV files are still widely used because they’re human-readable. Parquet files are binary blobs that can’t be opened and read by humans.
CSVs are a decent option for a small file that needs to be modified frequently by a business user, for example.
Lots of datasets you’ll find in the wild are already in the CSV file format. Most data analysts just stick with the existing file format. You can often save yourself a big headache by converting the CSV files to Parquet as the first step of your analysis.
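The conversion itself is a small one-off job with Dask; the paths here are hypothetical:

import dask.dataframe as dd

# Hypothetical paths: read the original CSVs once, write a Parquet copy,
# and point the rest of the analysis at the Parquet version
ddf = dd.read_csv("s3://my-bucket/raw/*.csv")
ddf.to_parquet("s3://my-bucket/parquet/")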
You’ve learned about the benefits of Parquet files in this blog, and you should be convinced that Parquet is better than CSV for a lot of data projects.
Pandas, Dask, and other data execution frameworks make it easy to work with Parquet files. They all have built-in methods for reading and writing Parquet files.
Start using column pruning and predicate pushdown filtering and enjoy running your queries in less time!
Plus, if you haven't already, you can try working with Parquet files on Coiled, which provides hosted Dask clusters, Docker-less managed software, and one-click deployments, for free.