add storage_transformers and get/set_partial_values #1096
@@ -1,6 +1,8 @@
import abc
import os
from collections import defaultdict
from collections.abc import MutableMapping
from copy import copy
from string import ascii_letters, digits
from typing import Any, List, Mapping, Optional, Union

@@ -254,6 +256,49 @@ def __setitem__(self, key, value):
    def __getitem__(self, key):
        """Get a value."""

    def get_partial_values(self, key_ranges):
        """Get multiple partial values.
        key_ranges can be an iterable of key, range pairs,
        where a range specifies two integers range_start and range_length
        as a tuple, (range_start, range_length).
        range_length may be None to read until the end.
        A key may occur multiple times with different ranges."""
        results = [None] * len(key_ranges)
        indexed_ranges_by_key = defaultdict(list)
        for i, (key, range_) in enumerate(key_ranges):
            indexed_ranges_by_key[key].append((i, range_))
        for key, indexed_ranges in indexed_ranges_by_key.items():
            value = self[key]
            for i, (range_from, range_length) in indexed_ranges:
                if range_length is None:
                    results[i] = value[range_from:]
                else:
                    results[i] = value[range_from:range_from + range_length]
        return results

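Not part of the diff: a minimal usage sketch of the fallback above. It assumes zarr.storage.KVStore (a plain dict-backed BaseStore) inherits this default implementation, which should hold for any BaseStore subclass that does not override it.

from zarr.storage import KVStore

store = KVStore({"data/root/c0": b"abcdefgh"})
# Each entry is (key, (range_start, range_length)); a length of None reads to the end.
parts = store.get_partial_values([
    ("data/root/c0", (0, 3)),
    ("data/root/c0", (4, None)),
])
assert parts == [b"abc", b"efgh"]
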
    def set_partial_values(self, key_start_values):
        """Set multiple partial values.
        key_start_values can be an iterable of key, start and value triplets
        as tuples, (key, start, value), where start defines the offset in bytes.
        A key may occur multiple times with different starts and non-overlapping values.
        Also, start may only be beyond the current value if other values fill the gap."""
        unique_keys = set(next(zip(*key_start_values)))
        values = {}
        for key in unique_keys:
            old_value = self.get(key)
            # Keep missing keys as None so the start == 0 branch below can create them;
            # bytearray(None) would raise a TypeError here.
            values[key] = None if old_value is None else bytearray(old_value)
        for key, start, value in key_start_values:
            if values[key] is None:
                assert start == 0
                values[key] = value
            else:
                if start > len(values[key]):
                    raise ValueError(
                        f"Cannot set value at start {start}, "
                        + f"since it is beyond the data at key {key}, "
                        + f"having length {len(values[key])}."
                    )
                values[key][start:start + len(value)] = value
        for key, value in values.items():
            self[key] = value

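Not part of the diff: a matching sketch for the write path, under the same KVStore assumption.

from zarr.storage import KVStore

store = KVStore({"data/root/c0": b"abcdefgh"})
# Each entry is (key, start, value); writes may overwrite part of the existing
# value or extend it, as long as no gap is left beyond the current end.
store.set_partial_values([
    ("data/root/c0", 0, b"XYZ"),   # overwrite the first three bytes
    ("data/root/c0", 8, b"ij"),    # append right at the current end
])
assert store["data/root/c0"] == b"XYZdefghij"
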
    def clear(self):
        """Remove all items from store."""
        self.erase_prefix("/")


@@ -303,6 +348,125 @@ def _ensure_store(store):
    )


Review comment on this hunk: "Slight surprise to find this in store.py rather than under v3."

class StorageTransformer(MutableMapping, abc.ABC):
    """Base class for storage transformers. The methods simply pass on the data as-is
    and should be overwritten by sub-classes."""

    _store_version = 3
    _metadata_class = Metadata3

    def __init__(self, _type) -> None:
        if _type not in self.valid_types:
            raise ValueError(
                f"Storage transformer cannot be initialized with type {_type}, "
                + f"must be one of {list(self.valid_types)}."
            )
        self.type = _type
        self._inner_store = None

    def _copy_for_array(self, inner_store):
        transformer_copy = copy(self)
        transformer_copy._inner_store = inner_store
        return transformer_copy

    @abc.abstractproperty
    def extension_uri(self):
        pass

    @abc.abstractproperty
    def valid_types(self):
        pass

    def get_config(self):
        """Return a dictionary holding configuration parameters for this
        storage transformer. All values must be compatible with JSON encoding."""
        # Override in sub-class if special encoding of config values is needed.
        # By default, assume all non-private members are configuration
        # parameters except for type.
        return {
            k: v for k, v in self.__dict__.items()
            if not k.startswith('_') and k != "type"
        }

    @classmethod
    def from_config(cls, _type, config):
        """Instantiate storage transformer from a configuration object."""
        # Override in sub-class if special decoding of config values is needed.

        # By default, assume the constructor accepts configuration parameters as
        # keyword arguments without any special decoding.
        return cls(_type, **config)

    def is_readable(self):
        return self._inner_store.is_readable()

    def is_writeable(self):
        return self._inner_store.is_writeable()

    def is_listable(self):
        return self._inner_store.is_listable()

    def is_erasable(self):
        return self._inner_store.is_erasable()

    def __enter__(self):
        return self._inner_store.__enter__()

    def __exit__(self, exc_type, exc_value, traceback):
        return self._inner_store.__exit__(exc_type, exc_value, traceback)

    def close(self) -> None:
        return self._inner_store.close()

    def rename(self, src_path: str, dst_path: str) -> None:
        return self._inner_store.rename(src_path, dst_path)

    def list_prefix(self, prefix):
        return self._inner_store.list_prefix(prefix)

    def erase(self, key):
        return self._inner_store.erase(key)

    def erase_prefix(self, prefix):
        return self._inner_store.erase_prefix(prefix)

    def list_dir(self, prefix):
        return self._inner_store.list_dir(prefix)

    def list(self):
        return self._inner_store.list()

    def __contains__(self, key):
        return self._inner_store.__contains__(key)

    def __setitem__(self, key, value):
        return self._inner_store.__setitem__(key, value)

    def __getitem__(self, key):
        return self._inner_store.__getitem__(key)

    def __delitem__(self, key):
        return self._inner_store.__delitem__(key)

    def __iter__(self):
        return self._inner_store.__iter__()

    def __len__(self):
        return self._inner_store.__len__()

    def get_partial_values(self, key_ranges):
        return self._inner_store.get_partial_values(key_ranges)

    def set_partial_values(self, key_start_values):
        return self._inner_store.set_partial_values(key_start_values)

    def clear(self):
        return self._inner_store.clear()

    def __eq__(self, other):
        return self._inner_store.__eq__(other)

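Not part of the diff: a sketch of what a concrete transformer built on this base class could look like. NoopStorageTransformer, its extension URI, and the KVStore inner store are illustrative assumptions; the PR's own concrete example is DummyStorageTransfomer in zarr.tests.test_storage_v3. It also assumes the class is importable from zarr._storage.store, where this hunk adds it.

from zarr._storage.store import StorageTransformer
from zarr.storage import KVStore


class NoopStorageTransformer(StorageTransformer):
    # Hypothetical extension URI; a real transformer would use the URI from its spec.
    extension_uri = "https://example.com/zarr/storage_transformers/noop/1.0"
    valid_types = ["noop"]


inner = KVStore(dict())
transformer = NoopStorageTransformer("noop")
# An array would bind the transformer to its underlying store via _copy_for_array:
bound = transformer._copy_for_array(inner)
bound["meta/root.json"] = b"{}"          # delegated to the inner KVStore
assert inner["meta/root.json"] == b"{}"
assert transformer.get_config() == {}    # no public, non-type attributes to serialize
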
# allow MutableMapping for backwards compatibility
StoreLike = Union[BaseStore, MutableMapping]

The following hunks belong to a second file in this PR.

@@ -9,7 +9,11 @@
from zarr.errors import MetadataError
from zarr.util import json_dumps, json_loads

from typing import cast, Union, Any, List, Mapping as MappingType, Optional
from typing import cast, Union, Any, List, Mapping as MappingType, Optional, TYPE_CHECKING

if TYPE_CHECKING:
    from zarr._storage.store import StorageTransformer


ZARR_FORMAT = 2
ZARR_FORMAT_v3 = 3

@@ -459,6 +463,36 @@ def _decode_codec_metadata(cls, meta: Optional[Mapping]) -> Optional[Codec]:

        return codec

    @classmethod
    def _encode_storage_transformer_metadata(
        cls,
        storage_transformer: "StorageTransformer"
    ) -> Optional[Mapping]:
        return {
            "extension": storage_transformer.extension_uri,
            "type": storage_transformer.type,
            "configuration": storage_transformer.get_config(),
        }

    @classmethod
    def _decode_storage_transformer_metadata(cls, meta: Mapping) -> "StorageTransformer":
        from zarr.tests.test_storage_v3 import DummyStorageTransfomer

        # This might be changed to a proper registry in the future
        KNOWN_STORAGE_TRANSFORMERS = [DummyStorageTransfomer]

        conf = meta.get('configuration', {})
        extension_uri = meta['extension']
        transformer_type = meta['type']

        for StorageTransformerCls in KNOWN_STORAGE_TRANSFORMERS:
            if StorageTransformerCls.extension_uri == extension_uri:
                break
        else:
            raise NotImplementedError

        return StorageTransformerCls.from_config(transformer_type, conf)

    @classmethod
    def decode_array_metadata(cls, s: Union[MappingType, str]) -> MappingType[str, Any]:
        meta = cls.parse_metadata(s)

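Not part of the diff: for orientation, the mapping returned by _encode_storage_transformer_metadata has the shape sketched below, shown as a Python dict with hypothetical values mirroring the three keys written above.

encoded = {
    "extension": "https://example.com/zarr/storage_transformers/noop/1.0",  # extension_uri (hypothetical)
    "type": "noop",         # the transformer's type, one of its valid_types
    "configuration": {},    # whatever get_config() returned
}
# _decode_storage_transformer_metadata looks up the extension URI among the known
# transformer classes and rebuilds the instance via from_config(type, configuration).
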
@@ -476,6 +510,11 @@ def decode_array_metadata(cls, s: Union[MappingType, str]) -> MappingType[str, Any]:
            # TODO: remove dimension_separator?

            compressor = cls._decode_codec_metadata(meta.get("compressor", None))
            storage_transformers = meta.get("storage_transformers", None)
            if storage_transformers:
                storage_transformers = [
                    cls._decode_storage_transformer_metadata(i) for i in storage_transformers
                ]
            extensions = meta.get("extensions", [])
            meta = dict(
                shape=tuple(meta["shape"]),

@@ -493,6 +532,8 @@ def decode_array_metadata(cls, s: Union[MappingType, str]) -> MappingType[str, Any]:
            # compressor field should be absent when there is no compression
            if compressor:
                meta['compressor'] = compressor
            if storage_transformers:
                meta['storage_transformers'] = storage_transformers

        except Exception as e:
            raise MetadataError("error decoding metadata: %s" % e)

@@ -514,6 +555,11 @@ def encode_array_metadata(cls, meta: MappingType[str, Any]) -> bytes:
            object_codec = None

        compressor = cls._encode_codec_metadata(meta.get("compressor", None))
        storage_transformers = meta.get("storage_transformers", None)
        if storage_transformers:
            storage_transformers = [
                cls._encode_storage_transformer_metadata(i) for i in storage_transformers
            ]
        extensions = meta.get("extensions", [])
        meta = dict(
            shape=meta["shape"] + sdshape,

@@ -532,6 +578,8 @@ def encode_array_metadata(cls, meta: MappingType[str, Any]) -> bytes:
            meta["compressor"] = compressor
        if dimension_separator:
            meta["dimension_separator"] = dimension_separator
        if storage_transformers:
            meta["storage_transformers"] = storage_transformers
        return json_dumps(meta)