This blog post explains how to create Dask Arrays from Zarr files using the <pre class="language-python inline-code"><code class="language-python hljs">from_zarr</code></pre> function. The <pre class="language-python inline-code"><code class="language-python hljs">from_zarr</code></pre> function reads the data into a Dask array, which allows the user to work with arrays that are larger than memory using all of the available cores.
Here’s how to read a small Zarr file from the cloud into a Dask Array:
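(A minimal sketch; the S3 URL below is a placeholder for the actual store used in this post.)
<pre class="language-python"><code class="language-python hljs">import dask.array as da

# Placeholder cloud location; substitute the URL of your own Zarr store.
arr = da.from_zarr("s3://my-bucket/sample.zarr")

# For the store used in this post, the repr of arr reports shape (100, 100, 5),
# dtype float64, and a chunk size of (10, 10, 1).
arr
</code></pre>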
From the snippet above we can see that the Dask array contains 100x100x5 elements (float64) spread across 500 chunks, each with a shape of (10,10,1).
The Zarr file contains a chunked, compressed N-dimensional array (or a group of such arrays), with additional optimizations for efficient processing and support for cloud storage. The appropriate Dask collection for working with arrays is Dask Array. Dask Array initially loads only the metadata to get information on the shape and contents of the array stored in the Zarr file. Based on this metadata, Dask forms small ‘chunks’, subsets of the original array, that can be processed individually for greater parallelism and lower resource usage.
This lazy representation of the full Zarr array speeds up both the development of downstream code and the execution of the eventual computation. The Dask Array API implements a subset of the NumPy ndarray interface, so much of the downstream code is interchangeable with plain NumPy code.
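For example, continuing with the <pre class="language-python inline-code"><code class="language-python hljs">arr</code></pre> created above, a NumPy-style reduction only builds a task graph; nothing is read from storage until the result is explicitly computed (a minimal sketch):
<pre class="language-python"><code class="language-python hljs"># Lazy: this records the operation in the task graph but loads no data yet.
mean_per_layer = arr.mean(axis=(0, 1))

# Only .compute() triggers reading the chunks and running the reduction.
mean_per_layer.compute()
</code></pre>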
The array inside the Zarr file is stored in chunked format; however, the default chunk size might not be optimal for downstream computations. Dask can create the array with a desired chunk shape via the <pre class="language-python inline-code"><code class="language-python hljs">chunks</code></pre> kwarg, which accepts several value types: a uniform block size, an explicit block shape, explicit sizes for every block along each dimension, or the string <pre class="language-python inline-code"><code class="language-python hljs">"auto"</code></pre>.
For example, the following calculation would require execution of 376 tasks using the default chunks:
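(The original computation is not reproduced here; the sketch below illustrates the pattern with a hypothetical 10x10 matrix stored in Zarr, read with its default on-disk chunking. Exact task counts depend on the data and the Dask version.)
<pre class="language-python"><code class="language-python hljs">import dask.array as da

# Hypothetical square matrix stored in Zarr, read with its default chunks.
A = da.from_zarr("s3://my-bucket/matrix.zarr")
B = A.T

result = (A @ B).sum()

# Number of tasks Dask would execute for this computation.
len(result.dask)
</code></pre>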
On the other hand, since we know that multiplication of matrices A and B requires the rows of A and the columns of B, we can specify the chunks of A to include full rows; conveniently for this example, the transpose of the row chunks gives us the column chunks of matrix B. By specifying the chunks to be (2,10), we reduce the number of tasks to 61:
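(Continuing the sketch above with the same hypothetical store.)
<pre class="language-python"><code class="language-python hljs"># Re-read A with row-oriented chunks: 2 rows by all 10 columns per chunk.
A = da.from_zarr("s3://my-bucket/matrix.zarr", chunks=(2, 10))
B = A.T  # transposing the row chunks yields column chunks for B

result = (A @ B).sum()
len(result.dask)
</code></pre>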
Note that, in general, specifying custom chunks entails the fixed cost of rechunking (adjusting the data from the original chunks to the desired chunks), which can be expensive. Also, the tasks after rechunking should not be too resource-intensive, to avoid excessive workload on the Dask workers. Hence, a careful examination of the downstream code is needed to infer the optimal chunks. If performance is not of primary importance, it is best to let Dask determine the chunk size from the Zarr file specification.
A Zarr file can contain multiple arrays, organized in groups. In such cases, Dask needs to know the path to the array of interest. For example, if the file <pre class="language-python inline-code"><code class="language-python hljs">group.zarr</code></pre> contains a group <pre class="language-python inline-code"><code class="language-python hljs">sample_group</code></pre>, and inside this group there is an array <pre class="language-python inline-code"><code class="language-python hljs">sample_array</code></pre>, then creating the Dask array requires passing the <pre class="language-python inline-code"><code class="language-python hljs">component="sample_group/sample_array"</code></pre> kwarg:
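(A sketch of that call, with the paths and names from the example above.)
<pre class="language-python"><code class="language-python hljs">import dask.array as da

arr = da.from_zarr("group.zarr", component="sample_group/sample_array")
</code></pre>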
Note: this is an advanced topic and generally requires attention only if the default performance needs improvement.
By design, calculations with Dask arrays are lazy until explicit evaluation is requested. The desired calculation can become complex, requiring multiple layers of computation. Dask has a default strategy for ordering these calculations. This order does not matter if the computing power allows all parallel tasks to be processed simultaneously. However, if resources are constrained, as is typically the case, then the order in which tasks are executed matters.
Dask stores tasks in a task graph, which can be inspected via the <pre class="language-python inline-code"><code class="language-python hljs">.dask</code></pre> property of a Dask array. Let's create a sample Dask array from a Zarr file and inspect its task graph:
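(Sketch with a placeholder store URL; any Zarr-backed Dask array behaves the same way.)
<pre class="language-python"><code class="language-python hljs">import dask.array as da

arr = da.from_zarr("s3://my-bucket/sample.zarr")

# Two high-level layers: one referencing the source Zarr array,
# and one holding the tasks that load the individual chunks.
arr.dask
</code></pre>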
Note that, at a high level, the task graph contains two layers: the first reads the metadata from the Zarr file, and the second contains the instructions to load the individual chunks. This is optimal for most use cases; however, it introduces a common dependency between the tasks that operate on the individual chunks. This dependency can alter the order in which downstream tasks are executed.
To avoid such dependencies, one can add the keyword argument <pre class="language-python inline-code"><code class="language-python hljs">inline_array=True</code></pre>, which embeds the information on the source Zarr file inside the instructions to process each chunk. This means that the resulting high-level task graph contains just one layer:
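(Same placeholder store as above.)
<pre class="language-python"><code class="language-python hljs">arr = da.from_zarr("s3://my-bucket/sample.zarr", inline_array=True)

# The source Zarr array is now embedded in each chunk-loading task,
# so the high-level graph collapses to a single layer.
arr.dask
</code></pre>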
By removing this dependency, the order in which downstream tasks are executed can be altered. A more detailed example is provided here, and as noted in the docs:
The right choice for <pre class="language-python inline-code"><code class="language-python hljs">inline_array</code></pre> depends on several factors, including the size of <pre class="language-python inline-code"><code class="language-python hljs">x</code></pre>, how expensive it is to create, which scheduler you’re using, and the pattern of downstream computations. As a heuristic, <pre class="language-python inline-code"><code class="language-python hljs">inline_array=True</code></pre> may be the right choice when the array <pre class="language-python inline-code"><code class="language-python hljs">x</code></pre> is cheap to serialize and deserialize (since it’s included in the graph many times) and if you’re experiencing ordering issues (see Ordering for more).
Let’s look at a practical example of working with a large-scale Zarr array in the cloud using Coiled. This particular dataset contains simulated weather data (temperature) from dask-tutorial (see https://tutorial.dask.org/03_array.html). In this example, we are interested in calculating the average observed temperature for a particular month.
To perform this computation, we will load the Zarr file, identify the available arrays and stack them into a single Dask array, which can then be used for efficient computations.
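The sketch below outlines that workflow. The bucket URL and cluster size are illustrative, the store is assumed to hold one two-dimensional temperature array per day in a single group, and s3fs/fsspec are assumed to be installed for cloud access:
<pre class="language-python"><code class="language-python hljs">import coiled
import dask.array as da
import zarr
from dask.distributed import Client

# Illustrative cluster size; scale to the size of the data.
cluster = coiled.Cluster(n_workers=10)
client = Client(cluster)

# Hypothetical store layout: one 2-D temperature array per day in one group.
url = "s3://my-bucket/weather.zarr"
group = zarr.open(url, mode="r")

# Read each member array lazily and stack along a new leading (time) axis.
arrays = [da.from_zarr(url, component=name) for name in sorted(group.array_keys())]
data = da.stack(arrays, axis=0)

# Average, for example, the first 31 daily arrays for a monthly mean temperature.
monthly_mean = data[:31].mean().compute()
</code></pre>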
The examples in this blog post show how to read information from Zarr files into Dask arrays. The advantage of using Dask for working with arrays is that the available resources are used efficiently to allow parallel computations on data that exceeds the available memory. By using Coiled for cloud-based computations we accelerated the analysis, since the data remained on the cloud and Dask workers could process individual chunks of the Zarr file in parallel.