-
-
Notifications
You must be signed in to change notification settings - Fork 330
add storage_transformers and get/set_partial_values #1096
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 31 commits
605620b
566e4b0
5f85439
3c38d57
dd7fedb
e33b365
81ebf68
ca28471
85f3309
03de894
41eaafb
5d7be76
46229ad
3a9f7cc
efa4e07
b4668a8
e4a4853
e454046
a3c7f74
c041dd8
696d5ca
c099440
be98c01
7c2767a
146c30a
59cca8b
c2dc0d6
7402262
b9d8177
91f0c2c
e68c97f
a7e4d89
fcb9ba0
b6588e7
eba9006
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
@@ -1,8 +1,10 @@ | ||||||||||
import abc | ||||||||||
import os | ||||||||||
from collections import defaultdict | ||||||||||
from collections.abc import MutableMapping | ||||||||||
from copy import copy | ||||||||||
from string import ascii_letters, digits | ||||||||||
from typing import Any, List, Mapping, Optional, Union | ||||||||||
from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple, Union | ||||||||||
|
||||||||||
from zarr.meta import Metadata2, Metadata3 | ||||||||||
from zarr.util import normalize_storage_path | ||||||||||
|
@@ -254,6 +256,82 @@ def __setitem__(self, key, value): | |||||||||
def __getitem__(self, key): | ||||||||||
"""Get a value.""" | ||||||||||
|
||||||||||
@abc.abstractmethod
def rmdir(self, path=None):
    """Remove a data path and all its subkeys and related metadata.
    Expects a path without the data or meta root prefix.

    Parameters
    ----------
    path : str, optional
        Storage path to remove; ``None`` removes everything under the store.
    """
|
||||||||||
@property
def supports_efficient_get_partial_values(self):
    # Default: this store reads whole values, so get_partial_values falls
    # back to fetching each full value and slicing in memory.  Stores that
    # can serve byte ranges natively should override this to return True
    # (together with overriding get_partial_values).
    return False
|
||||||||||
def get_partial_values(
    self,
    key_ranges: Sequence[Tuple[str, Tuple[int, Optional[int]]]]
) -> List[Optional[Union[bytes, memoryview, bytearray]]]:
    """Get multiple partial values.

    key_ranges can be an iterable of key, range pairs,
    where a range specifies two integers range_start and range_length
    as a tuple, (range_start, range_length).
    range_length may be None to indicate to read until the end.
    range_start may be negative to start reading range_start bytes
    from the end of the file.
    A key may occur multiple times with different ranges.
    Inserts None for missing keys into the returned list.

    Parameters
    ----------
    key_ranges : sequence of (key, (range_start, range_length))
        Ranges to read; ``range_length`` of ``None`` means "to the end".

    Returns
    -------
    list
        One entry per requested range, in request order; ``None`` where
        the key is missing.
    """
    # Pre-fill with None so missing keys keep their slot in the output.
    # The annotation now includes Optional, matching the documented
    # behavior, so no type-ignore is needed.
    results: List[Optional[Union[bytes, memoryview, bytearray]]] = (
        [None] * len(key_ranges)
    )
    # Group the requested ranges by key so every key is fetched at most
    # once, remembering each range's position in the output list.
    indexed_ranges_by_key: Dict[str, List[Tuple[int, Tuple[int, Optional[int]]]]] = (
        defaultdict(list)
    )
    for i, (key, range_) in enumerate(key_ranges):
        indexed_ranges_by_key[key].append((i, range_))
    for key, indexed_ranges in indexed_ranges_by_key.items():
        try:
            value = self[key]
        except KeyError:  # pragma: no cover
            continue  # missing key: leave the pre-filled None in place
        for i, (range_from, range_length) in indexed_ranges:
            if range_length is None:
                results[i] = value[range_from:]
            else:
                results[i] = value[range_from:range_from + range_length]
    return results
|
||||||||||
def supports_efficient_set_partial_values(self):
    # Default: partial writes fall back to read-modify-write of whole
    # values in set_partial_values; override in stores that can patch
    # byte ranges in place.
    # NOTE(review): unlike ``supports_efficient_get_partial_values`` this
    # is a plain method, not a property — callers must invoke it with
    # parentheses; confirm whether the asymmetry is intentional.
    return False
