High-level datasets

xarray_beam.Dataset is new (as of September 2025) high-level interface for Xarray-Beam.

It requires less boilerplate code than the current (explicit) interface, and accordingly should be an easier to use tool, especially for non-expert users. You still need to think about how your data is divided into chunks, but the data model of Dataset keep track of the high-level structure of your data, avoiding the need to manually building templates in ChunksToZarr.

Warning

The Dataset interface is experimental, and currently offers no backwards compatibility guarantees.

Data model

Dataset is a wrapper over a series of Beam transformations, adding metadata describing the corresponding xarray.Dataset and how is distributed with Beam:

  • ptransform is the wrapped beam.PTransform to compute the chunks of the dataset.

  • template is a lazily-computed xarray.Dataset indicating the structure of the overall dataset.

  • chunks is dictionary mapping from dimension names to integer chunk sizes, indicating the size of each chunk.

  • split_vars is a boolean indicating if ptransform elements each contain only a single variable from the dataset, rather than all variables.

This information is surfaced via xbeam.Dataset.__repr__():

import apache_beam as beam
import xarray_beam as xbeam
import xarray
import numpy as np
import pandas as pd

xarray_ds = xarray.Dataset(
    {'temperature': (('time', 'longitude', 'latitude'), np.random.randn(365, 360, 180))},
    coords={'time': pd.date_range('2025-01-01', freq='1D', periods=365)},
)
chunks = {'time': '30MB', 'longitude': -1, 'latitude': -1}
xbeam_ds = xbeam.Dataset.from_xarray(xarray_ds, chunks)
xbeam_ds
<xarray_beam.Dataset>
PTransform: <DatasetToChunks(PTransform) label=[from_xarray_0]>
Chunks:     30MB (time: 57, longitude: 360, latitude: 180, split_vars=False)
Template:   189MB (7 chunks)
    Dimensions:      (time: 365, longitude: 360, latitude: 180)
    Coordinates:
      * time         (time) datetime64[ns] 3kB 2025-01-01 2025-01-02 ... 2025-12-31
    Dimensions without coordinates: longitude, latitude
    Data variables:
        temperature  (time, longitude, latitude) float64 189MB ...

Xarray-Beam pipelines typically read and write data to Zarr, so we’ll start by writing our example data to a Zarr file (with plain Xarray/Dask) for us to read later:

xarray_ds.chunk(chunks).to_zarr('example_data.zarr', mode='w')
<xarray.backends.zarr.ZarrStore at 0x7da06bc128e0>

Writing pipelines

Xarray-Beam has an intentionally small set of primitives. Most pipelines can be written via a handful of Dataset methods:

  • from_zarr(): Load a dataset from a Zarr store.

  • rechunk(): Adjust chunks on a dataset.

  • map_blocks(): Map a function over every chunk of this dataset independently.

  • mean(): Calculate a mean over one or more dimensions.

  • to_zarr(): Write a dataset to a Zarr store.

Aside from computing averages, all computation happens via some combination of rechunk and the embarrasingly parallel map_blocks method.

Chunking strategies

In order for map_blocks to work, data needs to be appropriately chunked. Here are a few typical chunking patterns that work well for most needs:

  • “Pencil” chunks, which group together all times, and parallelize over space. These long and skinny chunks look like a box of pencils.

  • “Pancake” chunks, which group together all spatial locations, and parallelize over time. These flat and wide chunks look like a stack of pancakes.

  • Intermediate “compromise” chunks, somewhere between these extremes. These optimize for worst-case behavior.

Pancake vs Pencil chunks

Weather/climate datasets are typically generated and stored in “pancake” chunks, but “pencil” chunks are more suitable for most analytics queries, which requires large histories of weather at small numbers of locations.

Using the right chunks is absolutely essentially for efficient operations with Xarray-Beam and Zarr. For example, reading data from a single location across all times (a “pencil” query) is extremely inefficient for a dataset stored in “pancake” chunks – it would require loading the entire dataset from disk!

Like Dask, Xarray-Beam works best with chunks that are 10s to 100s of MB in size, large enough to amortize Python overhead but small enough that chunks fit easily into memory. Smaller chunk sizes in Zarr stores (closer to 10MB) can be convenient to increase the flexiblity of chunking methods when reading the data from disk later

