Core data model

Xarray-Beam tries to make it straightforward to write distributed pipelines with Xarray objects, but unlike libraries such as Xarray with Dask or Dask/Spark DataFrames, it doesn’t hide the distributed magic inside high-level objects.

Xarray-Beam is a lower-level tool. You will be manipulating large datasets piece-by-piece yourself, and you as the developer will be responsible for maintaining Xarray-Beam’s internal invariants. This means that to successfully use Xarray-Beam, you will need to understand how it represents distributed datasets.

This responsibility requires a bit more coding and understanding, but in exchange offers benefits in performance and flexibility. This brief tutorial will show you how it works.

We’ll start off with some standard imports:

import apache_beam as beam
import numpy as np
import xarray_beam as xbeam
import xarray

Keys in Xarray-Beam

Xarray-Beam is designed around the model that every stage in your pipeline could be stored in a single xarray.Dataset object, but is instead represented by a Beam PCollection of smaller xarray.Dataset objects, distributed in two possible ways:

  • Distinct variables in a Dataset may be separated across multiple records.

  • Individual arrays can also be split into multiple chunks, similar to those used by dask.array.

To keep track of how individual records could be combined into a larger (virtual) dataset, Xarray-Beam defines a Key object. Key objects consist of:

  1. offsets: integer offsets (in array elements) for the chunk from the origin, stored in an immutabledict

  2. vars: The subset of variables included in each chunk, either as a frozenset, or as None to indicate “all variables”.

Making a Key from scratch is simple:

key = xbeam.Key({'x': 0, 'y': 10}, vars=None)
key
Key(offsets={'x': 0, 'y': 10}, vars=None)

Or given an existing Key, you can easily modify it with replace() or with_offsets():

key.replace(vars={'foo', 'bar'})
Key(offsets={'x': 0, 'y': 10}, vars={'foo', 'bar'})
key.with_offsets(x=None, z=1)
Key(offsets={'y': 10, 'z': 1}, vars=None)

Key objects don’t do very much. They are just simple structs with two attributes, along with various special methods required to use them as dict keys or as keys in Beam pipelines. You can find more examples of manipulating keys in the docstring of Key.
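
Because Key objects compare by value and hash consistently, two keys constructed separately are interchangeable. Here is a minimal sketch (the lookup dict is purely illustrative):

xbeam.Key({'x': 0}) == xbeam.Key({'x': 0})
True
lookup = {xbeam.Key({'x': 0}): 'first', xbeam.Key({'x': 4}): 'second'}
lookup[xbeam.Key({'x': 4})]
'second'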

Creating PCollections

The standard inputs & outputs for Xarray-Beam are PCollections of (xbeam.Key, xarray.Dataset) pairs. Xarray-Beam provides a bunch of PCollections for typical tasks, but many pipelines will still involve some manual manipulation of Key and Dataset objects, e.g., with builtin Beam transforms like beam.Map.

To start off, let’s write a helper function for creating our first collection from scratch:

def create_records():
    for offset in [0, 4]:
        key = xbeam.Key({'x': offset, 'y': 0})
        data = 2 * offset + np.arange(8).reshape(4, 2)
        chunk = xarray.Dataset({
            'foo': (('x', 'y'), data),
            'bar': (('x', 'y'), 100 + data),
        })
        yield key, chunk

Let’s take a look at the entries, which are lazily constructed with the generator:

inputs = list(create_records())
inputs
[(Key(offsets={'x': 0, 'y': 0}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 4, y: 2)
  Dimensions without coordinates: x, y
  Data variables:
      foo      (x, y) int64 0 1 2 3 4 5 6 7
      bar      (x, y) int64 100 101 102 103 104 105 106 107),
 (Key(offsets={'x': 4, 'y': 0}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 4, y: 2)
  Dimensions without coordinates: x, y
  Data variables:
      foo      (x, y) int64 8 9 10 11 12 13 14 15
      bar      (x, y) int64 108 109 110 111 112 113 114 115)]

Note

There are multiple valid ways to represent a chunk of a larger dataset with a Key.

  • Offsets for unchunked dimensions are optional. Because all chunks have the same offset along the y axis, including y in offsets is not required as long as we don’t need to create multiple chunks along that dimension.

  • Indicating variables is optional, if all chunks have the same variables. We could have set vars={'foo', 'bar'} on each of these Key objects instead of vars=None. This would be an equally valid representation of the same records, since all of our datasets have the same variables.
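
To make the note above concrete, here is a sketch of the equally valid keys for the same two records, with explicit variables and without the redundant y offset (reusing the inputs list from above):

[k.replace(vars={'foo', 'bar'}).with_offsets(y=None) for k, _ in inputs]
[Key(offsets={'x': 0}, vars={'foo', 'bar'}), Key(offsets={'x': 4}, vars={'foo', 'bar'})]

Pairing these keys with the unchanged Dataset chunks would represent exactly the same virtual dataset.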

We now have the inputs we need to use Xarray-Beam’s helper functions and PTransforms. For example, we can fully consolidate chunks & variables to see what single xarray.Dataset these values would correspond to:

xbeam.consolidate_fully(inputs)
(Key(offsets={'x': 0, 'y': 0}, vars={'foo', 'bar'}),
 <xarray.Dataset>
 Dimensions:  (x: 8, y: 2)
 Dimensions without coordinates: x, y
 Data variables:
     foo      (x, y) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
     bar      (x, y) int64 100 101 102 103 104 105 ... 110 111 112 113 114 115)

To execute with Beam, of course, we need to turn Python lists/generators into Beam PCollections, e.g., with beam.Create():

with beam.Pipeline() as p:
    p | beam.Create(create_records()) | beam.Map(print)
(Key(offsets={'x': 0, 'y': 0}, vars=None), <xarray.Dataset>
Dimensions:  (x: 4, y: 2)
Dimensions without coordinates: x, y
Data variables:
    foo      (x, y) int64 0 1 2 3 4 5 6 7
    bar      (x, y) int64 100 101 102 103 104 105 106 107)
(Key(offsets={'x': 4, 'y': 0}, vars=None), <xarray.Dataset>
Dimensions:  (x: 4, y: 2)
Dimensions without coordinates: x, y
Data variables:
    foo      (x, y) int64 8 9 10 11 12 13 14 15
    bar      (x, y) int64 108 109 110 111 112 113 114 115)

Writing pipelines

Transforms in Xarray-Beam typically act on (key, value) pairs of (xbeam.Key, xarray.Dataset). For example, we can dump our dataset to disk in the scalable Zarr format using ChunksToZarr:

inputs | xbeam.ChunksToZarr('my-data.zarr')
[None, None]
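
To sanity-check the result, we can open the store back up with vanilla xarray (this assumes the write above has finished and that the zarr package is installed); the result should match the consolidated dataset we built with consolidate_fully earlier:

ds = xarray.open_zarr('my-data.zarr')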

Xarray-Beam doesn’t try to provide transformations for everything. In particular, it omits most embarrassingly parallel operations that can be performed independently on each chunk of a larger dataset. You can write these yourself using beam.Map.

For example, consider elementwise arithmetic. We can write a lambda function that acts on each key-value pair, updating the xarray.Dataset objects appropriately, and put it into an Xarray-Beam pipeline using beam.MapTuple:

inputs | beam.MapTuple(lambda k, v: (k, v + 1))
[(Key(offsets={'x': 0, 'y': 0}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 4, y: 2)
  Dimensions without coordinates: x, y
  Data variables:
      foo      (x, y) int64 1 2 3 4 5 6 7 8
      bar      (x, y) int64 101 102 103 104 105 106 107 108),
 (Key(offsets={'x': 4, 'y': 0}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 4, y: 2)
  Dimensions without coordinates: x, y
  Data variables:
      foo      (x, y) int64 9 10 11 12 13 14 15 16
      bar      (x, y) int64 109 110 111 112 113 114 115 116)]

For operations that add or remove (unchunked) dimensions, you may need to update Key objects as well to maintain the Xarray-Beam invariants, e.g., if we want to remove the y dimension entirely:

inputs | beam.MapTuple(lambda k, v: (k.with_offsets(y=None), v.mean('y')))
[(Key(offsets={'x': 0}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 4)
  Dimensions without coordinates: x
  Data variables:
      foo      (x) float64 0.5 2.5 4.5 6.5
      bar      (x) float64 100.5 102.5 104.5 106.5),
 (Key(offsets={'x': 4}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 4)
  Dimensions without coordinates: x
  Data variables:
      foo      (x) float64 8.5 10.5 12.5 14.5
      bar      (x) float64 108.5 110.5 112.5 114.5)]
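
Conversely, here is a sketch of an operation that adds a new (unchunked) dimension, using a hypothetical z dimension. We give it an offset at the origin, although as noted above the offset for an unchunked dimension is optional:

inputs | beam.MapTuple(lambda k, v: (k.with_offsets(z=0), v.expand_dims('z')))

Each resulting chunk gains an extra z dimension of size 1, and each key gains a matching z offset of 0.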

Note

The absence of some transformations in Xarray-Beam is partly an intentional design decision to reduce scope, and partly just a reflection of what we’ve gotten around to implementing. If, after reading through the rest of the docs, you notice missing transformations or are wondering how to compute something in Xarray-Beam, please open an issue to discuss.