|
||||||||||
|
||||||||||
def set_partial_values(self, key_start_values):
    """Set multiple partial values.

    key_start_values can be an iterable of key, start and value triplets
    as tuples, (key, start, value), where start defines the offset in bytes.
    A key may occur multiple times with different starts and non-overlapping values.
    Also, start may only be beyond the current value if other values fill the gap.
    start may be negative to start writing start bytes from the current
    end of the file, ending the file with the new value.

    Raises
    ------
    ValueError
        If a write starts beyond the current end of the data for a key,
        or a missing key is first written at a non-zero offset.
    """
    # Materialize once: the input may be a one-shot iterator, and we
    # iterate it again below after collecting the unique keys.  (The
    # previous next(zip(*...)) approach also raised StopIteration on
    # empty input; an empty iterable is now a no-op.)
    key_start_values = list(key_start_values)
    unique_keys = {key for key, _, _ in key_start_values}
    values = {}
    for key in unique_keys:
        old_value = self.get(key)
        # bytearray gives us in-place slice assignment for the patches.
        values[key] = None if old_value is None else bytearray(old_value)
    for key, start, value in key_start_values:
        if values[key] is None:
            # A brand-new key must be written from the beginning; raise
            # instead of assert so the check survives python -O.
            if start != 0:
                raise ValueError(
                    f"Cannot set value at start {start} for missing key {key}; "
                    + "a new key must first be written at offset 0."
                )
            values[key] = value
        else:
            if not isinstance(values[key], bytearray):
                # The first triplet for a new key stored the raw (possibly
                # immutable) value; convert before splicing further writes.
                values[key] = bytearray(values[key])
            if start > len(values[key]):  # pragma: no cover
                raise ValueError(
                    f"Cannot set value at start {start}, "
                    + f"since it is beyond the data at key {key}, "
                    + f"having length {len(values[key])}."
                )
            if start < 0:
                values[key][start:] = value
            else:
                values[key][start:start + len(value)] = value
    for key, value in values.items():
        self[key] = value
|
||||||||||
def clear(self): | ||||||||||
"""Remove all items from store.""" | ||||||||||
self.erase_prefix("/") | ||||||||||
|
@@ -303,6 +381,151 @@ def _ensure_store(store): | |||||||||
) | ||||||||||
|
||||||||||
|
||||||||||
class StorageTransformer(MutableMapping, abc.ABC):
    """Base class for storage transformers.

    A storage transformer sits between an array and its (chunk) store and
    may rewrite keys and values on the way through.  The methods here simply
    pass the data on to the inner store as-is and should be overwritten by
    sub-classes.
    """

    # Storage transformers are a zarr v3 concept.
    _store_version = 3

    _metadata_class = Metadata3

    def __init__(self, _type) -> None:
        if _type not in self.valid_types:  # pragma: no cover
            raise ValueError(
                f"Storage transformer cannot be initialized with type {_type}, "
                + f"must be one of {list(self.valid_types)}."
            )
        self.type = _type
        # Bound later via _copy_for_array; accessing inner_store before
        # that is an error (see the assertion in the property below).
        self._inner_store = None

    def _copy_for_array(self, array, inner_store):
        """Return a shallow copy of this transformer bound to ``inner_store``.

        ``array`` is unused here, but subclasses (e.g. sharding) may need
        array metadata when binding to a store — hence the parameter.
        """
        transformer_copy = copy(self)
        transformer_copy._inner_store = inner_store
        return transformer_copy

    # abc.abstractproperty is deprecated since Python 3.3; the stacked
    # @property + @abc.abstractmethod form is the supported equivalent.
    @property
    @abc.abstractmethod
    def extension_uri(self):
        """URI identifying this storage-transformer extension."""
        pass  # pragma: no cover

    @property
    @abc.abstractmethod
    def valid_types(self):
        """Collection of type names this transformer may be configured with."""
        pass  # pragma: no cover

    def get_config(self):
        """Return a dictionary holding configuration parameters for this
        storage transformer. All values must be compatible with JSON encoding."""
        # Override in sub-class if need special encoding of config values.
        # By default, assume all non-private members are configuration
        # parameters except for type .
        return {
            k: v for k, v in self.__dict__.items()
            if not k.startswith('_') and k != "type"
        }

    @classmethod
    def from_config(cls, _type, config):
        """Instantiate storage transformer from a configuration object."""
        # override in sub-class if need special decoding of config values

        # by default, assume constructor accepts configuration parameters as
        # keyword arguments without any special decoding
        return cls(_type, **config)

    @property
    def inner_store(self) -> Union["StorageTransformer", StoreV3]:
        # Transformers are only usable once bound to a store via
        # _copy_for_array; fail loudly otherwise.
        assert self._inner_store is not None, (
            "inner_store is not initialized, first get a copy via _copy_for_array."
        )
        return self._inner_store

    # The following implementations are usually fine to keep as-is:

    def __eq__(self, other):
        return (
            type(self) == type(other) and
            self._inner_store == other._inner_store and
            self.get_config() == other.get_config()
        )

    def erase(self, key):
        self.__delitem__(key)

    def list(self):
        return list(self.keys())

    def list_dir(self, prefix):
        return StoreV3.list_dir(self, prefix)

    def is_readable(self):
        return self.inner_store.is_readable()

    def is_writeable(self):
        return self.inner_store.is_writeable()

    def is_listable(self):
        return self.inner_store.is_listable()

    def is_erasable(self):
        return self.inner_store.is_erasable()

    def clear(self):
        return self.inner_store.clear()

    def __enter__(self):
        return self.inner_store.__enter__()

    def __exit__(self, exc_type, exc_value, traceback):
        return self.inner_store.__exit__(exc_type, exc_value, traceback)

    def close(self) -> None:
        return self.inner_store.close()

    # The following implementations might need to be re-implemented
    # by subclasses implementing storage transformers:

    def rename(self, src_path: str, dst_path: str) -> None:
        return self.inner_store.rename(src_path, dst_path)

    def list_prefix(self, prefix):
        return self.inner_store.list_prefix(prefix)

    def erase_prefix(self, prefix):
        return self.inner_store.erase_prefix(prefix)

    def rmdir(self, path=None):
        return self.inner_store.rmdir(path)

    def __contains__(self, key):
        return self.inner_store.__contains__(key)

    def __setitem__(self, key, value):
        return self.inner_store.__setitem__(key, value)

    def __getitem__(self, key):
        return self.inner_store.__getitem__(key)

    def __delitem__(self, key):
        return self.inner_store.__delitem__(key)

    def __iter__(self):
        return self.inner_store.__iter__()

    def __len__(self):
        return self.inner_store.__len__()

    @property
    def supports_efficient_get_partial_values(self):
        return self.inner_store.supports_efficient_get_partial_values

    def get_partial_values(self, key_ranges):
        return self.inner_store.get_partial_values(key_ranges)

    def supports_efficient_set_partial_values(self):
        return self.inner_store.supports_efficient_set_partial_values()

    def set_partial_values(self, key_start_values):
        return self.inner_store.set_partial_values(key_start_values)
