Skip to content

Sharding storage transformer for v3 #1111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
605620b
add storage_transformers and get/set_partial_values
jstriebel Jul 28, 2022
566e4b0
formatting
jstriebel Jul 28, 2022
5f85439
add docs and release notes
jstriebel Jul 28, 2022
3c38d57
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Jul 28, 2022
dd7fedb
add test_core testcase
jstriebel Jul 29, 2022
e33b365
Update zarr/creation.py
jstriebel Jul 29, 2022
81ebf68
apply PR feedback
jstriebel Jul 29, 2022
ca28471
add comment that storage_transformers=None is the same as storage_tra…
jstriebel Jul 29, 2022
85f3309
use empty tuple as default for storage_transformers
jstriebel Aug 1, 2022
03de894
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Aug 1, 2022
41eaafb
make mypy happy
jstriebel Aug 1, 2022
5d7be76
better coverage, minor fix, adding rmdir
jstriebel Aug 1, 2022
46229ad
add missing rmdir to test
jstriebel Aug 1, 2022
3a9f7cc
increase coverage
jstriebel Aug 2, 2022
efa4e07
improve test coverage
jstriebel Aug 3, 2022
b4668a8
fix TestArrayWithStorageTransformersV3
jstriebel Aug 3, 2022
e4a4853
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Aug 5, 2022
e454046
Update zarr/creation.py
jstriebel Aug 8, 2022
a3c7f74
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Aug 8, 2022
92ce212
add sharding storage transformer
jstriebel Aug 18, 2022
f6c87b4
add actual transformer
jstriebel Aug 18, 2022
df2dd71
fixe, and allow partial reads for uncompressed v3 arrays
jstriebel Aug 22, 2022
c041dd8
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Aug 22, 2022
06ce675
Merge branch 'storage-transformers-and-partial-get-set' into sharding…
jstriebel Aug 22, 2022
696d5ca
pick generic storage transformer changes from #1111
jstriebel Aug 22, 2022
4c0807e
Merge branch 'storage-transformers-and-partial-get-set' into sharding…
jstriebel Aug 22, 2022
c099440
increase coverage
jstriebel Aug 22, 2022
61db74a
Merge branch 'storage-transformers-and-partial-get-set' into sharding…
jstriebel Aug 22, 2022
83c9389
make lgtm happy
jstriebel Aug 22, 2022
fde61e8
add release note
jstriebel Aug 22, 2022
de4de18
better coverage
jstriebel Aug 23, 2022
0deb2b6
fix hexdigest
jstriebel Aug 23, 2022
d3eda71
improve tests
jstriebel Aug 23, 2022
093926c
fix order of storage transformers
jstriebel Aug 24, 2022
be98c01
fix order of storage transformers
jstriebel Aug 24, 2022
6e2790c
Merge branch 'storage-transformers-and-partial-get-set' into sharding…
jstriebel Aug 24, 2022
7c2767a
retrigger CI
jstriebel Aug 25, 2022
9257b85
Merge branch 'storage-transformers-and-partial-get-set' into sharding…
jstriebel Aug 25, 2022
e7b14b7
minor test improvement
jstriebel Aug 25, 2022
a52300c
minor test update
jstriebel Aug 25, 2022
a960481
apply PR feedback
jstriebel Sep 8, 2022
146c30a
Merge remote-tracking branch 'origin/main' into storage-transformers-…
jstriebel Dec 12, 2022
6bc1025
Merge branch 'storage-transformers-and-partial-get-set' into sharding…
jstriebel Dec 12, 2022
59cca8b
minor fixes
jstriebel Dec 12, 2022
92a48d8
Merge branch 'storage-transformers-and-partial-get-set' into sharding…
jstriebel Dec 12, 2022
c2dc0d6
make flake8 happy
jstriebel Dec 12, 2022
12dc1ae
Merge branch 'storage-transformers-and-partial-get-set' into sharding…
jstriebel Dec 12, 2022
91f10ff
call ensure_bytes in sharding transformer
jstriebel Dec 12, 2022
73fb0a5
minor fixes
jstriebel Dec 12, 2022
7402262
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Dec 19, 2022
b9d8177
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Dec 21, 2022
91f0c2c
apply PR feedback
jstriebel Dec 22, 2022
e68c97f
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Dec 22, 2022
490b962
Merge branch 'storage-transformers-and-partial-get-set' into sharding…
jstriebel Dec 22, 2022
e1960a1
adapt to supports_efficient_get_partial_values property
jstriebel Dec 22, 2022
c1bc26d
add ZARR_V3_SHARDING flag for sharding usage
jstriebel Dec 22, 2022
6f5b35a
fix release notes
jstriebel Dec 22, 2022
070c02c
fix release notes
jstriebel Dec 22, 2022
ef5c020
Merge remote-tracking branch 'scm/storage-transformers-and-partial-ge…
jstriebel Dec 22, 2022
a7e4d89
Merge remote-tracking branch 'origin/main' into storage-transformers-…
joshmoore Jan 16, 2023
fcb9ba0
Merge pull request #3 from joshmoore/storage-transformers-and-partial…
jstriebel Jan 16, 2023
b6588e7
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Jan 16, 2023
eba9006
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Jan 16, 2023
652653d
Merge remote-tracking branch 'scm/storage-transformers-and-partial-ge…
jstriebel Jan 19, 2023
1ccf052
Merge remote-tracking branch 'origin/main' into sharding-storage-tran…
jstriebel Jan 19, 2023
8bb79ef
Merge branch 'main' into sharding-storage-transformer
jstriebel Jan 23, 2023
a2eb332
Merge branch 'main' into sharding-storage-transformer
jstriebel Jan 26, 2023
dbf9fff
Merge branch 'main' into sharding-storage-transformer
jstriebel Feb 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/release.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ Bug fixes
* Fix bug in LRUEStoreCache in which the current size wasn't reset on invalidation.
By :user:`BGCMHou <BGCMHou>` and :user:`Josh Moore <joshmoore>` :issue:`1076`, :issue:`1077`.

