Parallel Computing with Dask#

Xarray integrates with Dask, a general purpose library for parallel computing, to handle larger-than-memory computations.

If you’ve been using Xarray to read in large datasets or split up data across a number of files, you may already be using Dask:

import xarray as xr

ds = xr.open_zarr("/path/to/data.zarr")
timeseries = ds["temp"].mean(dim=["x", "y"]).compute()  # Compute result

Using Dask with Xarray feels similar to working with NumPy arrays, but on much larger datasets. The Dask integration is transparent, so you usually don’t need to manage the parallelism directly; Xarray and Dask handle these aspects behind the scenes. This makes it easy to write code that scales from small, in-memory datasets on a single machine to large datasets that are distributed across a cluster, with minimal code changes.

Examples#

If you’re new to using Xarray with Dask, we recommend the Xarray + Dask Tutorial.

Examples of using Xarray with Dask at scale can be found in the Project Pythia cookbook gallery.

Using Dask with Xarray#

Figure: A Dask array, composed of many smaller chunks.

Dask divides arrays into smaller parts called chunks. These chunks are small, manageable pieces of the larger dataset that Dask can process in parallel (see the Dask Array docs on chunks). Chunks are commonly set when reading data, but you can also set the chunk size manually at any point in your workflow using Dataset.chunk() and DataArray.chunk(). See Chunking and performance for more.

Xarray operations on Dask-backed arrays are lazy. This means computations are not executed immediately, but are instead queued up as tasks in a Dask graph.

When a result is requested (e.g., for plotting, writing to disk, or explicitly computing), Dask executes the task graph. The computations are carried out in parallel, with each chunk being processed independently. This parallel execution is key to handling large datasets efficiently.
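As a minimal sketch of this laziness (reusing the variable name from the opening example; the "time" dimension here is an assumption about that dataset):

import xarray as xr

ds = xr.open_zarr("/path/to/data.zarr")  # Dask-backed; no data is loaded yet
climatology = ds["temp"].mean(dim="time")  # lazy: only adds tasks to the Dask graph
climatology = climatology.compute()  # executes the graph, processing chunks in parallel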

Nearly all Xarray methods have been extended to work automatically with Dask Arrays. This includes things like indexing, concatenating, rechunking, grouped operations, etc. Common operations are covered in more detail in each of the sections below.

Reading and writing data#

When reading data, Dask divides your dataset into smaller chunks. You can specify the size of chunks with the chunks argument. Specifying chunks="auto" will set the dask chunk sizes to be a multiple of the on-disk chunk sizes. This can be a good idea, but usually the appropriate dask chunk size will depend on your workflow.

The Zarr format is ideal for working with large datasets. Each chunk is stored in a separate file, allowing parallel reading and writing with Dask. You can also use Zarr to read/write directly from cloud storage buckets (see the Dask documentation on connecting to remote data).

When you open a Zarr dataset with open_zarr(), it is loaded as a Dask array by default (if Dask is installed):

ds = xr.open_zarr("path/to/directory.zarr")
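As mentioned above, Zarr stores can also be read directly from cloud storage. A hypothetical sketch (the bucket path is made up, and an fsspec-compatible filesystem such as s3fs must be installed):

ds = xr.open_zarr("s3://my-bucket/my-data.zarr", storage_options={"anon": True})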

See Zarr for more details.

Open a single netCDF file with open_dataset() and supplying a chunks argument:

ds = xr.open_dataset("example-data.nc", chunks={"time": 10})

Or open multiple files in parallel with open_mfdataset():

xr.open_mfdataset('my/files/*.nc', parallel=True)

Tip

When reading in many netCDF files with open_mfdataset(), using engine="h5netcdf" can be faster than the default, which uses the netCDF4 package.

Save larger-than-memory netCDF files:

ds.to_netcdf("my-big-file.nc")

Or set compute=False to return a dask.delayed object that can be computed later:

delayed_write = ds.to_netcdf("my-big-file.nc", compute=False)
delayed_write.compute()

Note

When using Dask’s distributed scheduler to write NETCDF4 files, it may be necessary to set the environment variable HDF5_USE_FILE_LOCKING=FALSE to avoid competing locks within the HDF5 SWMR file locking scheme. Note that writing netCDF files with Dask’s distributed scheduler is only supported for the netcdf4 backend.

See netCDF for more details.

Open HDF5 files with open_dataset():

xr.open_dataset("/path/to/my/file.h5", chunks='auto')

See HDF5 for more details.

Open large GeoTIFF files with rioxarray:

import rioxarray

xds = rioxarray.open_rasterio("my-satellite-image.tif", chunks='auto')

See Rasterio for more details.

Loading Dask Arrays#

There are a few common cases where you may want to convert lazy Dask arrays into eager, in-memory Xarray data structures:

  • You want to inspect smaller intermediate results when working interactively or debugging

  • You’ve reduced the dataset (by filtering or with a groupby, for example) and now have something much smaller that fits in memory

  • You need to compute intermediate results since Dask is unable (or struggles) to perform a certain computation. The canonical example of this is normalizing a dataset, e.g., ds - ds.mean(), when ds is larger than memory. Typically, you should either save ds to disk or compute ds.mean() eagerly.

To do this, you can use Dataset.compute() or DataArray.compute():

In [1]: ds.compute()
Out[1]: 
<xarray.Dataset> Size: 8MB
Dimensions:      (time: 30, latitude: 180, longitude: 180)
Coordinates:
  * time         (time) datetime64[ns] 240B 2015-01-01 2015-01-02 ... 2015-01-30
  * longitude    (longitude) int64 1kB 0 1 2 3 4 5 6 ... 174 175 176 177 178 179
  * latitude     (latitude) float64 1kB 89.5 88.5 87.5 ... -87.5 -88.5 -89.5
Data variables:
    temperature  (time, latitude, longitude) float64 8MB 0.4691 ... -0.2467

Note

Using Dataset.compute() is preferred to Dataset.load(), which loads the data in-place and modifies the original object.

You can also access DataArray.values, which will always be a NumPy array:

In [2]: ds.temperature.values
Out[2]: 
array([[[  4.691e-01,  -2.829e-01, ...,  -5.577e-01,   3.814e-01],
        [  1.337e+00,  -1.531e+00, ...,   8.726e-01,  -1.538e+00],
        ...
# truncated for brevity

NumPy ufuncs like numpy.sin() transparently work on all xarray objects, including those that store lazy Dask arrays:

In [3]: import numpy as np

In [4]: np.sin(ds)
Out[4]: 
<xarray.Dataset> Size: 8MB
Dimensions:      (time: 30, latitude: 180, longitude: 180)
Coordinates:
  * time         (time) datetime64[ns] 240B 2015-01-01 2015-01-02 ... 2015-01-30
  * longitude    (longitude) int64 1kB 0 1 2 3 4 5 6 ... 174 175 176 177 178 179
  * latitude     (latitude) float64 1kB 89.5 88.5 87.5 ... -87.5 -88.5 -89.5
Data variables:
    temperature  (time, latitude, longitude) float64 8MB 0.4521 ... -0.2442

To access Dask arrays directly, use the DataArray.data attribute which exposes the DataArray’s underlying array type.
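For instance, continuing with the ds used above:

darr = ds.temperature.data  # a dask.array.Array when the DataArray is Dask-backed
darr.chunks  # inspect the chunk structure of the underlying Dask array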

If you’re using a Dask cluster, you can also use Dataset.persist() for quickly accessing intermediate outputs. This is most helpful after expensive operations like rechunking or setting an index. It’s a way of telling the cluster that it should start executing the computations that you have defined so far, and that it should try to keep those results in memory. You will get back a new Dask-backed object that is semantically equivalent to your old one, but now points to data being computed and held in memory on the cluster.

ds = ds.persist()

Tip

Remember to assign the dataset returned by persist()! Forgetting to do so is a common mistake.

Chunking and performance#

The way a dataset is chunked can be critical to performance when working with large datasets. You’ll want chunk sizes large enough to reduce the number of chunks that Dask has to think about (to reduce overhead from the task graph) but also small enough so that many of them can fit in memory at once.

Tip

A good rule of thumb is to create arrays with a minimum chunk size of at least one million elements (e.g., a 1000x1000 matrix). With large arrays (10+ GB), you may need larger chunks. See Choosing good chunk sizes in Dask.

It can be helpful to choose chunk sizes based on your downstream analyses and to chunk as early as possible. Datasets with smaller chunks along the time axis, for example, can make time domain problems easier to parallelize since Dask can perform the same operation on each time chunk. If you’re working with a large dataset whose chunks make downstream analyses challenging, you may need to rechunk your data. This is an expensive operation, though, so it is only recommended when needed.

You can chunk or rechunk a dataset by:

  • Specifying the chunks kwarg when reading in your dataset. If you know you’ll want to do some spatial subsetting, for example, you could use chunks={'latitude': 10, 'longitude': 10} to specify small chunks across space. This can avoid loading subsets of data that span multiple chunks, thus reducing the number of file reads. Note that this will only work, though, for chunks that are similar to how the data is chunked on disk. Otherwise, it will be very slow and require a lot of network bandwidth.

  • Many array file formats are chunked on disk. You can specify chunks={} to have a single dask chunk map to a single on-disk chunk, and chunks="auto" to have a single dask chunk be an automatically chosen multiple of the on-disk chunks.

  • Using Dataset.chunk() after you’ve already read in your dataset. For time domain problems, for example, you can use ds.chunk(time=TimeResampler()) to rechunk according to a specified unit of time. ds.chunk(time=TimeResampler("MS")), for example, will set the chunks so that a month of data is contained in one chunk.
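For example, a sketch of the monthly rechunking mentioned in the last item (TimeResampler is importable from xarray.groupers in recent Xarray versions):

from xarray.groupers import TimeResampler

# Rechunk so that each Dask chunk holds one calendar month of data ("MS" = month start)
ds = ds.chunk(time=TimeResampler("MS"))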

For large-scale rechunking tasks (e.g., converting a simulation dataset stored with chunking only along time to a dataset with chunking only across space), consider writing another copy of your data on disk and/or using dedicated tools such as Rechunker.

Parallelize custom functions with apply_ufunc and map_blocks#

Almost all of Xarray’s built-in operations work on Dask arrays. If you want to use a function that isn’t wrapped by Xarray, and have it applied in parallel on each block of your xarray object, you have three options:

  1. Use apply_ufunc() to apply functions that consume and return NumPy arrays.

  2. Use map_blocks(), Dataset.map_blocks() or DataArray.map_blocks() to apply functions that consume and return xarray objects.

  3. Extract Dask Arrays from xarray objects with DataArray.data and use Dask directly.
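A minimal sketch of option 3, assuming an anomaly relative to the time mean is the quantity of interest (the computation itself is illustrative only):

import dask.array

darr = ds.temperature.data  # extract the raw dask.array.Array
anomaly = darr - dask.array.nanmean(darr, axis=0)  # operate on it with Dask directly
anomaly_da = xr.DataArray(  # re-wrap the result as an Xarray object
    anomaly, coords=ds.temperature.coords, dims=ds.temperature.dims
)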

Tip

See the extensive Xarray tutorial on apply_ufunc.

apply_ufunc#

apply_ufunc() automates embarrassingly parallel “map” type operations where a function written for processing NumPy arrays should be repeatedly applied to Xarray objects containing Dask Arrays. It works similarly to dask.array.map_blocks() and dask.array.blockwise(), but without requiring an intermediate layer of abstraction. See the Dask documentation for more details.

For the best performance when using Dask’s multi-threaded scheduler, wrap a function that releases the global interpreter lock (GIL); fortunately, this includes most NumPy and SciPy functions. Here we show an example using NumPy operations and a fast function from bottleneck, which we use to calculate Spearman’s rank-correlation coefficient:

import numpy as np
import xarray as xr
import bottleneck


def covariance_gufunc(x, y):
    return (
        (x - x.mean(axis=-1, keepdims=True)) * (y - y.mean(axis=-1, keepdims=True))
    ).mean(axis=-1)


def pearson_correlation_gufunc(x, y):
    return covariance_gufunc(x, y) / (x.std(axis=-1) * y.std(axis=-1))


def spearman_correlation_gufunc(x, y):
    x_ranks = bottleneck.rankdata(x, axis=-1)
    y_ranks = bottleneck.rankdata(y, axis=-1)
    return pearson_correlation_gufunc(x_ranks, y_ranks)


def spearman_correlation(x, y, dim):
    return xr.apply_ufunc(
        spearman_correlation_gufunc,
        x,
        y,
        input_core_dims=[[dim], [dim]],
        dask="parallelized",
        output_dtypes=[float],
    )

The only aspect of this example that is different from standard usage of apply_ufunc() is that we needed to supply the output_dtypes argument. (Read up on Wrapping custom computation for an explanation of the “core dimensions” listed in input_core_dims.)

Our new spearman_correlation() function achieves near linear speedup when run on large arrays across the four cores on my laptop. It would also work as a streaming operation, when run on arrays loaded from disk:

In [5]: rs = np.random.RandomState(0)

In [6]: array1 = xr.DataArray(rs.randn(1000, 100000), dims=["place", "time"])  # 800MB

In [7]: array2 = array1 + 0.5 * rs.randn(1000, 100000)

# using one core, on NumPy arrays
In [8]: %time _ = spearman_correlation(array1, array2, 'time')
CPU times: user 21.6 s, sys: 2.84 s, total: 24.5 s
Wall time: 24.9 s

In [9]: chunked1 = array1.chunk({"place": 10})

In [10]: chunked2 = array2.chunk({"place": 10})

# using all my laptop's cores, with Dask
In [11]: r = spearman_correlation(chunked1, chunked2, "time")

In [12]: %time _ = r.compute()
CPU times: user 30.9 s, sys: 1.74 s, total: 32.6 s
Wall time: 4.59 s

One limitation of apply_ufunc() is that it cannot be applied to arrays with multiple chunks along a core dimension:

In [13]: spearman_correlation(chunked1, chunked2, "place")
ValueError: dimension 'place' on 0th function argument to apply_ufunc with
dask='parallelized' consists of multiple chunks, but is also a core
dimension. To fix, rechunk into a single Dask array chunk along this
dimension, i.e., ``.rechunk({'place': -1})``, but beware that this may
significantly increase memory usage.

This reflects the nature of core dimensions, in contrast to broadcast (non-core) dimensions that allow operations to be split into arbitrary chunks for application.

Tip

When possible, it’s recommended to use pre-existing dask.array functions, either with existing xarray methods or apply_ufunc() with dask='allowed'. Dask can often have a more efficient implementation that makes use of the specialized structure of a problem, unlike the generic speedups offered by dask='parallelized'.
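As a sketch of what dask='allowed' can look like, reusing chunked1 from the timing example above (the reduction itself is illustrative and not part of the original example):

import dask.array


def chunked_mean(arr):
    # With dask="allowed", arr arrives as a Dask array, so dask.array functions apply directly
    return dask.array.nanmean(arr, axis=-1)


result = xr.apply_ufunc(
    chunked_mean,
    chunked1,
    input_core_dims=[["time"]],  # "time" is moved to the last axis and consumed
    dask="allowed",  # pass Dask arrays to the function unmodified
)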

map_blocks#

Functions that consume and return Xarray objects can be easily applied in parallel using map_blocks(). Your function will receive an Xarray Dataset or DataArray subset to one chunk along each chunked dimension.

In [14]: ds.temperature
Out[14]: 
<xarray.DataArray 'temperature' (time: 30, latitude: 180, longitude: 180)> Size: 8MB
array([[[ 4.691e-01, -2.829e-01, -1.509e+00, ..., -8.584e-01,  3.070e-01, -2.867e-02],
        [ 3.843e-01,  1.574e+00,  1.589e+00, ..., -7.983e-01, -5.577e-01,  3.814e-01],
        [ 1.337e+00, -1.531e+00,  1.331e+00, ...,  1.116e+00, -4.439e-01, -2.005e-01],
        ...,
        [-9.767e-01,  1.153e+00,  2.368e-01, ..., -2.358e+00,  7.173e-01, -3.645e-01],
        [-4.878e-01,  3.862e-01,  6.693e-02, ...,  1.047e+00,  1.519e+00,  1.234e+00],
        [ 8.305e-01,  3.307e-01, -7.652e-03, ...,  3.320e-01,  6.742e-01, -6.534e-01]],

       [[ 1.830e-01, -1.192e+00, -8.357e-02, ...,  3.961e-01,  2.800e-01, -7.277e-01],
        [ 2.057e+00, -1.163e+00,  2.464e-01, ...,  6.132e-01, -8.074e-01,  1.021e+00],
        [-4.868e-01,  7.992e-02,  9.419e-01, ..., -8.465e-01,  2.024e-01, -8.869e-01],
        ...,
        [ 1.137e-01, -6.739e-01, -6.066e-03, ..., -2.464e-01, -8.723e-01, -5.857e-01],
        [ 1.871e-01,  8.154e-02,  2.765e-01, ...,  3.445e-01, -1.893e-01, -2.018e-01],
        [ 8.611e-01,  1.849e+00,  9.102e-01, ..., -9.998e-01,  2.341e+00,  1.089e+00]],

       [[-9.274e-01,  1.177e+00,  5.248e-01, ...,  1.319e+00,  1.202e+00,  5.705e-02],
        [ 1.753e+00, -2.929e-01,  1.585e+00, ..., -3.762e-01,  4.622e-01,  1.767e+00],
        [ 9.798e-04, -1.524e-01, -9.568e-02, ..., -1.023e+00, -5.611e-01, -1.384e+00],
        ...,
...
        ...,
        [-2.026e-01,  6.837e-01,  5.485e-01, ...,  2.302e-02, -1.714e+00,  1.224e+00],
        [ 6.145e-01,  8.913e-01,  1.595e+00, ..., -1.572e+00,  2.206e+00,  1.441e-01],
        [ 8.968e-01, -1.058e-02,  1.375e+00, ...,  1.326e+00, -1.596e+00,  1.755e+00]],

       [[ 2.798e-01, -1.042e+00, -9.158e-01, ...,  1.105e+00, -4.097e-01,  6.651e-01],
        [-1.821e-01, -1.315e+00, -3.048e-01, ...,  1.280e-01, -1.776e+00, -1.208e+00],
        [-7.035e-01, -5.634e-01,  1.499e+00, ..., -2.037e-01, -1.552e-01, -1.001e+00],
        ...,
        [-4.242e-01, -3.481e-01, -4.598e-01, ..., -1.912e+00,  8.812e-01,  8.306e-01],
        [-1.766e+00, -7.101e-01, -3.624e-02, ...,  1.893e+00, -4.464e-02, -3.418e-01],
        [-4.511e-01, -9.322e-01,  5.886e-01, ...,  9.546e-01, -1.616e+00,  5.361e-01]],

       [[ 1.827e+00,  1.758e-01, -3.515e-01, ..., -5.171e-01, -3.210e-01,  3.487e-02],
        [ 1.296e+00, -9.241e-01, -4.683e-01, ..., -1.143e+00,  5.967e-01,  4.978e-01],
        [-2.550e-01,  1.545e+00, -1.851e+00, ...,  1.141e+00, -8.020e-01,  1.415e+00],
        ...,
        [ 9.623e-01, -1.391e+00,  7.754e-01, ...,  7.441e-01,  1.019e+00,  6.441e-02],
        [ 1.650e-02, -5.006e-01, -7.921e-01, ...,  2.091e+00, -8.625e-01,  7.333e-01],
        [-1.472e+00,  6.782e-01, -1.407e+00, ...,  8.119e-01, -2.485e-01, -2.467e-01]]])
Coordinates:
  * time       (time) datetime64[ns] 240B 2015-01-01 2015-01-02 ... 2015-01-30
  * longitude  (longitude) int64 1kB 0 1 2 3 4 5 6 ... 174 175 176 177 178 179
  * latitude   (latitude) float64 1kB 89.5 88.5 87.5 86.5 ... -87.5 -88.5 -89.5

This DataArray has 3 chunks each with length 10 along the time dimension. At compute time, a function applied with map_blocks() will receive a DataArray corresponding to a single block of shape 10x180x180 (time x latitude x longitude) with values loaded. The following snippet illustrates how to check the shape of the object received by the applied function.

In [15]: def func(da):
   ....:     print(da.sizes)
   ....:     return da.time
   ....: 

In [16]: mapped = xr.map_blocks(func, ds.temperature)
Frozen({'time': 0, 'latitude': 0, 'longitude': 0})

In [17]: mapped
Out[17]: 
<xarray.DataArray 'time' (time: 30)> Size: 240B
array(['2015-01-01T00:00:00.000000000', '2015-01-02T00:00:00.000000000',
       '2015-01-03T00:00:00.000000000', '2015-01-04T00:00:00.000000000',
       '2015-01-05T00:00:00.000000000', '2015-01-06T00:00:00.000000000',
       '2015-01-07T00:00:00.000000000', '2015-01-08T00:00:00.000000000',
       '2015-01-09T00:00:00.000000000', '2015-01-10T00:00:00.000000000',
       '2015-01-11T00:00:00.000000000', '2015-01-12T00:00:00.000000000',
       '2015-01-13T00:00:00.000000000', '2015-01-14T00:00:00.000000000',
       '2015-01-15T00:00:00.000000000', '2015-01-16T00:00:00.000000000',
       '2015-01-17T00:00:00.000000000', '2015-01-18T00:00:00.000000000',
       '2015-01-19T00:00:00.000000000', '2015-01-20T00:00:00.000000000',
       '2015-01-21T00:00:00.000000000', '2015-01-22T00:00:00.000000000',
       '2015-01-23T00:00:00.000000000', '2015-01-24T00:00:00.000000000',
       '2015-01-25T00:00:00.000000000', '2015-01-26T00:00:00.000000000',
       '2015-01-27T00:00:00.000000000', '2015-01-28T00:00:00.000000000',
       '2015-01-29T00:00:00.000000000', '2015-01-30T00:00:00.000000000'],
      dtype='datetime64[ns]')
Coordinates:
  * time     (time) datetime64[ns] 240B 2015-01-01 2015-01-02 ... 2015-01-30

Notice that the map_blocks() call printed Frozen({'time': 0, 'latitude': 0, 'longitude': 0}) to screen: func received 0-sized blocks! map_blocks() needs to know what the final result looks like in terms of dimensions, shapes, etc. It does so by running the provided function on 0-shaped inputs (automated inference). This works in many cases, but not all. If automatic inference does not work for your function, provide the template kwarg (see below).

In this case, automatic inference has worked, so let’s check that the result is as expected.

In [18]: mapped.load(scheduler="single-threaded")
Frozen({'time': 10, 'latitude': 180, 'longitude': 180})
Frozen({'time': 10, 'latitude': 180, 'longitude': 180})
Frozen({'time': 10, 'latitude': 180, 'longitude': 180})
Out[18]: 
<xarray.DataArray 'time' (time: 30)> Size: 240B
array(['2015-01-01T00:00:00.000000000', '2015-01-02T00:00:00.000000000',
       '2015-01-03T00:00:00.000000000', '2015-01-04T00:00:00.000000000',
       '2015-01-05T00:00:00.000000000', '2015-01-06T00:00:00.000000000',
       '2015-01-07T00:00:00.000000000', '2015-01-08T00:00:00.000000000',
       '2015-01-09T00:00:00.000000000', '2015-01-10T00:00:00.000000000',
       '2015-01-11T00:00:00.000000000', '2015-01-12T00:00:00.000000000',
       '2015-01-13T00:00:00.000000000', '2015-01-14T00:00:00.000000000',
       '2015-01-15T00:00:00.000000000', '2015-01-16T00:00:00.000000000',
       '2015-01-17T00:00:00.000000000', '2015-01-18T00:00:00.000000000',
       '2015-01-19T00:00:00.000000000', '2015-01-20T00:00:00.000000000',
       '2015-01-21T00:00:00.000000000', '2015-01-22T00:00:00.000000000',
       '2015-01-23T00:00:00.000000000', '2015-01-24T00:00:00.000000000',
       '2015-01-25T00:00:00.000000000', '2015-01-26T00:00:00.000000000',
       '2015-01-27T00:00:00.000000000', '2015-01-28T00:00:00.000000000',
       '2015-01-29T00:00:00.000000000', '2015-01-30T00:00:00.000000000'],
      dtype='datetime64[ns]')
Coordinates:
  * time     (time) datetime64[ns] 240B 2015-01-01 2015-01-02 ... 2015-01-30

In [19]: mapped.identical(ds.time)
Out[19]: True

Note that we use .load(scheduler="single-threaded") to execute the computation. This executes the Dask graph in serial using a for loop, but allows for printing to screen and other debugging techniques. We can easily see that our function is receiving blocks of shape 10x180x180 and the returned result is identical to ds.time as expected.

Here is a common example where automated inference will not work.

In [20]: def func(da):
   ....:     print(da.sizes)
   ....:     return da.isel(time=[1])
   ....: 

In [21]: mapped = xr.map_blocks(func, ds.temperature)
Frozen({'time': 0, 'latitude': 0, 'longitude': 0})

func cannot be run on 0-shaped inputs because it is not possible to extract element 1 along a dimension of size 0. In this case we need to tell map_blocks() what the returned result looks like using the template kwarg. template must be an xarray Dataset or DataArray (depending on what the function returns) with dimensions, shapes, chunk sizes, attributes, coordinate variables and data variables that look exactly like the expected result. The variables should be dask-backed and hence not incur much memory cost.

Note

Note that when template is provided, attrs from template are copied over to the result. Any attrs set in func will be ignored.

In [22]: template = ds.temperature.isel(time=[1, 11, 21])

In [23]: mapped = xr.map_blocks(func, ds.temperature, template=template)

Notice that the 0-shaped sizes were not printed to screen. Since template has been provided, map_blocks() does not need to infer the result by running func on 0-shaped inputs.

In [24]: mapped.identical(template)
Out[24]: True

map_blocks() also allows passing args and kwargs down to the user function func. func will be executed as func(block_xarray, *args, **kwargs) so args must be a list and kwargs must be a dictionary.

In [25]: def func(obj, a, b=0):
   ....:     return obj + a + b
   ....: 

In [26]: mapped = ds.map_blocks(func, args=[10], kwargs={"b": 10})

In [27]: expected = ds + 10 + 10

In [28]: mapped.identical(expected)
Out[28]: True

Tip

Because map_blocks() loads each block into memory, keep the objects consumed by your user function as small as possible. For example, drop unneeded variables before calling map_blocks() with func (see the sketch below).
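For example, a sketch reusing func and ds from the snippet above:

# Keep only the variables the user function needs before mapping it over blocks
mapped = ds[["temperature"]].map_blocks(func, args=[10], kwargs={"b": 10})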

Deploying Dask#

By default, Dask uses the multi-threaded scheduler, which distributes work across multiple cores on a single machine and allows for processing some datasets that do not fit into memory. However, this has two limitations:

  • You are limited by the size of your hard drive

  • Downloading data can be slow and expensive

Instead, it can be faster and cheaper to run your computations close to where your data is stored, distributed across many machines on a Dask cluster. Often, this means deploying Dask on HPC clusters or on the cloud. See the Dask deployment documentation for more details.
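For instance, a minimal sketch of connecting to Dask’s distributed scheduler (the address below is a placeholder for your own deployment):

from dask.distributed import Client

client = Client()  # start a local cluster for testing
# client = Client("tcp://scheduler-address:8786")  # or connect to an existing cluster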

Best Practices#

Dask is pretty easy to use but there are some gotchas, many of which are under active development. Here are some tips we have found through experience. We also recommend checking out the Dask best practices.

  1. Do your spatial and temporal indexing (e.g. .sel() or .isel()) early, especially before calling resample() or groupby(). Grouping and resampling triggers some computation on all the blocks, which in theory should commute with indexing, but this optimization hasn’t been implemented in Dask yet. (See Dask issue #746).

  2. More generally, groupby() is a costly operation and will perform a lot better if the flox package is installed. See the flox documentation for more. By default Xarray will use flox if installed.

  3. Save intermediate results to disk as netCDF files (using to_netcdf()) and then load them again with open_dataset() for further computations. For example, if subtracting the temporal mean from a dataset, save the temporal mean to disk before subtracting. Again, in theory, Dask should be able to do the computation in a streaming fashion, but in practice this is a fail case for the Dask scheduler, because it tries to keep every chunk of an array that it computes in memory. (See Dask issue #874)

  4. Use the Dask dashboard to identify performance bottlenecks.

Here’s an example of a simplified workflow putting some of these tips together:

import xarray as xr

ds = xr.open_zarr(  # Since we're doing a spatial reduction, increase chunk size in x, y
    "my-data.zarr", chunks={"x": 100, "y": 100}
)

time_subset = ds.sea_temperature.sel(
    time=slice("2020-01-01", "2020-12-31")  # Filter early
)

# faster resampling when flox is installed
daily = time_subset.resample(time="D").mean()

daily.load()  # Pull smaller results into memory after reducing the dataset