Reading and writing data

Read datasets into chunks

There are two main options for loading an xarray.Dataset into Xarray-Beam. You can either create the dataset from scratch or use the DatasetToChunks transform starting at the root of a Beam pipeline:

import apache_beam as beam
import numpy as np
import pandas as pd
import xarray_beam as xbeam
import xarray
ds = xarray.tutorial.load_dataset('air_temperature')
with beam.Pipeline() as p:
    p | xbeam.DatasetToChunks(ds, chunks={'time': 1000}) | beam.MapTuple(lambda k, v: print(k, type(v)))
Key(offsets={'lat': 0, 'lon': 0, 'time': 0}, vars=None) <class 'xarray.core.dataset.Dataset'>
Key(offsets={'lat': 0, 'lon': 0, 'time': 1000}, vars=None) <class 'xarray.core.dataset.Dataset'>
Key(offsets={'lat': 0, 'lon': 0, 'time': 2000}, vars=None) <class 'xarray.core.dataset.Dataset'>

Importantly, xarray datasets fed into DatasetToChunks can be lazy, with data not already loaded eagerly into NumPy arrays. When you feed lazy datasets into DatasetToChunks, each individual chunk will be indexed and evaluated separately on Beam workers.

This pattern allows for leveraging Xarray’s builtin dataset loaders (e.g., open_dataset() and open_zarr()) for feeding arbitrarily large datasets into Xarray-Beam.

Reading data from Zarr

Zarr is the preferred file format for reading and writing data with Xarray-Beam, due to its excellent scalability and support inside Xarray.

The easiest way to get good performance from Zarr into Xarray-Beam is to use xarray_beam.open_zarr(). This function returns a pair of values:

  1. A lazily indexed xarray.Dataset corresponding to the Zarr store, but not using Dask. This is exactly what you would get from xarray.open_zarr with chunks=None.

  2. A dictionary mapping from dimension names to integer chunk sizes. Obtaining this information without using Dask to chunk the array requires looking at Xarray’s encoding dictionaries or directly inspecting the Zarr store.

# write data into the distributed Zarr format
ds.chunk({'time': 1000}).to_zarr('example-data.zarr', mode='w')