Enhancements
~~~~~~~~~~~~

* **Improve Zarr V3 support, adding partial store read/write, storage transformers and sharding.**
Add two features of the [v3 spec](https://zarr-specs.readthedocs.io/en/latest/core/v3.0.html):
* `get_partial_values` and `set_partial_values`
* efficient `get_partial_values` implementation for `FSStoreV3`
* storage transformers interface
* sharding storage transformer
By :user:`Jonathan Striebel <jstriebel>`; :issue:`1096`, :issue:`1111`.


Documentation
~~~~~~~~~~~~~

Expand Down
214 changes: 214 additions & 0 deletions zarr/_storage/store.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import abc
import os
from collections import defaultdict
from collections.abc import MutableMapping
from copy import copy
from string import ascii_letters, digits
from typing import Any, List, Mapping, Optional, Union

Expand Down Expand Up @@ -254,6 +256,74 @@ def __setitem__(self, key, value):
def __getitem__(self, key):
"""Get a value."""

@abc.abstractmethod
def rmdir(self, path=None):
"""Remove a data path and all its subkeys and related metadata.
Expects a path without the data or meta root prefix."""

def supports_efficient_get_partial_values(self):
return False

def get_partial_values(self, key_ranges):
"""Get multiple partial values.
key_ranges can be an iterable of key, range pairs,
where a range specifies two integers range_start and range_length
as a tuple, (range_start, range_length).
range_length may be None to indicate to read until the end.
range_start may be negative to start reading range_start bytes
from the end of the file.
A key may occur multiple times with different ranges.
Inserts None for missing keys into the returned list."""
results = [None] * len(key_ranges)
indexed_ranges_by_key = defaultdict(list)
for i, (key, range_) in enumerate(key_ranges):
indexed_ranges_by_key[key].append((i, range_))
for key, indexed_ranges in indexed_ranges_by_key.items():
try:
value = self[key]
except KeyError: # pragma: no cover
continue
for i, (range_from, range_length) in indexed_ranges:
if range_length is None:
results[i] = value[range_from:]
else:
results[i] = value[range_from:range_from + range_length]
return results

def supports_efficient_set_partial_values(self):
return False

def set_partial_values(self, key_start_values):
"""Set multiple partial values.
key_start_values can be an iterable of key, start and value triplets
as tuples, (key, start, value), where start defines the offset in bytes.
A key may occur multiple times with different starts and non-overlapping values.
Also, start may only be beyond the current value if other values fill the gap.
start may be negative to start writing start bytes from the current
end of the file, ending the file with the new value."""
unique_keys = set(next(zip(*key_start_values)))
values = {}
for key in unique_keys:
old_value = self.get(key)
values[key] = None if old_value is None else bytearray(old_value)
for key, start, value in key_start_values:
if values[key] is None:
assert start == 0
values[key] = value
else:
if start > len(values[key]): # pragma: no cover
raise ValueError(
f"Cannot set value at start {start}, "
+ f"since it is beyond the data at key {key}, "
+ f"having length {len(values[key])}."
)
if start < 0:
values[key][start:] = value
else:
values[key][start:start + len(value)] = value
for key, value in values.items():
self[key] = value

def clear(self):
"""Remove all items from store."""
self.erase_prefix("/")
Expand Down Expand Up @@ -303,6 +373,150 @@ def _ensure_store(store):
)


class StorageTransformer(MutableMapping, abc.ABC):
"""Base class for storage transformers. The methods simply pass on the data as-is
and should be overwritten by sub-classes."""

_store_version = 3
_metadata_class = Metadata3