|
||||||||||
|
||||||||||
# allow MutableMapping for backwards compatibility | ||||||||||
StoreLike = Union[BaseStore, MutableMapping] | ||||||||||
|
||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -189,6 +189,7 @@ def __init__( | |
|
||
self._store = store | ||
self._chunk_store = chunk_store | ||
self._transformed_chunk_store = None | ||
self._path = normalize_storage_path(path) | ||
if self._path: | ||
self._key_prefix = self._path + '/' | ||
|
@@ -292,6 +293,16 @@ def _load_metadata_nosync(self): | |
filters = [get_codec(config) for config in filters] | ||
self._filters = filters | ||
|
||
if self._version == 3: | ||
storage_transformers = meta.get('storage_transformers', []) | ||
if storage_transformers: | ||
transformed_store = self._chunk_store or self._store | ||
for storage_transformer in storage_transformers[::-1]: | ||
transformed_store = storage_transformer._copy_for_array( | ||
self, transformed_store | ||
) | ||
self._transformed_chunk_store = transformed_store | ||
|
||
def _refresh_metadata(self): | ||
if not self._cache_metadata: | ||
self._load_metadata() | ||
|
@@ -371,10 +382,12 @@ def read_only(self, value): | |
@property
def chunk_store(self):
    """A MutableMapping providing the underlying storage for array chunks."""
    # Precedence: a storage-transformer-wrapped chunk store wins, then an
    # explicitly supplied chunk store, and finally the main store.
    for candidate in (self._transformed_chunk_store, self._chunk_store):
        if candidate is not None:
            return candidate
    return self._store
|
||
@property | ||
def shape(self): | ||
|
@@ -1800,7 +1813,7 @@ def _set_selection(self, indexer, value, fields=None): | |
check_array_shape('value', value, sel_shape) | ||
|
||
# iterate over chunks in range | ||
if not hasattr(self.store, "setitems") or self._synchronizer is not None \ | ||
if not hasattr(self.chunk_store, "setitems") or self._synchronizer is not None \ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have to wonder if this wouldn't ever need to be the transformed_chunk_store. In general, this chain of replacement stores feels slightly like an anti-pattern. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I'm also not particularly happy with this pattern, but also couldn't come up with a better solution for now. Happy about any ideas 👍 |
||
or any(map(lambda x: x == 0, self.shape)): | ||
# iterative approach | ||
for chunk_coords, chunk_selection, out_selection in indexer: | ||
|
@@ -2229,7 +2242,10 @@ def _encode_chunk(self, chunk): | |
cdata = chunk | ||
|
||
# ensure in-memory data is immutable and easy to compare | ||
if isinstance(self.chunk_store, KVStore): | ||
if ( | ||
isinstance(self.chunk_store, KVStore) | ||
or isinstance(self._chunk_store, KVStore) | ||
): | ||
cdata = ensure_bytes(cdata) | ||
|
||
return cdata | ||
|
Uh oh!
There was an error while loading. Please reload this page.