xarray_beam.ChunksToZarr

class xarray_beam.ChunksToZarr(store, template=None, zarr_chunks=None, *, zarr_shards=None, zarr_format=None, num_threads=None, needs_setup=True, encoding=None, stage_locally=None)

Write keyed chunks to a Zarr store in parallel.

Parameters:
  • store (WritableStore)

  • template (xarray.Dataset | beam.pvalue.AsSingleton | None)

  • zarr_chunks (Mapping[str, int] | None)

  • zarr_shards (Mapping[str, int] | None)

  • zarr_format (int | None)

  • num_threads (int | None)

  • needs_setup (bool)

  • encoding (Mapping[str, Any] | None)

  • stage_locally (bool | None)

__init__(store, template=None, zarr_chunks=None, *, zarr_shards=None, zarr_format=None, num_threads=None, needs_setup=True, encoding=None, stage_locally=None)

Initialize ChunksToZarr.

Note on chunking and sharding:

The expected chunking in PCollections fed into ChunksToZarr depends on whether or not you use Zarr v3’s sharding feature, which groups multiple “chunks” into “shards” that are stored in individual files. Sharding is optional. The default behavior of no sharding (one chunk per shard) is equivalent to setting chunks and shards to the same value.

Zarr supports partial reads of chunks from a shard, but shards must be written in their entirety. This means that if you use sharding (by setting zarr_shards), the PCollections you write with ChunksToZarr should be chunked like zarr_shards, not like zarr_chunks.
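The rule that input chunks should follow zarr_shards can be illustrated along a single dimension. The sketch below does not use xarray_beam. It relies on two hypothetical helpers, chunk_layout and matches_shards, which are not part of the library. They show that input chunked like the shards produces whole-shard writes, while input chunked like the smaller Zarr chunks would require partial shard writes. The dimension length of 100, zarr_chunks of 10 and zarr_shards of 50 are illustrative assumptions.

```python
from typing import List, Tuple


def chunk_layout(chunk: int, dim_size: int) -> List[Tuple[int, int]]:
    """Hypothetical helper: (offset, size) pairs for evenly chunking one dimension."""
    return [(start, min(chunk, dim_size - start)) for start in range(0, dim_size, chunk)]


def matches_shards(offset: int, size: int, shard: int, dim_size: int) -> bool:
    """Hypothetical helper: does an input chunk cover exactly one whole shard?

    The chunk must start on a shard boundary and span one full shard. The last
    shard along a dimension may be shorter if dim_size is not a multiple of shard.
    """
    if offset % shard != 0:
        return False
    return size == min(shard, dim_size - offset)


DIM_SIZE, SHARD = 100, 50  # assumed: zarr_chunks=10, zarr_shards=50

like_shards = chunk_layout(50, DIM_SIZE)  # input chunked like zarr_shards
like_chunks = chunk_layout(10, DIM_SIZE)  # input chunked like zarr_chunks

print(all(matches_shards(o, s, SHARD, DIM_SIZE) for o, s in like_shards))  # True
print(all(matches_shards(o, s, SHARD, DIM_SIZE) for o, s in like_chunks))  # False
```

In the second layout, each 10-element piece would overwrite only a fraction of a 50-element shard. That is the situation the note above warns against.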

Parameters:
  • store (str | Store | StorePath | FSMap | Path | dict[str, Buffer] | PathLike[str]) – a string corresponding to a Zarr path or an existing Zarr store.

  • template (Dataset | AsSingleton | None) –

    an argument providing a lazy xarray.Dataset already chunked using Dask (e.g., as created by xarray_beam.make_template) that matches the structure of the virtual combined dataset corresponding to the chunks fed into this PTransform. One or more variables are expected to be “chunked” with Dask, and will only have their metadata written to Zarr without array values. Two types of inputs are supported:

    1. If template is an xarray.Dataset, the Zarr store is set up eagerly.

    2. If template is a beam.pvalue.AsSingleton object representing the result of a prior step in a Beam pipeline, the Zarr store is set up as part of the pipeline.

    A template of None is also supported, but only for backwards compatibility. In that case, Xarray-Beam will attempt to discover the structure of the desired Zarr store automatically by inspecting its inputs. THIS OPTION IS NOT RECOMMENDED. Due to a race condition (https://github.com/google/xarray-beam/issues/85), it can result in writing corrupted data to Zarr stores, particularly when they contain many variables. It can also be quite slow for large datasets.

  • zarr_chunks (Mapping[str, int] | None) – chunking scheme to use for Zarr. If set, overrides the chunking scheme on already chunked arrays in template. Chunks of -1 use the full dimension size from the dataset, like dask.array.

  • zarr_shards (Mapping[str, int] | None) – optional sharding scheme to use for Zarr. Only valid if using zarr_format=3. Shards of -1 use the full dimension size from the dataset, like dask.array. Unspecified shard sizes default to chunk sizes.

  • zarr_format (int | None) – The desired Zarr format to target (currently 2 or 3). The default of None will attempt to determine the Zarr format from store when possible. Otherwise, it falls back to the default format of the installed zarr-python library.

  • num_threads (int | None) – the number of Dataset chunks to write in parallel per worker. More threads can increase throughput, but they also increase memory usage and make it harder for Beam runners to shard work. Note that each variable in a Dataset is already written in parallel, so this is most useful for Datasets with a small number of variables.

  • needs_setup (bool) – if False, the Zarr store is assumed to be already set up and is not set up as part of this PTransform.

  • encoding (Mapping[str, Any] | None) – Nested dictionary with variable names as keys and dictionaries of variable specific encodings as values, e.g., {"my_variable": {"dtype": "int16", "scale_factor": 0.1,}, ...}

  • stage_locally (bool | None) – If True, write Zarr metadata to a local temporary directory before copying it to store in parallel. This can significantly speed up setup on high-latency filesystems. By default, local staging is used if possible, which is the case as long as store is provided as a string or path.
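The zarr_chunks and zarr_shards rules described above interact in three ways. zarr_chunks overrides the template's chunking. A value of -1 means the full dimension size. Unspecified shard sizes fall back to the chunk sizes. The pure-Python sketch below restates those documented rules so the resulting layout can be checked by hand. resolve_layout is a hypothetical helper, not xarray_beam's actual implementation. The dimension names and sizes are assumptions, and zarr_shards itself is only valid with zarr_format=3.

```python
from typing import Dict, Mapping, Optional, Tuple


def resolve_layout(
    sizes: Mapping[str, int],
    template_chunks: Mapping[str, int],
    zarr_chunks: Optional[Mapping[str, int]] = None,
    zarr_shards: Optional[Mapping[str, int]] = None,
) -> Tuple[Dict[str, int], Dict[str, int]]:
    """Hypothetical helper restating the documented chunk/shard rules."""

    def expand(dim: str, value: int) -> int:
        # -1 means "use the full dimension size", as in dask.array.
        return sizes[dim] if value == -1 else value

    # zarr_chunks, if given, overrides the template's chunking per dimension.
    chunks = {**template_chunks, **(zarr_chunks or {})}
    chunks = {dim: expand(dim, c) for dim, c in chunks.items()}

    # Unspecified shard sizes default to the resolved chunk sizes.
    shards = {**chunks, **(zarr_shards or {})}
    shards = {dim: expand(dim, s) for dim, s in shards.items()}
    return chunks, shards


sizes = {"time": 360, "lat": 180, "lon": 360}
template_chunks = {"time": 360, "lat": 180, "lon": 360}
chunks, shards = resolve_layout(
    sizes, template_chunks, zarr_chunks={"time": 10, "lat": -1}, zarr_shards={"time": 120}
)
print(chunks)  # {'time': 10, 'lat': 180, 'lon': 360}
print(shards)  # {'time': 120, 'lat': 180, 'lon': 360}
```

Under these assumptions, a PCollection written to this store would be chunked with 120 elements along "time" (matching the shards), while readers would see 10-element Zarr chunks within each shard.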

Methods

__init__(store[, template, zarr_chunks, ...])

Initialize ChunksToZarr.

annotations()

default_label()

default_type_hints()

display_data()

Returns the display data associated with a pipeline component.

expand(pcoll)

from_runner_api(proto, context)

get_resource_hints()

get_type_hints()

Gets and/or initializes type hints for this object.

get_windowing(inputs)

Returns the window function to be associated with transform's output.

infer_output_type(unused_input_type)

register_urn(urn, parameter_type[, constructor])

runner_api_requires_keyed_input()

to_runner_api(context[, has_parts])

to_runner_api_parameter(unused_context)

to_runner_api_pickled(context)

type_check_inputs(pvalueish)

type_check_inputs_or_outputs(pvalueish, ...)

type_check_outputs(pvalueish)

with_input_types(input_type_hint)

Annotates the input type of a PTransform with a type-hint.

with_output_types(type_hint)

Annotates the output type of a PTransform with a type-hint.

with_resource_hints(**kwargs)

Adds resource hints to the PTransform.

Attributes

label

pipeline

side_inputs