Reading and Writing Dask DataFrames and Arrays to HDF5

This blog post explains how to write Dask DataFrames to HDF5 files with to_hdf and how to write Dask Arrays to HDF5 files with to_hdf5.

It also explains how to read HDF files into Dask DataFrames with read_hdf.

HDF stands for Hierarchical Data Format.  HDF files are still used, but they’re losing popularity as users shift to more modern file formats.  See here for more detailed information on the HDF file format.

HDF files suffer from some limitations that make them difficult to work with on cloud-based storage systems.  Most modern applications are better off using Parquet for tabular data and Zarr for array data.  More specifically:

  • HDF & Dask DataFrames are rarely used together today; Parquet is much more common
  • HDF & Dask Arrays are used when data is stored on disk, like in an HPC environment.  Zarr is more commonly used when array data is stored in the cloud.
  • HDF is only really used for Dask Arrays with multidimensional data

Dask makes it easy to write to a single HDF file or multiple HDF files in parallel.  Dask also makes it easy to convert from HDF to another file format.
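
For example, here’s a minimal sketch of converting HDF5 data to Parquet; the hdf-data/output-*.hdf path and the /data1 key are placeholders for your own files.

import dask.dataframe as dd

# Read the existing HDF5 files lazily, then write them back out as Parquet
ddf = dd.read_hdf("hdf-data/output-*.hdf", "/data1")
ddf.to_parquet("parquet-data", engine="pyarrow")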

HDF files overview

HDF5 files have a hierarchical structure, similar to how files and folders are organized in a Unix filesystem.  Here’s a visual representation of some folders and files in a Unix filesystem.

animals/
  pets/
    fido.png
    names.csv

Unix filesystems have directories (aka folders) and files.

HDF5 files are similar, but they have groups (instead of directories) and datasets (instead of files).  Here’s how data can be stored in an HDF5 file.

Group 1
  Dataset 1
  Dataset 2
Group 2
  Dataset 3

The hierarchical nature of HDF5 files allows for some cool customizations.  For example, you can read the data from one group and ignore the others.
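
For example, here’s a minimal sketch that prints every group and dataset inside a file with h5py (the example.hdf5 filename is a placeholder):

import h5py

# Recursively visit the hierarchy and print each group / dataset name
with h5py.File("example.hdf5", "r") as f:
    f.visit(print)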

Dask to_hdf: Library Dependencies

You need to install the optional pytables dependency to create HDF files with Dask.  You can install pytables with this command.

conda install pytables

Once you have pytables installed, you’re ready to create HDF files on your local machine.

Dask DataFrame to_hdf and read_hdf: Simple Example

Create a small Dask DataFrame with two partitions, so we have some data that can be written to disk.

import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame({"col1": ["a", "b", "c", "d"], "col2": [1, 2, 3, 4]})

ddf = dd.from_pandas(df, npartitions=2)

Now write the data to disk in two separate files.

//ddf.to_hdf("first-hdf/output-*.hdf", "/data1")//]]>

Here are the files that are written to disk.

first-hdf/
  output-0.hdf
  output-1.hdf

The data in each of the HDF5 files is stored in the “data1” group.
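
You can read both files back into a single Dask DataFrame by passing the same glob pattern to read_hdf:

# Read all matching HDF5 files back into one Dask DataFrame
ddf2 = dd.read_hdf("first-hdf/output-*.hdf", "/data1")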

You can also write the Dask DataFrame partitions to a single HDF file.

Here’s how to write the data to a single HDF5 file in the “cool” group:

ddf.to_hdf("my-output.hdf", "/cool", format="table")

Here’s how to read my-output.hdf into a Dask DataFrame.

dd.read_hdf("my-output.hdf", "/cool").compute()

  col1  col2
0    a     1
1    b     2
2    c     3
3    d     4

You can also append data to my-output.hdf.  Let’s create another DataFrame and append it to the “nice” group.

df = pd.DataFrame({"col1": ["x", "y"], "col2": [88, 99]})
ddf = dd.from_pandas(df, npartitions=2)
ddf.to_hdf("my-output.hdf", "/nice", format="table", append=True)

The my-output.hdf file now contains data in the “cool” and “nice” groups.
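
One way to double-check the file layout is to list the keys with pandas’ HDFStore (a quick sketch):

# List the groups that have been written to the file
with pd.HDFStore("my-output.hdf") as store:
    print(store.keys())  # expect ['/cool', '/nice']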

Read all the data from the “nice” group into a Dask DataFrame:

dd.read_hdf("my-output.hdf", "/nice").compute()

  col1  col2
0    x    88
1    y    99

Now read all of the data from the “cool” group into a Dask DataFrame:

dd.read_hdf("my-output.hdf", "/cool").compute()

  col1  col2
0    a     1
1    b     2
2    c     3
3    d     4

You can also read the data from all the groups into a Dask DataFrame.

dd.read_hdf("my-output.hdf", "/*").compute()

  col1  col2
0    a     1
1    b     2
2    c     3
3    d     4
0    x    88
1    y    99

The highly customizable nature of HDF5 files is a blessing and a curse.  Customizations allow for specialized use cases, but they make things harder for downstream readers who may not know the file structure conventions.

Execute queries on larger datasets

Let’s create a Dask DataFrame with 189 million rows of data.

import dask.datasets

ddf = dask.datasets.timeseries(
    start="2015-01-01",
    end="2020-12-31",
    freq="1s",
    partition_freq="7d",
    seed=42,
)

Use head to inspect the first five rows of the DataFrame; the default timeseries dataset has id, name, x, and y columns.
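
ddf.head()  # first five rows, computed from the first partition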

Write out this dataset to HDF files in the data1 group.

//ddf.to_hdf("hdf/output-*.hdf", "/data1")//]]>

This DataFrame has 313 partitions, so it’s written to 313 separate HDF files.
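
You can confirm the partition count with the npartitions attribute:

ddf.npartitions  # 313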

Now read the HDF data back and run an analytical query to compute the number of unique values in the name column.

ddf = dd.read_hdf("hdf/output-*.hdf", "/data1")
ddf["name"].nunique().compute()

That query takes 72 seconds to run on a computer with 8 GB of RAM and 4 cores.

Let’s run the same query on the same dataset stored in snappy compressed Parquet files to compare performance.  Start by writing the dataset to Parquet files.

ddf.to_parquet("parquet", engine="pyarrow", compression="snappy")

Let’s run the same query to compute the number of unique values in the name column.

ddf = dd.read_parquet("parquet", engine="pyarrow", columns=["name"])
ddf["name"].nunique().compute()

This query executes in 10.5 seconds.  Parquet is a lot faster for this particular query.  See the Advantages of Parquet blog post for more information about why Parquet allows for fast analytical queries on data stored in files.

Dask Array: to_hdf5

You need to install the h5py library to write Dask Arrays out to HDF5 files.  You can install this dependency with conda install h5py.

Create a Dask Array, so we have some data that can be written to an HDF5 file.

import dask.array as da

arr = da.random.random((10, 2))

Visually inspect the contents of the array.

arr.compute()

array([[0.23100387, 0.48254162],
       [0.99983175, 0.84552979],
       [0.28108483, 0.66065226],
       [0.93905801, 0.76594799],
       [0.62132557, 0.30761242],
       [0.98902505, 0.43684951],
       [0.33143127, 0.13451706],
       [0.72742902, 0.86381888],
       [0.92025059, 0.38922756],
       [0.82958047, 0.05398304]])

Now write the array to an HDF5 file.

arr.to_hdf5("myfile.hdf5", "/x")

This writes the array data to the x dataset inside the myfile.hdf5 file.

Read HDF5 into a Dask Array with h5py

Read the HDF5 data into a Dask Array.

import h5py

file = h5py.File("myfile.hdf5", "r")

arr = da.from_array(file["x"])

Visually inspect the contents of arr.

arr.compute()

array([[0.23100387, 0.48254162],
       [0.99983175, 0.84552979],
       [0.28108483, 0.66065226],
       [0.93905801, 0.76594799],
       [0.62132557, 0.30761242],
       [0.98902505, 0.43684951],
       [0.33143127, 0.13451706],
       [0.72742902, 0.86381888],
       [0.92025059, 0.38922756],
       [0.82958047, 0.05398304]])

Confirm that arr is a Dask Array.

type(arr)  # dask.array.core.Array

In practice, HDF5 is used with multidimensional data, so this example does not demonstrate a representative dataset.  Using an overly simplistic dataset is fine here because we’re just trying to show the syntax for reading HDF5 files.
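
For reference, here’s a minimal sketch of what a more representative multidimensional write might look like; the array shape, chunk sizes, and the weather.hdf5 filename are made up for illustration.

import dask.array as da

# Hypothetical 3D array (e.g. time x latitude x longitude), chunked along the first axis
arr3d = da.random.random((365, 1000, 1000), chunks=(10, 1000, 1000))

# Write it to a single dataset inside an HDF5 file (placeholder filename and key)
arr3d.to_hdf5("weather.hdf5", "/temperature")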

Conclusion

Dask makes it easy to read and write HDF files, either to a single file or to multiple files in parallel.

Dask is more scalable than technologies like pandas or NumPy because it uses parallel computing and supports distributed computations and lazy execution.  pandas and NumPy require entire datasets to be loaded into memory, so they can’t handle analyses when the dataset is bigger than the memory of a single computer.  You can use Dask even when the dataset is bigger than the amount of memory.

If you’re working with a large amount of HDF5 data, then Dask is a better option compared to pandas and NumPy.

HDF5 used to be a popular option for pandas users that were struggling with out of memory exceptions when running their analyses on large datasets.  They were sometimes able to solve their memory issues with HDF5.

HDF5 used to be the only well-supported binary file format, but most data engines now support Parquet / Zarr, which are usually better options.  There are additional complexities when working with HDF5 compared to Parquet / Zarr, so it’s mostly avoided when possible now.

HDF5 is difficult to work with largely because it's a monolithic file format (i.e. all the data is stored in one file), compared to Zarr and Parquet which partition data into multiple files.

This makes it difficult for multiple threads/processes/nodes to write to an HDF5 file at once because they all have to contend with a single lock, as discussed here.  It’s not even possible to read from multiple threads at once with the standard HDF5 client.

Dask is a great way to read HDF5 data and work with large HDF5 datasets, but the HDF5 format itself is usually best avoided when other formats are an option.

Level up your Dask using Coiled

Coiled makes it easy to scale Dask maturely in the cloud.