add storage_transformers and get/set_partial_values #1096

Merged
Changes from 8 commits
Commits (35)
605620b
add storage_transformers and get/set_partial_values
jstriebel Jul 28, 2022
566e4b0
formatting
jstriebel Jul 28, 2022
5f85439
add docs and release notes
jstriebel Jul 28, 2022
3c38d57
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Jul 28, 2022
dd7fedb
add test_core testcase
jstriebel Jul 29, 2022
e33b365
Update zarr/creation.py
jstriebel Jul 29, 2022
81ebf68
apply PR feedback
jstriebel Jul 29, 2022
ca28471
add comment that storage_transformers=None is the same as storage_tra…
jstriebel Jul 29, 2022
85f3309
use empty tuple as default for storage_transformers
jstriebel Aug 1, 2022
03de894
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Aug 1, 2022
41eaafb
make mypy happy
jstriebel Aug 1, 2022
5d7be76
better coverage, minor fix, adding rmdir
jstriebel Aug 1, 2022
46229ad
add missing rmdir to test
jstriebel Aug 1, 2022
3a9f7cc
increase coverage
jstriebel Aug 2, 2022
efa4e07
improve test coverage
jstriebel Aug 3, 2022
b4668a8
fix TestArrayWithStorageTransformersV3
jstriebel Aug 3, 2022
e4a4853
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Aug 5, 2022
e454046
Update zarr/creation.py
jstriebel Aug 8, 2022
a3c7f74
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Aug 8, 2022
c041dd8
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Aug 22, 2022
696d5ca
pick generic storage transformer changes from #1111
jstriebel Aug 22, 2022
c099440
increase coverage
jstriebel Aug 22, 2022
be98c01
fix order of storage transformers
jstriebel Aug 24, 2022
7c2767a
retrigger CI
jstriebel Aug 25, 2022
146c30a
Merge remote-tracking branch 'origin/main' into storage-transformers-…
jstriebel Dec 12, 2022
59cca8b
minor fixes
jstriebel Dec 12, 2022
c2dc0d6
make flake8 happy
jstriebel Dec 12, 2022
7402262
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Dec 19, 2022
b9d8177
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Dec 21, 2022
91f0c2c
apply PR feedback
jstriebel Dec 22, 2022
e68c97f
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Dec 22, 2022
a7e4d89
Merge remote-tracking branch 'origin/main' into storage-transformers-…
joshmoore Jan 16, 2023
fcb9ba0
Merge pull request #3 from joshmoore/storage-transformers-and-partial…
jstriebel Jan 16, 2023
b6588e7
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Jan 16, 2023
eba9006
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Jan 16, 2023
10 changes: 10 additions & 0 deletions docs/release.rst
@@ -11,6 +11,16 @@ Release notes
Unreleased
----------

Enhancements
~~~~~~~~~~~~

* **Improve Zarr V3 support, adding partial store read/write and storage transformers.**
  Add two features of the `v3 spec <https://zarr-specs.readthedocs.io/en/latest/core/v3.0.html>`_:

  * storage transformers
  * ``get_partial_values`` and ``set_partial_values``

  By :user:`Jonathan Striebel <jstriebel>`; :issue:`1096`.


Documentation
~~~~~~~~~~~~~

164 changes: 164 additions & 0 deletions zarr/_storage/store.py
@@ -1,6 +1,8 @@
import abc
import os
from collections import defaultdict
from collections.abc import MutableMapping
from copy import copy
from string import ascii_letters, digits
from typing import Any, List, Mapping, Optional, Union

@@ -254,6 +256,49 @@ def __setitem__(self, key, value):
def __getitem__(self, key):
"""Get a value."""

def get_partial_values(self, key_ranges):
    """Get multiple partial values.
    key_ranges can be an iterable of key, range pairs,
    where a range specifies two integers range_start and range_length
    as a tuple, (range_start, range_length).
    range_length may be None to indicate that the value should be read
    until the end. A key may occur multiple times with different ranges."""
    results = [None] * len(key_ranges)
    indexed_ranges_by_key = defaultdict(list)
    for i, (key, range_) in enumerate(key_ranges):
        indexed_ranges_by_key[key].append((i, range_))
    for key, indexed_ranges in indexed_ranges_by_key.items():
        value = self[key]
        for i, (range_from, range_length) in indexed_ranges:
            if range_length is None:
                results[i] = value[range_from:]
            else:
                results[i] = value[range_from:range_from + range_length]
    return results

def set_partial_values(self, key_start_values):
    """Set multiple partial values.
    key_start_values can be an iterable of key, start and value triplets
    as tuples, (key, start, value), where start defines the offset in bytes.
    A key may occur multiple times with different starts and non-overlapping values.
    Also, start may only be beyond the current value if other entries in
    key_start_values fill the gap."""
    unique_keys = set(next(zip(*key_start_values)))
    values = {}
    for key in unique_keys:
        old_value = self.get(key)
        values[key] = None if old_value is None else bytearray(old_value)
    for key, start, value in key_start_values:
        if values[key] is None:
            assert start == 0
            values[key] = value
        else:
            if start > len(values[key]):
                raise ValueError(
                    f"Cannot set value at start {start}, "
                    + f"since it is beyond the data at key {key}, "
                    + f"having length {len(values[key])}."
                )
            values[key][start:start + len(value)] = value
    for key, value in values.items():
        self[key] = value

def clear(self):
    """Remove all items from store."""
    self.erase_prefix("/")
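Below is a usage sketch (not part of the PR) illustrating the semantics of these default implementations against an in-memory v3 store. Assumptions: KVStoreV3 is importable from zarr._storage.v3, and the experimental v3 API gate, if your zarr version has one, is enabled via the ZARR_V3_EXPERIMENTAL_API environment variable.

    import os

    # Enable the experimental v3 API before importing zarr, in case this
    # zarr version gates the v3 store classes behind it.
    os.environ["ZARR_V3_EXPERIMENTAL_API"] = "1"

    from zarr._storage.v3 import KVStoreV3

    store = KVStoreV3(dict())
    store["data/root/arr/c0"] = b"0123456789"

    # Two ranges for the same key: bytes [2, 6) and everything from byte 7 on;
    # a range_length of None reads to the end of the value.
    parts = store.get_partial_values([
        ("data/root/arr/c0", (2, 4)),
        ("data/root/arr/c0", (7, None)),
    ])
    assert parts == [b"2345", b"789"]

    # Overwrite the first four bytes and write three bytes starting at the
    # current end of the value.
    store.set_partial_values([
        ("data/root/arr/c0", 0, b"abcd"),
        ("data/root/arr/c0", 10, b"XYZ"),
    ])
    assert bytes(store["data/root/arr/c0"]) == b"abcd456789XYZ"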
@@ -303,6 +348,125 @@ def _ensure_store(store):
)


class StorageTransformer(MutableMapping, abc.ABC):
"""Base class for storage transformers. The methods simply pass on the data as-is
and should be overwritten by sub-classes."""

_store_version = 3
Review comment (Member):
Slight surprise to find this in store.py rather than under v3.

Reply (Member Author):
I assumed that the base-classes go into store.py, similar to StoreV3, but also happy to move this into _storage/v3.py if that's a better fit.

_metadata_class = Metadata3

def __init__(self, _type) -> None:
if _type not in self.valid_types:
raise ValueError(
f"Storage transformer cannot be initialized with type {_type}, "
+ f"must be one of {list(self.valid_types)}."
)
self.type = _type
self._inner_store = None

def _copy_for_array(self, inner_store):
transformer_copy = copy(self)
transformer_copy._inner_store = inner_store
return transformer_copy

@abc.abstractproperty
def extension_uri(self):
pass

@abc.abstractproperty
def valid_types(self):
pass

def get_config(self):
"""Return a dictionary holding configuration parameters for this
storage transformer. All values must be compatible with JSON encoding."""
# Override in sub-class if need special encoding of config values.
# By default, assume all non-private members are configuration
# parameters except for type.
return {
k: v for k, v in self.__dict__.items()
if not k.startswith('_') and k != "type"
}

@classmethod
def from_config(cls, _type, config):
"""Instantiate storage transformer from a configuration object."""
# override in sub-class if need special decoding of config values

# by default, assume constructor accepts configuration parameters as
# keyword arguments without any special decoding
return cls(_type, **config)

def is_readable(self):
return self._inner_store.is_readable()

def is_writeable(self):
return self._inner_store.is_writeable()

def is_listable(self):
return self._inner_store.is_listable()

def is_erasable(self):
return self._inner_store.is_erasable()

def __enter__(self):
return self._inner_store.__enter__()

def __exit__(self, exc_type, exc_value, traceback):
return self._inner_store.__exit__(exc_type, exc_value, traceback)

def close(self) -> None:
return self._inner_store.close()

def rename(self, src_path: str, dst_path: str) -> None:
return self._inner_store.rename(src_path, dst_path)

def list_prefix(self, prefix):
return self._inner_store.list_prefix(prefix)

def erase(self, key):
return self._inner_store.erase(key)

def erase_prefix(self, prefix):
return self._inner_store.erase_prefix(prefix)

def list_dir(self, prefix):
return self._inner_store.list_dir(prefix)

def list(self):
return self._inner_store.list()

def __contains__(self, key):
return self._inner_store.__contains__(key)

def __setitem__(self, key, value):
return self._inner_store.__setitem__(key, value)

def __getitem__(self, key):
return self._inner_store.__getitem__(key)

def __delitem__(self, key):
return self._inner_store.__delitem__(key)

def __iter__(self):
return self._inner_store.__iter__()

def __len__(self):
return self._inner_store.__len__()

def get_partial_values(self, key_ranges):
return self._inner_store.get_partial_values(key_ranges)

def set_partial_values(self, key_start_values):
return self._inner_store.set_partial_values(key_start_values)

def clear(self):
return self._inner_store.clear()

def __eq__(self, other):
return self._inner_store.__eq__(other)


# allow MutableMapping for backwards compatibility
StoreLike = Union[BaseStore, MutableMapping]

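To illustrate the extension point above, here is a hypothetical minimal transformer sketch (not from the PR): it passes every call through to the inner store and only carries a single configuration value. The class attributes satisfy the abstract extension_uri/valid_types properties; the URI and the label parameter are invented for this example.

    from zarr._storage.store import StorageTransformer

    class NoopStorageTransformer(StorageTransformer):
        # Hypothetical extension URI; a real transformer would use a
        # spec-defined one.
        extension_uri = "https://example.com/zarr/noop/1.0"
        valid_types = ["noop"]

        def __init__(self, _type, label="default"):
            super().__init__(_type)
            self.label = label  # picked up automatically by get_config()

    transformer = NoopStorageTransformer("noop", label="example")
    assert transformer.get_config() == {"label": "example"}

    # Re-create the transformer from (type, config), as Metadata3 does when
    # decoding array metadata.
    restored = NoopStorageTransformer.from_config("noop", transformer.get_config())
    assert restored.label == "example"

Note that decoding from stored array metadata only works for classes listed in KNOWN_STORAGE_TRANSFORMERS in zarr/meta.py (currently just the DummyStorageTransfomer test helper), so a custom class like this one cannot yet round-trip through an actual array.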
14 changes: 14 additions & 0 deletions zarr/core.py
@@ -280,6 +280,20 @@ def _load_metadata_nosync(self):
filters = [get_codec(config) for config in filters]
self._filters = filters

if self._version == 3:
storage_transformers = meta.get('storage_transformers', [])
transformed_store = self._store
for storage_transformer in storage_transformers:
transformed_store = storage_transformer._copy_for_array(transformed_store)
self._store = transformed_store
if self._chunk_store is not None:
transformed_chunk_store = self._chunk_store
for storage_transformer in storage_transformers:
transformed_chunk_store = (
storage_transformer._copy_for_array(transformed_chunk_store)
)
self._chunk_store = transformed_chunk_store

def _refresh_metadata(self):
if not self._cache_metadata:
self._load_metadata()
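A side note on the wrapping loop above: for storage_transformers=[t1, t2] the store ends up wrapped as t2._copy_for_array(t1._copy_for_array(store)), i.e. the last transformer in the list is outermost. The standalone sketch below, using a hypothetical PassThrough class that mimics _copy_for_array, demonstrates this; a later commit in this PR ("fix order of storage transformers") revisits the ordering.

    from copy import copy

    class PassThrough:
        """Hypothetical stand-in implementing only the wrapping mechanics."""

        def __init__(self, name):
            self.name = name
            self._inner_store = None

        def _copy_for_array(self, inner_store):
            wrapped = copy(self)
            wrapped._inner_store = inner_store
            return wrapped

    store = {"data/root/arr/c0": b"chunk-bytes"}
    t1, t2 = PassThrough("t1"), PassThrough("t2")

    transformed_store = store
    for storage_transformer in [t1, t2]:
        transformed_store = storage_transformer._copy_for_array(transformed_store)

    assert transformed_store.name == "t2"
    assert transformed_store._inner_store.name == "t1"
    assert transformed_store._inner_store._inner_store is store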
13 changes: 11 additions & 2 deletions zarr/creation.py
@@ -21,7 +21,7 @@ def create(shape, chunks=True, dtype=None, compressor='default',
overwrite=False, path=None, chunk_store=None, filters=None,
cache_metadata=True, cache_attrs=True, read_only=False,
object_codec=None, dimension_separator=None, write_empty_chunks=True,
*, zarr_version=None, **kwargs):
*, zarr_version=None, storage_transformers=None, **kwargs):
"""Create an array.

Parameters
@@ -84,6 +84,15 @@ def create(shape, chunks=True, dtype=None, compressor='default',

.. versionadded:: 2.11

storage_transformers : sequence of StorageTransformers, optional
Setting storage transformers changes the storage structure and behaviour
of data coming from the underlying store. The transformers are applied in the
order of the given sequence. Supplying an empty sequence is the same as omitting
the argument or setting it to None. May only be set when using zarr_version 3.

.. versionadded:: 2.13

zarr_version : {None, 2, 3}, optional
The zarr protocol version of the created array. If None, it will be
inferred from ``store`` or ``chunk_store`` if they are provided,
@@ -161,7 +170,7 @@ def create(shape, chunks=True, dtype=None, compressor='default',
init_array(store, shape=shape, chunks=chunks, dtype=dtype, compressor=compressor,
fill_value=fill_value, order=order, overwrite=overwrite, path=path,
chunk_store=chunk_store, filters=filters, object_codec=object_codec,
dimension_separator=dimension_separator)
dimension_separator=dimension_separator, storage_transformers=storage_transformers)

# instantiate array
z = Array(store, path=path, chunk_store=chunk_store, synchronizer=synchronizer,
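A usage sketch for the new keyword (not from the PR's docs), creating a small v3 array with the PR's test transformer. Assumptions: the experimental v3 API gate, if present in your zarr version, is enabled; MemoryStoreV3 is importable from zarr._storage.v3; and the DummyStorageTransfomer test helper takes the dummy_type/test_value arguments used in the PR's tests.

    import os

    os.environ["ZARR_V3_EXPERIMENTAL_API"] = "1"  # set before importing zarr

    import zarr
    from zarr._storage.v3 import MemoryStoreV3
    from zarr.tests.test_storage_v3 import DummyStorageTransfomer

    transformer = DummyStorageTransfomer(
        "dummy_type", test_value=DummyStorageTransfomer.TEST_CONSTANT
    )
    z = zarr.create(
        shape=(100, 100),
        chunks=(10, 10),
        dtype="i4",
        store=MemoryStoreV3(),
        path="example_array",
        zarr_version=3,
        storage_transformers=[transformer],
    )
    z[:] = 1  # chunk reads and writes now pass through the transformer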
50 changes: 49 additions & 1 deletion zarr/meta.py
@@ -9,7 +9,11 @@
from zarr.errors import MetadataError
from zarr.util import json_dumps, json_loads

from typing import cast, Union, Any, List, Mapping as MappingType, Optional
from typing import cast, Union, Any, List, Mapping as MappingType, Optional, TYPE_CHECKING

if TYPE_CHECKING:
from zarr._storage.store import StorageTransformer


ZARR_FORMAT = 2
ZARR_FORMAT_v3 = 3
@@ -459,6 +463,36 @@ def _decode_codec_metadata(cls, meta: Optional[Mapping]) -> Optional[Codec]:

return codec

@classmethod
def _encode_storage_transformer_metadata(
Review comment (Member):
After the splitting of storage.py into separate files, this makes me wonder if meta and some of the other core classes shouldn't also have v3 variants.

cls,
storage_transformer: "StorageTransformer"
) -> Optional[Mapping]:
return {
"extension": storage_transformer.extension_uri,
"type": storage_transformer.type,
"configuration": storage_transformer.get_config(),
}

@classmethod
def _decode_storage_transformer_metadata(cls, meta: Mapping) -> "StorageTransformer":
from zarr.tests.test_storage_v3 import DummyStorageTransfomer

# This might be changed to a proper registry in the future
KNOWN_STORAGE_TRANSFORMERS = [DummyStorageTransfomer]

conf = meta.get('configuration', {})
extension_uri = meta['extension']
transformer_type = meta['type']

for StorageTransformerCls in KNOWN_STORAGE_TRANSFORMERS:
if StorageTransformerCls.extension_uri == extension_uri:
break
else:
raise NotImplementedError

return StorageTransformerCls.from_config(transformer_type, conf)

@classmethod
def decode_array_metadata(cls, s: Union[MappingType, str]) -> MappingType[str, Any]:
meta = cls.parse_metadata(s)
@@ -476,6 +510,11 @@ def decode_array_metadata(cls, s: Union[MappingType, str]) -> MappingType[str, Any]:
# TODO: remove dimension_separator?

compressor = cls._decode_codec_metadata(meta.get("compressor", None))
storage_transformers = meta.get("storage_transformers", None)
if storage_transformers:
storage_transformers = [
cls._decode_storage_transformer_metadata(i) for i in storage_transformers
]
extensions = meta.get("extensions", [])
meta = dict(
shape=tuple(meta["shape"]),
@@ -493,6 +532,8 @@ def decode_array_metadata(cls, s: Union[MappingType, str]) -> MappingType[str, Any]:
# compressor field should be absent when there is no compression
if compressor:
meta['compressor'] = compressor
if storage_transformers:
meta['storage_transformers'] = storage_transformers

except Exception as e:
raise MetadataError("error decoding metadata: %s" % e)
@@ -514,6 +555,11 @@ def encode_array_metadata(cls, meta: MappingType[str, Any]) -> bytes:
object_codec = None

compressor = cls._encode_codec_metadata(meta.get("compressor", None))
storage_transformers = meta.get("storage_transformers", None)
if storage_transformers:
storage_transformers = [
cls._encode_storage_transformer_metadata(i) for i in storage_transformers
]
extensions = meta.get("extensions", [])
meta = dict(
shape=meta["shape"] + sdshape,
@@ -532,6 +578,8 @@ def encode_array_metadata(cls, meta: MappingType[str, Any]) -> bytes:
meta["compressor"] = compressor
if dimension_separator:
meta["dimension_separator"] = dimension_separator
if storage_transformers:
meta["storage_transformers"] = storage_transformers
return json_dumps(meta)


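A round-trip sketch for these two hooks, again assuming the DummyStorageTransfomer test helper and its test_value constructor argument:

    import os

    os.environ["ZARR_V3_EXPERIMENTAL_API"] = "1"  # set before importing zarr

    from zarr.meta import Metadata3
    from zarr.tests.test_storage_v3 import DummyStorageTransfomer

    dummy = DummyStorageTransfomer(
        "dummy_type", test_value=DummyStorageTransfomer.TEST_CONSTANT
    )

    encoded = Metadata3._encode_storage_transformer_metadata(dummy)
    # encoded is a plain, JSON-serializable mapping along the lines of
    # {"extension": DummyStorageTransfomer.extension_uri,
    #  "type": "dummy_type",
    #  "configuration": {"test_value": ...}}

    decoded = Metadata3._decode_storage_transformer_metadata(encoded)
    assert isinstance(decoded, DummyStorageTransfomer)
    assert decoded.test_value == dummy.test_value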
9 changes: 7 additions & 2 deletions zarr/storage.py
@@ -294,6 +294,7 @@ def init_array(
filters=None,
object_codec=None,
dimension_separator=None,
storage_transformers=None,
):
"""Initialize an array store with the given configuration. Note that this is a low-level
function and there should be no need to call this directly from user code.
@@ -421,7 +422,8 @@ def _init_array_metadata(
order=order, overwrite=overwrite, path=path,
chunk_store=chunk_store, filters=filters,
object_codec=object_codec,
dimension_separator=dimension_separator)
dimension_separator=dimension_separator,
storage_transformers=storage_transformers)


def _init_array_metadata(
@@ -438,6 +440,7 @@ def _init_array_metadata(
filters=None,
object_codec=None,
dimension_separator=None,
storage_transformers=None,
):

store_version = getattr(store, '_store_version', 2)
@@ -559,6 +562,7 @@ def _init_array_metadata(
if store_version < 3:
meta.update(dict(chunks=chunks, dtype=dtype, order=order,
filters=filters_config))
assert not storage_transformers
else:
if dimension_separator is None:
dimension_separator = "/"
@@ -572,7 +576,8 @@ def _init_array_metadata(
separator=dimension_separator),
chunk_memory_layout=order,
data_type=dtype,
attributes=attributes)
attributes=attributes,
storage_transformers=storage_transformers)
)

key = _prefix_to_array_key(store, _path_to_prefix(path))
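Finally, the assert in the v2 branch above means that passing a non-empty storage_transformers sequence together with a v2 store is rejected. A small sketch of that failure mode at this commit (later revisions may raise a friendlier error), reusing the DummyStorageTransfomer assumptions from above:

    import os

    os.environ["ZARR_V3_EXPERIMENTAL_API"] = "1"  # set before importing zarr

    import zarr
    from zarr.tests.test_storage_v3 import DummyStorageTransfomer

    dummy = DummyStorageTransfomer(
        "dummy_type", test_value=DummyStorageTransfomer.TEST_CONSTANT
    )
    try:
        zarr.create(
            shape=(10,), chunks=(5,), zarr_version=2, storage_transformers=[dummy]
        )
    except AssertionError:
        print("storage_transformers are only supported for zarr_version=3")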