def __init__(self, _type) -> None:
if _type not in self.valid_types: # pragma: no cover
raise ValueError(
f"Storage transformer cannot be initialized with type {_type}, "
+ f"must be one of {list(self.valid_types)}."
)
self.type = _type
self._inner_store = None

def _copy_for_array(self, array, inner_store):
transformer_copy = copy(self)
transformer_copy._inner_store = inner_store
return transformer_copy

@abc.abstractproperty
def extension_uri(self):
pass # pragma: no cover

@abc.abstractproperty
def valid_types(self):
pass # pragma: no cover

def get_config(self):
"""Return a dictionary holding configuration parameters for this
storage transformer. All values must be compatible with JSON encoding."""
# Override in sub-class if need special encoding of config values.
# By default, assume all non-private members are configuration
# parameters except for type .
return {
k: v for k, v in self.__dict__.items()
if not k.startswith('_') and k != "type"
}

@classmethod
def from_config(cls, _type, config):
"""Instantiate storage transformer from a configuration object."""
# override in sub-class if need special decoding of config values

# by default, assume constructor accepts configuration parameters as
# keyword arguments without any special decoding
return cls(_type, **config)

@property
def inner_store(self) -> Union["StorageTransformer", StoreV3]:
assert self._inner_store is not None, (
"inner_store is not initialized, first get a copy via _copy_for_array."
)
return self._inner_store

# The following implementations are usually fine to keep as-is:

def __eq__(self, other):
return (
type(self) == type(other) and
self._inner_store == other._inner_store and
self.get_config() == other.get_config()
)

def erase(self, key):
self.__delitem__(key)

def list(self):
return list(self.keys())

def list_dir(self, prefix):
return StoreV3.list_dir(self, prefix)

def is_readable(self):
return self.inner_store.is_readable()

def is_writeable(self):
return self.inner_store.is_writeable()

def is_listable(self):
return self.inner_store.is_listable()

def is_erasable(self):
return self.inner_store.is_erasable()

def clear(self):
return self.inner_store.clear()

def __enter__(self):
return self.inner_store.__enter__()

def __exit__(self, exc_type, exc_value, traceback):
return self.inner_store.__exit__(exc_type, exc_value, traceback)

def close(self) -> None:
return self.inner_store.close()

# The following implementations might need to be re-implemented
# by subclasses implementing storage transformers:

def rename(self, src_path: str, dst_path: str) -> None:
return self.inner_store.rename(src_path, dst_path)

def list_prefix(self, prefix):
return self.inner_store.list_prefix(prefix)

def erase_prefix(self, prefix):
return self.inner_store.erase_prefix(prefix)

def rmdir(self, path=None):
return self.inner_store.rmdir(path)

def __contains__(self, key):
return self.inner_store.__contains__(key)

def __setitem__(self, key, value):
return self.inner_store.__setitem__(key, value)

def __getitem__(self, key):
return self.inner_store.__getitem__(key)

def __delitem__(self, key):
return self.inner_store.__delitem__(key)

def __iter__(self):
return self.inner_store.__iter__()

def __len__(self):
return self.inner_store.__len__()

def supports_efficient_get_partial_values(self):
return self.inner_store.supports_efficient_get_partial_values()

def get_partial_values(self, key_ranges):
return self.inner_store.get_partial_values(key_ranges)

def supports_efficient_set_partial_values(self):
return self.inner_store.supports_efficient_set_partial_values()

def set_partial_values(self, key_start_values):
return self.inner_store.set_partial_values(key_start_values)


# allow MutableMapping for backwards compatibility
StoreLike = Union[BaseStore, MutableMapping]

Expand Down
31 changes: 31 additions & 0 deletions zarr/_storage/v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,37 @@ def rmdir(self, path=None):
if self.fs.isdir(store_path):
self.fs.rm(store_path, recursive=True)

def supports_efficient_get_partial_values(self):
return True

def get_partial_values(self, key_ranges):
"""Get multiple partial values.
key_ranges can be an iterable of key, range pairs,
where a range specifies two integers range_start and range_length
as a tuple, (range_start, range_length).
range_length may be None to indicate to read until the end.
range_start may be negative to start reading range_start bytes
from the end of the file.
A key may occur multiple times with different ranges.
Inserts None for missing keys into the returned list."""
results = []
for key, (range_start, range_length) in key_ranges:
key = self._normalize_key(key)
path = self.dir_path(key)
try:
if range_start < 0:
if range_length is None:
result = self.fs.tail(path, size=-range_start)
else:
size = self.fs.size(path)
result = self.fs.read_block(path, size + range_start, range_length)
else:
result = self.fs.read_block(path, range_start, range_length)
except self.map.missing_exceptions:
result = None
results.append(result)
return results


class MemoryStoreV3(MemoryStore, StoreV3):

Expand Down
Loading