You can explicitly adjusts chunk sizes with rechunk(). The syntax is a mapping from dimension names (or ..., for all other dimensions) to chunk sizes, which can be any of -1 (to indicate “size of the full dimension”), a positive integer or a string indicating a target size in bytes like '100MB', e.g.,

  • chunks=-1: no chunking.

  • chunks='100 MB': chunk size of ~100 MB, without dimension-specific constraints.

  • chunks={'time': 100}: fixed size of 100 along time, preserving original chunks along other dimensions.

  • chunks={'time': -1, ...: '100 MB'}: pencil chunks of ~100 MB each.

  • chunks={'latitude': -1, 'longitude': -1, ...: '100 MB'}: pancake chunks of ~100 MB each.

See normalize_chunks() for full details.

Warning

Rechunking between very different schemes is fundamentally an expensive operation (it requires multiple complete reads/writes of a dataset from disk). If performance and flexibility are critical, it may be worth storing multiple copies of your data in different chunking formats.

Tip

Using Zarr v3’s sharding feature in to_zarr() to group smaller chunks (~1 MB) into shards can also help mitigate the challenges of picking an optimal chunk size.

Example 1: Climatology

Here we need to group together all time points in the same chunk (“pencil chunks”), parallelizing over space:

with beam.Pipeline() as pipeline:
  (
      xbeam.Dataset.from_zarr('example_data.zarr', pipeline=pipeline)
      .rechunk({'time': -1, ...: '100 MB'})
      .map_blocks(lambda ds: ds.groupby('time.month').mean())
      .rechunk('10 MB')  # ensure a reasonable min chunk-size for Zarr
      .to_zarr('example_climatology.zarr')
  )
xarray.open_zarr('example_climatology.zarr')
<xarray.Dataset> Size: 6MB
Dimensions:      (month: 12, longitude: 360, latitude: 180)
Coordinates:
  * month        (month) int64 96B 1 2 3 4 5 6 7 8 9 10 11 12
Dimensions without coordinates: longitude, latitude
Data variables:
    temperature  (month, longitude, latitude) float64 6MB dask.array<chunksize=(12, 360, 180), meta=np.ndarray>

Example 2: Regridding over space

Here we need to group all space points in the same chunk (“pancake chunks”), parallelizing over time:

with beam.Pipeline() as pipeline:
  (
      xbeam.Dataset.from_zarr('example_data.zarr', pipeline=pipeline)
      .rechunk({'time': '30MB', 'latitude': -1, 'longitude': -1})
      .map_blocks(lambda ds: ds.coarsen(latitude=2, longitude=2).mean())
      .to_zarr('example_regrid.zarr')
  )
xarray.open_zarr('example_regrid.zarr')
<xarray.Dataset> Size: 47MB
Dimensions:      (time: 365, longitude: 180, latitude: 90)
Coordinates:
  * time         (time) datetime64[ns] 3kB 2025-01-01 2025-01-02 ... 2025-12-31
Dimensions without coordinates: longitude, latitude
Data variables:
    temperature  (time, longitude, latitude) float64 47MB dask.array<chunksize=(57, 180, 90), meta=np.ndarray>

Limitations of map_blocks

In the examples above, map_blocks() somehow automatically knew the appropriate structure of the output template, without evaluating any chunked data. How could this work?

For building templates, Xarray-Beam relies on lazy evaluation with Dask arrays. This requires that applied functions are Dask compatible. Almost all built-in Xarray operations are Dask compatible, but if your applied function is not Dask compatible (e.g., because it loads array values into memory), Xarray-Beam will show an informative error:

try:
  (
      xbeam.Dataset.from_zarr('example_data.zarr')
      .map_blocks(lambda ds: ds.compute())  # load into memory
  )
except Exception as e:
  print(f'{type(e).__name__}: {e}')
ValueError: failed to lazily apply func() to the existing template. Consider supplying template explicitly or modifying func() to support lazy dask arrays.

You can avoid these errors by explicitly supplying a template, either from Dataset.template or produced by make_template():

template = xbeam.make_template(xarray_ds)
(
    xbeam.Dataset.from_zarr('example_data.zarr')
    .map_blocks(lambda ds: ds.compute(), template=template)
)
<xarray_beam.Dataset>
PTransform: <_ChainedPTransform(PTransform) label=[from_zarr_3|map_blocks_3]>
Chunks:     30MB (time: 57, longitude: 360, latitude: 180, split_vars=False)
Template:   189MB (7 chunks)
    Dimensions:      (time: 365, longitude: 360, latitude: 180)
    Coordinates:
      * time         (time) datetime64[ns] 3kB 2025-01-01 2025-01-02 ... 2025-12-31
    Dimensions without coordinates: longitude, latitude
    Data variables:
        temperature  (time, longitude, latitude) float64 189MB ...

