Writing JSON from Dask DataFrames with to_json

This blog post demonstrates how to output the contents of a Dask DataFrame to JSON files.  JSON files normally aren’t the best for big data analyses, but they’re sometimes necessary, especially when downstream readers are designed to consume JSON.

This post will show you the different ways to write JSON files with Dask.  There are a lot of different options to customize the write operation.

You’ll also learn why JSON is typically slow for big data analysis and when to avoid this file format.

Dask to_json: simple example

Let’s start with a simple example that shows how to write JSON files from a Dask DataFrame.  Start by creating a Dask DataFrame with two partitions.

import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame({"col1": ["a", "b", "c", "d"], "col2": [1, 2, 3, 4]})

ddf = dd.from_pandas(df, npartitions=2)

Write out the Dask DataFrame to JSON with to_json.

ddf.to_json("first-json")//]]>

Here are the contents of the first-json directory.

first-json/
 0.part
 1.part

Here are the contents of the 0.part file.

{"col1":"a","col2":1}
{"col1":"b","col2":2}//]]>

Here are the contents of the 1.part file.

{"col1":"c","col2":3}
{"col1":"d","col2":4}//]]>

Dask writes each DataFrame partition to a separate file.  Writing to different files in parallel is faster than writing to a single file.  Multiple files are also better for big data because a single file has scaling limits.
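
As a quick sanity check, you can confirm that the number of files on disk matches the number of DataFrame partitions.  Here's a minimal sketch, assuming the first-json directory written above.

import os

# One output file is written per partition, so these two numbers match.
print(ddf.npartitions)                   # 2
print(sorted(os.listdir("first-json")))  # ['0.part', '1.part']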

Dask write JSON: different data layouts with orient options

Dask uses “records” as the default JSON orientation.  Here’s how to output the files with the orientation set to “split”.

ddf.to_json("json2", orient="split")//]]>

Here are the files that are written to disk.

json2/
 0.part
 1.part

Here are the contents of 0.part.

{"columns":["col1","col2"],"index":[0,1],"data":[["a",1],["b",2]]}//]]>

It’s cool that Dask is flexible and allows for different JSON orientations, but this also highlights a weakness of the JSON file format.  JSON isn’t fully standardized, so different orientations are possible.  It’s painful to work with file formats that allow for different conventions.  Downstream readers need to know which orientation a file uses, and they may have no way to find out short of manually inspecting the file.
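
For example, a downstream reader using pandas has to pass the matching orient to parse the file correctly.  Here's a minimal sketch that reads the json2 output written above.

import pandas as pd

# The reader must know the orientation up front; with the wrong orient,
# pandas misreads the file's structure.
part = pd.read_json("json2/0.part", orient="split")
print(part)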

Dask to_json: customizing filename

You can create a function that lets you customize the filename that’s output from the write operation.

def my_json_filenamer(file_num):
    return f"part-{file_num:04}-data"

ddf.to_json("json3", name_function=my_json_filenamer)

Here are the files output to disk:

json3/
 part-0000-data.part
 part-0001-data.part

Customizing the filename is handy, especially when you’d like to add a unique batch identifier when you’re repeatedly writing files to the same folder.
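
For instance, you could embed a timestamp-based batch identifier in each filename so repeated writes to the same folder stay distinguishable.  Here's a minimal sketch; batch_id and the json4 output path are illustrative, not part of the Dask API.

from datetime import datetime, timezone

# Hypothetical batch identifier based on the current UTC time.
batch_id = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")

def batch_namer(file_num):
    return f"batch-{batch_id}-part-{file_num:04}"

ddf.to_json("json4", name_function=batch_namer)

As with the earlier example, Dask appends the .part suffix to whatever the name function returns.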

Dask write JSON: compression

Let’s create a larger dataset and write it out to many JSON files that are compressed with gzip.

Let’s first create a 189 million row dataset.

import dask

ddf = dask.datasets.timeseries(
    start="2015-01-01",
    end="2020-12-31",
    freq="1s",
    partition_freq="7d",
    seed=42,
)

Now let’s write out the dataset as uncompressed JSON files to see how big the data is on disk.

ddf.to_json("timeseries-json")//]]>

This DataFrame has 313 partitions so 313 JSON files are written.  One file is written per partition.  These files are 11.67 GB on disk.

Let’s write out this same DataFrame with gzip compression.

ddf.to_json("timeseries-json-gzip", compression="gzip")//]]>

This outputs 313 gzipped JSON files that only take 3.17 GB on disk.  The gzip compression algorithm greatly reduces the file sizes.
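
If you'd like to check the on-disk sizes yourself, a small helper like the following works for local output directories.  dir_size_gb is a hypothetical helper for illustration, not part of Dask.

import os

def dir_size_gb(path):
    # Sum the sizes of all files in a local output directory.
    total = sum(
        os.path.getsize(os.path.join(path, f)) for f in os.listdir(path)
    )
    return total / 1e9

print(dir_size_gb("timeseries-json"))       # roughly 11.67
print(dir_size_gb("timeseries-json-gzip"))  # roughly 3.17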

Smaller files are generally better because they’re faster to transfer over the wire.
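
When reading the gzipped files back with dd.read_json, it's safest to pass the compression codec explicitly, since the .part filenames don't carry a .gz extension for compression="infer" to detect.  Here's a minimal sketch under that assumption.

# Read the gzipped JSON files back into a Dask DataFrame.
ddf_gz = dd.read_json("timeseries-json-gzip/*.part", compression="gzip")

print(ddf_gz.head())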

Dask to JSON: large write with cluster

Let’s create an even bigger dataset with 662 million rows of timeseries data.  We’ll create this dataset on a Dask cluster powered by Coiled, because the dataset is too large for most laptops to handle comfortably.

import coiled
import dask
import dask.dataframe as dd
from dask.distributed import Client

cluster = coiled.Cluster(name="powers-demo", n_workers=10)

client = Client(cluster)

ddf = dask.datasets.timeseries(
    start="2000-01-01",
    end="2020-12-31",
    freq="1s",
    partition_freq="7d",
    seed=42,
)

Let’s write this data to AWS S3 using our Dask cluster that’s powered by Coiled.

ddf.to_json("s3://coiled-datasets/timeseries/20-years/json")//]]>

Now let’s read in the data and run an analytical query.  We’re going to count the unique values in the name column.

ddf = dd.read_json(
    "s3://coiled-datasets/timeseries/20-years/json/*.part",
    storage_options={"anon": True, "use_ssl": True},
    lines=True,
)

ddf["name"].nunique().compute()

This query takes 233 seconds to execute.  We happen to have the exact same dataset persisted in S3 in the Parquet file format.  Let’s run the same query on the Parquet dataset and see how long it takes to execute.

ddf = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet",
    storage_options={"anon": True, "use_ssl": True},
    engine="pyarrow",
    columns=["name"],
)

ddf["name"].nunique().compute()

This query only takes 27 seconds to run.  As you can see, Parquet is significantly faster.  See this blog post to learn more about Parquet column pruning and the other Parquet features that make this speedup possible.

Conclusion

Dask makes it easy to write out DataFrames to JSON files.

In general, you should avoid JSON files when persisting large datasets to disk.  File formats like Parquet that are optimized for large datasets are generally easier to work with.
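
For example, writing the same DataFrame to Parquet is just as easy.  Here's a minimal sketch; the timeseries-parquet output path is illustrative.

# Write the DataFrame to Parquet instead of JSON.
ddf.to_parquet("timeseries-parquet", engine="pyarrow")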

When you have a small dataset or are building a system for a reader that expects JSON data, Dask makes it easy for you to write JSON files.

Level up your Dask using Coiled

Coiled makes it easy to scale Dask maturely in the cloud.