Rechunking

Rechunking lets us redistribute how datasets are split between variables and chunks across a Beam PCollection.

To get started we’ll recreate our dummy data from the data model tutorial:

import apache_beam as beam
import numpy as np
import xarray_beam as xbeam
import xarray

def create_records():
    for offset in [0, 4]:
        key = xbeam.Key({'x': offset, 'y': 0})
        data = 2 * offset + np.arange(8).reshape(4, 2)
        chunk = xarray.Dataset({
            'foo': (('x', 'y'), data),
            'bar': (('x', 'y'), 100 + data),
        })
        yield key, chunk
        
inputs = list(create_records())

Choosing chunks

The choice of chunks can be essential for some operations, because some operations are very hard or impossible to perform with certain chunking schemes. For example, to make a plot, all the data needs to come together on a single machine. Other calculations, such as computing a median, are possible to perform on distributed data, but require tricky algorithms and/or approximation.

More broadly, chunking can have critical performance implications, similar to those for Xarray and Dask. As a rule of thumb, chunk sizes of 10-100 MB work well. The optimal chunk size balances several considerations, adapted here from the Dask docs:

  1. Chunks should be small enough to fit comfortably into memory on a single machine. As an upper limit, chunks over roughly 2 GB in size will not fit into the protocol buffers Beam uses to pass data between workers.

  2. There should be enough chunks for Beam runners (like Cloud Dataflow) to elastically shard work over many workers.

  3. Chunks should be large enough to amortize the overhead of networking and the Python interpreter, which starts to become noticeable for arrays with fewer than 1 million elements.

The nbytes attribute on both NumPy arrays and xarray.Dataset objects is an easy way to check how large chunks are.
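As a worked example of these rules of thumb, the sketch below computes the size of a chunk from its shape and dtype with NumPy and classifies it. The helper names and the exact thresholds (a 10-100 MB target, 1 million elements, a ~2 GB ceiling) are illustrative assumptions taken from the guidance above, not part of the Xarray-Beam API.

```python
import numpy as np

# Illustrative thresholds from the rules of thumb above; these are
# assumptions for this sketch, not values enforced by Xarray-Beam.
MIN_ELEMENTS = 1_000_000        # below this, per-chunk overhead dominates
TARGET_MIN_BYTES = 10_000_000   # ~10 MB
TARGET_MAX_BYTES = 100_000_000  # ~100 MB
BEAM_LIMIT_BYTES = 2**31        # ~2 GB protocol buffer ceiling


def chunk_nbytes(shape, dtype):
    """Bytes needed by one chunk of the given shape and dtype."""
    return int(np.prod(shape)) * np.dtype(dtype).itemsize


def assess_chunk(shape, dtype):
    """Classify a chunk shape against the rules of thumb (hypothetical helper)."""
    nbytes = chunk_nbytes(shape, dtype)
    if nbytes >= BEAM_LIMIT_BYTES:
        return "too large for Beam"
    if int(np.prod(shape)) < MIN_ELEMENTS:
        return "too small"
    if TARGET_MIN_BYTES <= nbytes <= TARGET_MAX_BYTES:
        return "good"
    return "acceptable"


# The chunks in this tutorial are tiny: 4 x 2 int64 values.
print(chunk_nbytes((4, 2), np.int64))            # 64
print(assess_chunk((2000, 2000), np.float32))    # good (16 MB)
print(assess_chunk((20000, 20000), np.float64))  # too large for Beam (3.2 GB)
```

The 64 bytes computed here match np.zeros((4, 2), np.int64).nbytes, so in practice you can simply read nbytes off an existing chunk.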

Adjusting variables

The simplest transformation is splitting (or consolidating) different variables in a Dataset with SplitVariables() and ConsolidateVariables(), e.g.,

inputs | xbeam.SplitVariables()
[(Key(offsets={'x': 0, 'y': 0}, vars={'foo'}),
  <xarray.Dataset>
  Dimensions:  (x: 4, y: 2)
  Dimensions without coordinates: x, y
  Data variables:
      foo      (x, y) int64 0 1 2 3 4 5 6 7),
 (Key(offsets={'x': 0, 'y': 0}, vars={'bar'}),
  <xarray.Dataset>
  Dimensions:  (x: 4, y: 2)
  Dimensions without coordinates: x, y
  Data variables:
      bar      (x, y) int64 100 101 102 103 104 105 106 107),
 (Key(offsets={'x': 4, 'y': 0}, vars={'foo'}),
  <xarray.Dataset>
  Dimensions:  (x: 4, y: 2)
  Dimensions without coordinates: x, y
  Data variables:
      foo      (x, y) int64 8 9 10 11 12 13 14 15),
 (Key(offsets={'x': 4, 'y': 0}, vars={'bar'}),
  <xarray.Dataset>
  Dimensions:  (x: 4, y: 2)
  Dimensions without coordinates: x, y
  Data variables:
      bar      (x, y) int64 108 109 110 111 112 113 114 115)]
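To make the key bookkeeping concrete, here is a toy model of what splitting and consolidating variables does to (key, chunk) pairs. ToyKey, split_variables and consolidate_variables are hypothetical stand-ins written for this sketch (chunks are plain dicts of NumPy arrays rather than xarray.Dataset objects); they are not how Xarray-Beam implements SplitVariables() or ConsolidateVariables().

```python
from typing import FrozenSet, NamedTuple, Optional

import numpy as np


class ToyKey(NamedTuple):
    """Hypothetical stand-in for xbeam.Key (not the real class)."""
    offsets: tuple                         # e.g. (('x', 0), ('y', 0)); hashable
    vars: Optional[FrozenSet[str]] = None  # None means "all variables"


def split_variables(records):
    """Emit one (key, chunk) pair per variable, recording its name in key.vars."""
    for key, chunk in records:
        for name, array in chunk.items():
            yield key._replace(vars=frozenset({name})), {name: array}


def consolidate_variables(records):
    """Merge all variables that share the same offsets back into one chunk."""
    merged = {}
    for key, chunk in records:
        merged.setdefault(key.offsets, {}).update(chunk)
    return [(ToyKey(offsets), chunk) for offsets, chunk in merged.items()]


# Same dummy data as create_records(), with plain dicts instead of Datasets.
toy_inputs = []
for offset in [0, 4]:
    data = 2 * offset + np.arange(8).reshape(4, 2)
    key = ToyKey((('x', offset), ('y', 0)))
    toy_inputs.append((key, {'foo': data, 'bar': 100 + data}))

split = list(split_variables(toy_inputs))
# Four records: (x=0, foo), (x=0, bar), (x=4, foo), (x=4, bar)
for key, chunk in split:
    print(dict(key.offsets), sorted(key.vars))

# Two records again, each holding both 'foo' and 'bar'
merged = consolidate_variables(split)
```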

Tip

Instead of a separate transform for splitting variables, you can also set split_vars=True in DatasetToChunks.

Adjusting chunks

You can also adjust how the arrays in a dataset are split into chunks. Here you have two choices of API:

  1. The lower-level SplitChunks() and ConsolidateChunks(). Each applies a single splitting (with indexing) or consolidation (with xarray.concat) step to the elements of the PCollection.

  2. The high-level Rechunk(), which uses a pipeline of multiple split/consolidate steps (as needed) to efficiently rechunk a dataset.

Low level rechunking

For minor adjustments (e.g., mostly along a single dimension), the more explicit SplitChunks() and ConsolidateChunks() are good options. They take a dict of desired chunk sizes as a parameter, which can also be -1 to indicate “no chunking” along a dimension:

inputs | xbeam.ConsolidateChunks({'x': -1})
[(Key(offsets={'x': 0, 'y': 0}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 8, y: 2)
  Dimensions without coordinates: x, y
  Data variables:
      foo      (x, y) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
      bar      (x, y) int64 100 101 102 103 104 105 ... 110 111 112 113 114 115)]
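Along a single dimension, consolidation amounts to grouping chunks by the target chunk their offset falls into and concatenating each group in offset order. The function below is a hypothetical NumPy sketch of that idea (not the library's implementation), working on (offset, array) pairs along axis 0. It raises an error when an existing chunk would straddle a target boundary, which is exactly the case that consolidation alone cannot handle.

```python
import numpy as np


def consolidate_chunks_1d(records, target):
    """Toy ConsolidateChunks along axis 0.

    records: iterable of (offset, array) pairs.
    target: desired chunk size, or -1 for a single chunk.
    """
    groups = {}
    for offset, array in records:
        last = offset + array.shape[0] - 1
        if target == -1:
            index = 0
        elif offset // target != last // target:
            raise ValueError(f"chunk at offset {offset} crosses a target boundary")
        else:
            index = offset // target
        groups.setdefault(index, []).append((offset, array))
    result = []
    for index in sorted(groups):
        parts = sorted(groups[index], key=lambda pair: pair[0])
        result.append((parts[0][0], np.concatenate([a for _, a in parts], axis=0)))
    return result


# The 'foo' variable from create_records(), in two chunks along x.
records = [(offset, 2 * offset + np.arange(8).reshape(4, 2)) for offset in [0, 4]]
[(offset, merged)] = consolidate_chunks_1d(records, -1)
print(offset, merged.shape)  # 0 (8, 2)
```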

Note that because these transformations only split or consolidate, they cannot necessarily fully rechunk a dataset in a single step if the new chunk sizes are not multiples of the old chunk sizes (for consolidation) or do not evenly divide them (for splitting), e.g.,

inputs | xbeam.SplitChunks({'x': 5})  # notice that the first two chunks are still separate!
[(Key(offsets={'x': 0, 'y': 0}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 4, y: 2)
  Dimensions without coordinates: x, y
  Data variables:
      foo      (x, y) int64 0 1 2 3 4 5 6 7
      bar      (x, y) int64 100 101 102 103 104 105 106 107),
 (Key(offsets={'x': 4, 'y': 0}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 1, y: 2)
  Dimensions without coordinates: x, y
  Data variables:
      foo      (x, y) int64 8 9
      bar      (x, y) int64 108 109),
 (Key(offsets={'x': 5, 'y': 0}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 3, y: 2)
  Dimensions without coordinates: x, y
  Data variables:
      foo      (x, y) int64 10 11 12 13 14 15
      bar      (x, y) int64 110 111 112 113 114 115)]
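The uneven result follows from where the cuts are made: splitting only cuts each existing chunk at multiples of the new chunk size and never merges anything. A minimal NumPy sketch of that rule along one axis, using a hypothetical split_chunks_1d helper rather than the library's code, reproduces the 4/1/3 layout shown above.

```python
import numpy as np


def split_chunks_1d(records, target):
    """Toy SplitChunks along axis 0: cut each chunk at multiples of `target`."""
    result = []
    for offset, array in records:
        start, stop = offset, offset + array.shape[0]
        while start < stop:
            # Next multiple of `target` after `start`, capped at this chunk's end.
            boundary = min((start // target + 1) * target, stop)
            result.append((start, array[start - offset:boundary - offset]))
            start = boundary
    return result


records = [(offset, 2 * offset + np.arange(8).reshape(4, 2)) for offset in [0, 4]]
for offset, array in split_chunks_1d(records, 5):
    print(offset, array.shape[0])
# 0 4
# 4 1
# 5 3
```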

For such uneven cases, you’ll need to use split followed by consolidate:

inputs | xbeam.SplitChunks({'x': 5}) | xbeam.ConsolidateChunks({'x': 5})
[(Key(offsets={'x': 0, 'y': 0}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 5, y: 2)
  Dimensions without coordinates: x, y
  Data variables:
      foo      (x, y) int64 0 1 2 3 4 5 6 7 8 9
      bar      (x, y) int64 100 101 102 103 104 105 106 107 108 109),
 (Key(offsets={'x': 5, 'y': 0}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 3, y: 2)
  Dimensions without coordinates: x, y
  Data variables:
      foo      (x, y) int64 10 11 12 13 14 15
      bar      (x, y) int64 110 111 112 113 114 115)]
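Looking only at chunk boundaries makes this two-step recipe easy to check: the split step leaves chunk edges at the union of the old and new edges, and the consolidate step then merges every intermediate piece into the target chunk containing its start. The helper below is a hypothetical sketch of that bookkeeping along one dimension, assuming regular source and target chunk sizes.

```python
def split_then_consolidate(dim_size, source, target):
    """Return (intermediate, final) chunk intervals as (start, stop) pairs."""
    # After splitting, edges are the union of source and target chunk edges.
    edges = sorted(set(range(0, dim_size, source)) | set(range(0, dim_size, target)))
    edges.append(dim_size)
    intermediate = list(zip(edges[:-1], edges[1:]))
    # Consolidation groups pieces by the target chunk that contains their start.
    groups = {}
    for start, stop in intermediate:
        groups.setdefault(start // target, []).append((start, stop))
    final = [(pieces[0][0], pieces[-1][1]) for _, pieces in sorted(groups.items())]
    return intermediate, final


intermediate, final = split_then_consolidate(dim_size=8, source=4, target=5)
print(intermediate)  # [(0, 4), (4, 5), (5, 8)]
print(final)         # [(0, 5), (5, 8)]
```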

High level rechunking

Alternatively, the high-level Rechunk() transform applies multiple split and consolidate steps based on the Rechunker algorithm:

inputs | xbeam.Rechunk(dim_sizes={'x': 8}, source_chunks={'x': 4}, target_chunks={'x': 5}, itemsize=8)
[(Key(offsets={'x': 0, 'y': 0}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 5, y: 2)
  Dimensions without coordinates: x, y
  Data variables:
      foo      (x, y) int64 0 1 2 3 4 5 6 7 8 9
      bar      (x, y) int64 100 101 102 103 104 105 106 107 108 109),
 (Key(offsets={'x': 5, 'y': 0}, vars=None),
  <xarray.Dataset>
  Dimensions:  (x: 3, y: 2)
  Dimensions without coordinates: x, y
  Data variables:
      foo      (x, y) int64 10 11 12 13 14 15
      bar      (x, y) int64 110 111 112 113 114 115)]
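Unlike SplitChunks() and ConsolidateChunks(), Rechunk() needs a description of the whole dataset: the full size of each dimension (dim_sizes), the current chunking (source_chunks), the desired chunking (target_chunks) and the number of bytes per array element (itemsize, 8 for int64). The hypothetical helper below sketches how dim_sizes, source_chunks and itemsize could be derived from in-memory (offsets, {name: array}) records with NumPy. It is not an Xarray-Beam function, and it assumes every chunk except the last along each dimension has the same size.

```python
import numpy as np


def infer_rechunk_args(records, dims):
    """Infer dim_sizes, source_chunks and itemsize from chunked records.

    records: iterable of (offsets, {name: array}) pairs, where offsets maps
    each dimension name to the chunk's starting index and every array uses
    `dims` as its axis order.
    """
    dim_sizes = {dim: 0 for dim in dims}
    source_chunks = {}
    itemsize = 0
    for offsets, chunk in records:
        for array in chunk.values():
            itemsize = max(itemsize, array.dtype.itemsize)
            for axis, dim in enumerate(dims):
                length = array.shape[axis]
                dim_sizes[dim] = max(dim_sizes[dim], offsets[dim] + length)
                if offsets[dim] == 0:  # first chunk sets the regular chunk size
                    source_chunks[dim] = length
    return dim_sizes, source_chunks, itemsize


# The same dummy data as create_records(), with explicit int64 arrays.
records = []
for offset in [0, 4]:
    data = 2 * offset + np.arange(8, dtype=np.int64).reshape(4, 2)
    records.append(({'x': offset, 'y': 0}, {'foo': data, 'bar': 100 + data}))

print(infer_rechunk_args(records, dims=('x', 'y')))
# ({'x': 8, 'y': 2}, {'x': 4, 'y': 2}, 8)
```

These values describe the 8 x 2 dataset used throughout this tutorial, stored as two chunks of 4 along x.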

Rechunk requires specifying a few more parameters, but based on that information it can be much more efficient for more complex rechunking tasks, particularly in cases where data needs to be distributed into a very different shape (e.g., distributing a matrix across rows vs. columns).

The naive “splitting” approach in such cases may divide datasets into extremely small tasks corresponding to individual array elements, which adds a huge amount of overhead.
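A little arithmetic shows how bad this can get. If every source chunk is simply cut at every target chunk edge, each intermediate piece is the overlap of one source chunk and one target chunk. The hypothetical helper below counts those pieces for regular chunk sizes; for a 1000 x 1000 matrix stored as whole rows and rechunked into whole columns, every piece is a single element.

```python
import math


def split_widths(n, source, target):
    """Widths of the pieces along one dimension after cutting at both sets of edges."""
    edges = sorted(set(range(0, n, source)) | set(range(0, n, target))) + [n]
    return [stop - start for start, stop in zip(edges[:-1], edges[1:])]


def naive_split_stats(shape, source, target):
    """Return (number of intermediate chunks, elements in the smallest one)."""
    widths = [split_widths(n, s, t) for n, s, t in zip(shape, source, target)]
    n_chunks = math.prod(len(w) for w in widths)
    smallest = math.prod(min(w) for w in widths)
    return n_chunks, smallest


# Rows -> columns on a 1000 x 1000 matrix: one million single-element chunks.
print(naive_split_stats((1000, 1000), source=(1, 1000), target=(1000, 1)))
# (1000000, 1)
```

For the small example in this tutorial, naive_split_stats((8, 2), (4, 2), (5, 2)) gives only 3 pieces, which is why split followed by consolidate works fine there.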