Tip

Notice that supplying pipeline to from_zarr() is optional. You’ll need to eventually apply a Beam pipeline to a PTransform produced by Xarray-Beam to compute it, but it can be convenient to omit when building pipelines interactively.

Interfacing with low-level transforms

Dataset is a thin wrapper around Xarray-Beam transformations, so you can always drop into the lower-level Xarray-Beam data model and use raw Beam operations. This is especially useful for the reading or writing data.

To manually create a Dataset from a Beam ptransform, use from_ptransform(). Here’s an example, showing the common pattern of evaluating a single example in-memory to produce the xarray.Dataset required for building a template:

all_times = pd.date_range('2025-01-01', freq='1D', periods=365)
source_dataset = xarray.open_zarr('example_data.zarr', chunks=None)

def load_chunk(time: pd.Timestamp) -> tuple[xbeam.Key, xarray.Dataset]:
  key = xbeam.Key({'time': (time - all_times[0]).days})
  dataset = source_dataset.sel(time=[time])
  return key, dataset

ptransform = beam.Create(all_times) | beam.Map(load_chunk)

_, example = load_chunk(all_times[0])
template = xbeam.make_template(example)
template = xbeam.replace_template_dims(template, time=all_times)

ds_beam = xbeam.Dataset.from_ptransform(
    ptransform, template=template, chunks={'time': 1}, split_vars=False
)
ds_beam
<xarray_beam.Dataset>
PTransform: <_ChainedPTransform(PTransform) label=[Create|Map(load_chunk)|from_ptransform_0]>
Chunks:     518kB (time: 1, longitude: 360, latitude: 180, split_vars=False)
Template:   189MB (365 chunks)
    Dimensions:      (time: 365, longitude: 360, latitude: 180)
    Coordinates:
      * time         (time) datetime64[ns] 3kB 2025-01-01 2025-01-02 ... 2025-12-31
    Dimensions without coordinates: longitude, latitude
    Data variables:
        temperature  (time, longitude, latitude) float64 189MB ...

You can also pull-out the underlying Beam ptransform from a dataset to append new transformations, e.g., to write each element of the pipeline to disk as a separate file:

def to_netcdf(key: xbeam.Key, chunk: xarray.Dataset):
  path = f"{chunk.indexes['time'][0]:%Y-%m-%d}.nc"
  chunk.to_netcdf(path)

with beam.Pipeline() as p:
  p | ds_beam.rechunk('50MB').ptransform | beam.MapTuple(to_netcdf)

%ls *.nc
2025-01-01.nc  2025-04-07.nc  2025-07-12.nc  2025-10-16.nc

Missing features

xbeam.Dataset is not yet complete, and we would welcome contributions! Here are a few features that would be particularly welcome. If you’re interested in any of these, the easiest way to get in touch is to raise an issue on GitHub.

Operations that combine multiple datasets

Support for operations that merge together different datasets would be quite welcome, e.g., to evaluate model outputs against ground truth. Currently, tools like WeatherBenchX acheive this by writing custom Beam pipelines.

There are two ways these might be implemented for Dataset:

  1. By supporting multiple Dataset arguments in a xbeam.map_blocks() function (tracking issue).

  2. By supporting xarray.DataTree objects (tracking issue)

In the long term, DataTree support for simultaneously loading data is a better option, because merging together separate Beam ptransforms (as would be required for map_blocks) requires an expensive shuffle step via beam.CoGroupByKey. This will require the upstream Xarray project supporting a bit more functionality with DataTree, most notably concat and combine_nested.

Aggregations other than mean

Currently, Xarray-Beam only supports an efficient aggregation implementation for mean(), but it should be relatively straightforward to extend this for many other common Xarray aggregation, e.g., sum, min, max, all, any, std, var, etc.

IO connectors

Tools for reading/writing Xarray-Beam into other distributed storage systems, such as Google Earth Engine (see XEE) and Icechunk, would be very welcome.

Other Dataset operations

xbeam.Dataset has an intentionally small API surface, so features that can implemented via a trivial call to map_blocks() are probably not a good fit for Xarray-Beam itself.

That said, there are plenty of other Xarray methods that do require updates to underlying chunks and xbeam.Key objects beyond what map_blocks can handle (e.g., rename, thin, assign_coords), or for which more efficient distributed algorithms exist (e.g., for groupby and resampling).

We are also contemplating starting another open source project for collecting generally useful utilities that a little too weather/climate domain-specific to make sense in Xarray-Beam, e.g., for regridding.