# read it using xarray-beam's utilities
ds_on_disk, chunks = xbeam.open_zarr('example-data.zarr')
print(ds_on_disk)
print(chunks)
<xarray.Dataset>
Dimensions:  (time: 2920, lat: 25, lon: 53)
Coordinates:
  * lat      (lat) float32 75.0 72.5 70.0 67.5 65.0 ... 25.0 22.5 20.0 17.5 15.0
  * lon      (lon) float32 200.0 202.5 205.0 207.5 ... 322.5 325.0 327.5 330.0
  * time     (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00
Data variables:
    air      (time, lat, lon) float32 ...
Attributes:
    Conventions:  COARDS
    description:  Data is from NMC initialized reanalysis\n(4x/day).  These a...
    platform:     Model
    references:   http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...
    title:        4x daily NMC reanalysis (1948)
{'time': 1000, 'lat': 25, 'lon': 53}

Conveniently, this is exactly the information you need for feeding into DatasetToChunks to write an Xarray-Beam pipeline:

with beam.Pipeline() as p:
    p | xbeam.DatasetToChunks(ds_on_disk, chunks) | beam.MapTuple(lambda k, v: print(k, type(v)))
Key(offsets={'lat': 0, 'lon': 0, 'time': 0}, vars=None) <class 'xarray.core.dataset.Dataset'>
Key(offsets={'lat': 0, 'lon': 0, 'time': 1000}, vars=None) <class 'xarray.core.dataset.Dataset'>
Key(offsets={'lat': 0, 'lon': 0, 'time': 2000}, vars=None) <class 'xarray.core.dataset.Dataset'>

Writing data to Zarr

ChunksToZarr is Xarray-Beam’s API for saving chunks into a Zarr store.

For small datasets where you aren’t concerned about extra overhead for writing data, you can get started just using it directly:

with beam.Pipeline() as p:
    p | xbeam.DatasetToChunks(ds_on_disk, chunks) | xbeam.ChunksToZarr('example-data-v2.zarr')

The resulting Zarr dataset will have array chunks matching the in-memory chunks from your Beam PCollection. If you want different chunks on disk, consider inserting transforms to rechunk before exporting to Zarr.

For larger datasets, read on – you’ll want to use a template.

Creating templates

By default, ChunksToZarr needs to evaluate and combine the entire distributed dataset in order to determine overall Zarr metadata (e.g., array names, shapes, dtypes and attributes). This is fine for relatively small datasets, but can entail significant additional communication and storage costs for large datasets.

The optional template argument allows for prespecifying structure of the full on disk dataset in the form of another lazy xarray.Dataset. Lazy templates specify the structure of the array data that will be written by the PTransform. Array values that may be written as part of the Beam pipeline are indicated by using lazily computed Dask arrays to store the data.

The easiest way to make a template is with xarray_beam.make_template() helper, which transforms a dataset into another dataset where every value is lazy:

ds = xarray.open_zarr('example-data.zarr', chunks=None)
template = xbeam.make_template(ds)
print(template)
<xarray.Dataset>
Dimensions:  (time: 2920, lat: 25, lon: 53)
Coordinates:
  * lat      (lat) float32 75.0 72.5 70.0 67.5 65.0 ... 25.0 22.5 20.0 17.5 15.0
  * lon      (lon) float32 200.0 202.5 205.0 207.5 ... 322.5 325.0 327.5 330.0
  * time     (time) datetime64[ns] 2013-01-01 ... 2014-12-31T18:00:00
Data variables:
    air      (time, lat, lon) float32 dask.array<chunksize=(2920, 25, 53), meta=np.ndarray>
Attributes:
    Conventions:  COARDS
    description:  Data is from NMC initialized reanalysis\n(4x/day).  These a...
    platform:     Model
    references:   http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...
    title:        4x daily NMC reanalysis (1948)

Tip

Under the covers, make_template() has a very simple implementation, equivalent to xarray.zeros_like(ds.chunk(-1)).

“Template” datasets are not only useful for expressing the desired structures of Zarr stores, but also because every builtin Xarray operation is entirely lazy on Datasets consistenting of Dask arrays. This makes it relatively straightforward to build up a new Dataset with the required variables and dimension, e.g.,

# remove the "time" dimension, and insert a new "sample" dimension
new_template = template.isel(time=0, drop=True).expand_dims(sample=np.arange(10))
# setup a template for spatially regridding along latitude and longitude
new_longitudes = np.linspace(0, 100, num=8)
new_latitudes = np.linspace(30, 80, num=7)
new_template = template.head(lat=7, lon=8).assign_coords(lat=new_latitudes, lon=new_longitudes)

End to end examples

If you supply a template, you should also supply the zarr_chunks argument in order to ensure that the data ends up appropriately chunked in the Zarr store. A complete example of reading and writing data from a Zarr store typically looks something like:

ds_on_disk, chunks = xbeam.open_zarr('example-data.zarr')

template = xbeam.make_template(ds_on_disk)

with beam.Pipeline() as p:
    (
        p
        | xbeam.DatasetToChunks(ds_on_disk, chunks)
        # insert additional transforms here
        | xbeam.ChunksToZarr('example-data-v3.zarr', template, chunks)
    )

If you don’t have an existing Dataset to start with, a common pattern is to reuse the same function you’ll use to load data for each chunk, e.g.,

all_days = pd.date_range('2013-01-01', '2014-01-01', freq='1D')

def load_one_example(time: pd.Timestamp) -> tuple[xbeam.Key, xarray.Dataset]:
    key = xbeam.Key({'time': (time - all_days[0]).days})
    dataset = ds.sel(time=[time])  # replace with your code to create one example
    return key, dataset

_, example = load_one_example(all_days[0])
template = xbeam.make_template(example).squeeze('time', drop=True).expand_dims(time=all_days)
zarr_chunks = {'time': 100}  # desired chunking along "time", e.g., for more efficient storage in Zarr

with beam.Pipeline() as p:
    (
        p
        | beam.Create(all_days)
        | beam.Map(load_one_example)
        | xbeam.ConsolidateChunks(zarr_chunks)
        | xbeam.ChunksToZarr('example-data-v4.zarr', template, zarr_chunks)
    )

For more examples of how to manipulate templates and read/write data with Zarr, see the end-to-end ERA5 climatology and ERA5 rechunk examples.

Tips for custom data loaders

If you use Xarray’s file opening utilities instead of xarray_beam.open_zarr, you need to take some care to get good performance when processing very large numbers of chunks (hundreds of thousands).

The main tip is to set chunks=None when opening datasets and then explicitly provide chunks in DatasetToChunks – exactly the pattern facilitated by xarray_beam.open_zarr.

chunks=None tells Xarray to use its builtin lazy indexing machinery, instead of using Dask. This is advantageous because datasets using Xarray’s lazy indexing are serialized much more compactly (via pickle) when passed into Beam transforms.

Alternatively, you can pass in lazy datasets using dask. In this case, you don’t need to explicitly supply chunks to DatasetToChunks:

on_disk = xarray.open_zarr('example-data.zarr', chunks={'time': 1000})

with beam.Pipeline() as p:
    p | xbeam.DatasetToChunks(on_disk) | beam.MapTuple(lambda k, v: print(k, type(v)))
Key(offsets={'lat': 0, 'lon': 0, 'time': 0}, vars=None) <class 'xarray.core.dataset.Dataset'>
Key(offsets={'lat': 0, 'lon': 0, 'time': 1000}, vars=None) <class 'xarray.core.dataset.Dataset'>
Key(offsets={'lat': 0, 'lon': 0, 'time': 2000}, vars=None) <class 'xarray.core.dataset.Dataset'>

Dask’s lazy evaluation system is much more general than Xarray’s lazy indexing, so as long as resulting dataset can be independently evaluated in each chunk using Dask can be a very convenient way to setup computation for Xarray-Beam.

Unfortunately, it doesn’t scale as well. In particular, the overhead of pickling large Dask graphs for passing to Beam workers can be prohibitive for large (multiple TB) datasets with millions of chunks. There are plans to eventually fix this in Dask, but in the meantime, prefer the pattern of using Dask arrays with single chunks (e.g., as created by make_template), with separate explicit specification of array chunks.