LRU store cache #223

Merged: 10 commits, merged Jan 2, 2018
Changes from 3 commits
7 changes: 7 additions & 0 deletions zarr/compat.py
@@ -16,9 +16,16 @@
     class PermissionError(Exception):
         pass

+    def OrderedDict_move_to_end(od, key):
+        od[key] = od.pop(key)
+
+
 else:  # pragma: py2 no cover

     text_type = str
     binary_type = bytes
     from functools import reduce
     PermissionError = PermissionError
+
+    def OrderedDict_move_to_end(od, key):
+        od.move_to_end(key)
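For reference, the two shims are equivalent for this purpose: on an OrderedDict, popping a key and re-assigning it moves it to the end, which is what move_to_end does natively on Python 3. A quick illustration (not part of the diff):

    from collections import OrderedDict

    od = OrderedDict([('a', 1), ('b', 2), ('c', 3)])

    od['a'] = od.pop('a')   # Python 2 fallback: pop and re-insert moves 'a' to the end
    assert list(od) == ['b', 'c', 'a']

    od.move_to_end('b')     # Python 3: move_to_end does the same in place
    assert list(od) == ['c', 'a', 'b']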
173 changes: 153 additions & 20 deletions zarr/storage.py
@@ -7,7 +7,7 @@

 """
 from __future__ import absolute_import, print_function, division
-from collections import MutableMapping
+from collections import MutableMapping, OrderedDict
 import os
 import tempfile
 import zipfile
@@ -28,10 +28,10 @@
                        normalize_storage_path, buffer_size,
                        normalize_fill_value, nolock, normalize_dtype)
 from zarr.meta import encode_array_metadata, encode_group_metadata
-from zarr.compat import PY2, binary_type
+from zarr.compat import PY2, binary_type, OrderedDict_move_to_end
 from numcodecs.registry import codec_registry
-from zarr.errors import (err_contains_group, err_contains_array, err_path_not_found,
-                         err_bad_compressor, err_fspath_exists_notdir, err_read_only)
+from zarr.errors import (err_contains_group, err_contains_array, err_bad_compressor,
+                         err_fspath_exists_notdir, err_read_only)


 array_meta_key = '.zarray'
@@ -143,18 +143,23 @@ def getsize(store, path=None):
         return store.getsize(path)
     elif isinstance(store, dict):
         # compute from size of values
-        prefix = _path_to_prefix(path)
-        size = 0
-        for k in listdir(store, path):
-            try:
-                v = store[prefix + k]
-            except KeyError:
-                pass
-            else:
-                try:
-                    size += buffer_size(v)
-                except TypeError:
-                    return -1
+        if path in store:
+            v = store[path]
+            size = buffer_size(v)
+        else:
+            members = listdir(store, path)
+            prefix = _path_to_prefix(path)
+            size = 0
+            for k in members:
+                try:
+                    v = store[prefix + k]
+                except KeyError:
+                    pass
+                else:
+                    try:
+                        size += buffer_size(v)
+                    except TypeError:
+                        return -1
         return size
     else:
         return -1
@@ -610,16 +615,21 @@ def getsize(self, path=None):
         path = normalize_storage_path(path)

         # obtain value to return size of
-        value = self.root
+        value = None
         if path:
             try:
                 parent, key = self._get_parent(path)
                 value = parent[key]
             except KeyError:
-                err_path_not_found(path)
+                pass
+        else:
+            value = self.root

         # obtain size of value
-        if isinstance(value, self.cls):
+        if value is None:
+            return 0
+
+        elif isinstance(value, self.cls):
             # total size for directory
             size = 0
             for v in value.values():
@@ -629,6 +639,7 @@ def getsize(self, path=None):
                 except TypeError:
                     return -1
             return size
+
         else:
             try:
                 return buffer_size(value)
@@ -843,7 +854,7 @@ def getsize(self, path=None):
                     size += os.path.getsize(child_fs_path)
             return size
         else:
-            err_path_not_found(path)
+            return 0

     def clear(self):
         shutil.rmtree(self.path)
@@ -1230,7 +1241,7 @@ def getsize(self, path=None):
                     info = self.zf.getinfo(path)
                     return info.compress_size
                 except KeyError:
-                    err_path_not_found(path)
+                    return 0
             else:
                 return 0

@@ -1676,3 +1687,125 @@ def __iter__(self):

     def __len__(self):
         return self.db.stat()['entries']
+
+
+class LRUStoreCache(MutableMapping):
+
+    def __init__(self, store, max_size):
+        self._store = store
+        self._max_size = max_size
+        self._current_size = 0
+        self._keys_cache = None
+        self._listdir_cache = dict()
+        self._values_cache = OrderedDict()
+        self._mutex = Lock()
+        self.hits = self.misses = 0
+
+    def __getstate__(self):
+        return (self._store, self._max_size, self._current_size, self._keys_cache,
+                self._listdir_cache, self._values_cache, self.hits, self.misses)
+
+    def __setstate__(self, state):
+        (self._store, self._max_size, self._current_size, self._keys_cache,
+         self._listdir_cache, self._values_cache, self.hits, self.misses) = state
+        self._mutex = Lock()
+
+    def __len__(self):
+        return len(self._store)
+
+    def __iter__(self):
+        return self.keys()
+
+    def keys(self):
+        with self._mutex:
+            if self._keys_cache is None:
+                self._keys_cache = list(self._store.keys())
+        return iter(self._keys_cache)
Review comment from the PR author, attached to the return statement above:
Should probably move this under the mutex lock.
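A minimal sketch of that suggestion (illustrative only, not part of this changeset): keep the return inside the with block, so the iterator over the keys cache is created while the mutex is still held:

    def keys(self):
        with self._mutex:
            if self._keys_cache is None:
                self._keys_cache = list(self._store.keys())
            return iter(self._keys_cache)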


+    def listdir(self, path=None):
+        with self._mutex:
+            try:
+                return self._listdir_cache[path]
+            except KeyError:
+                listing = listdir(self._store, path)
+                self._listdir_cache[path] = listing
+                return listing
+
+    def getsize(self, path=None):
+        return getsize(self._store, path=path)
+
+    def _pop_value(self):
+        # remove the first value from the cache, as this will be the least recently
+        # used value
+        _, v = self._values_cache.popitem(last=False)
+        return v
+
+    def _accommodate_value(self, value_size):
+        if self._max_size is None:
+            return
+        # ensure there is enough space in the cache for a new value
+        while self._current_size + value_size > self._max_size:
+            v = self._pop_value()
+            self._current_size -= buffer_size(v)
+
+    def _cache_value(self, key, value):
+        # cache a value
+        value_size = buffer_size(value)
+        # check size of the value against max size, as if the value itself exceeds max
+        # size then we are never going to cache it
+        if self._max_size is None or value_size <= self._max_size:
+            self._accommodate_value(value_size)
+            self._values_cache[key] = value
+            self._current_size += value_size
+
+    def clear_values(self):
+        with self._mutex:
+            self._values_cache.clear()
+
+    def clear_keys(self):
+        with self._mutex:
+            self._clear_keys()
+
+    def _clear_keys(self):
+        self._keys_cache = None
+        self._listdir_cache.clear()
+
+    def _clear_value(self, key):
+        if key in self._values_cache:
+            value = self._values_cache.pop(key)
+            self._current_size -= buffer_size(value)
+
+    def __getitem__(self, key):
+        try:
+            # first try to obtain the value from the cache
+            with self._mutex:
+                value = self._values_cache[key]
+                # cache hit if no KeyError is raised
+                self.hits += 1
+                # treat the end as most recently used
+                OrderedDict_move_to_end(self._values_cache, key)
+
+        except KeyError:
+            # cache miss, retrieve value from the store
+            value = self._store[key]
+            with self._mutex:
+                self.misses += 1
+                # need to check if key is not in the cache, as it may have been cached
+                # while we were retrieving the value from the store
+                if key not in self._values_cache:
+                    self._cache_value(key, value)
+
+        return value
+
+    def __setitem__(self, key, value):
+        self._store[key] = value
+        with self._mutex:
+            self._clear_keys()
+            self._clear_value(key)
+            self._cache_value(key, value)
+
+    def __delitem__(self, key):
+        del self._store[key]
+        with self._mutex:
+            self._clear_keys()
+            self._clear_value(key)
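To illustrate how the new class is intended to be used, here is a rough sketch based only on the behaviour visible in this diff (the 2**28 byte budget is an arbitrary example value, and a plain dict stands in for a slower store such as DirectoryStore):

    from zarr.storage import LRUStoreCache

    store = dict()
    store['foo'] = b'xxx'
    cache = LRUStoreCache(store, max_size=2**28)   # keep at most ~256 MiB of values in memory

    value = cache['foo']   # first access misses and reads through to the underlying store
    value = cache['foo']   # second access is served from the in-memory values cache
    print(cache.misses, cache.hits)   # -> 1 1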
13 changes: 12 additions & 1 deletion zarr/tests/test_core.py
@@ -18,7 +18,8 @@


 from zarr.storage import (DirectoryStore, init_array, init_group, NestedDirectoryStore,
-                          DBMStore, LMDBStore, atexit_rmtree, atexit_rmglob)
+                          DBMStore, LMDBStore, atexit_rmtree, atexit_rmglob,
+                          LRUStoreCache)
 from zarr.core import Array
 from zarr.errors import PermissionError
 from zarr.compat import PY2, text_type, binary_type
@@ -1615,3 +1616,13 @@ def test_cache_metadata(self):
     def test_object_arrays_danger(self):
         # skip this one as it only works if metadata are cached
         pass
+
+
+class TestArrayWithStoreCache(TestArray):
+
+    @staticmethod
+    def create_array(read_only=False, **kwargs):
+        store = LRUStoreCache(dict(), max_size=None)
+        kwargs.setdefault('compressor', Zlib(level=1))
+        init_array(store, **kwargs)
+        return Array(store, read_only=read_only)
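The test above runs the full TestArray suite with an unbounded cache (max_size=None). For the bounded case, a hedged sketch of the eviction behaviour implied by _accommodate_value and _pop_value, with illustrative sizes:

    from zarr.storage import LRUStoreCache

    store = {'a': b'x' * 100, 'b': b'y' * 100, 'c': b'z' * 100}
    cache = LRUStoreCache(store, max_size=250)   # room for at most two 100-byte values

    cache['a']   # miss, cached (100 bytes held)
    cache['b']   # miss, cached (200 bytes held)
    cache['c']   # miss; caching 'c' would exceed 250 bytes, so 'a' (least recently used) is evicted
    cache['a']   # miss again: 'a' must be re-read from the underlying store
    print(cache.misses, cache.hits)   # -> 4 0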
11 changes: 10 additions & 1 deletion zarr/tests/test_hierarchy.py
@@ -20,7 +20,8 @@

 from zarr.storage import (DictStore, DirectoryStore, ZipStore, init_group, init_array,
                           array_meta_key, group_meta_key, atexit_rmtree,
-                          NestedDirectoryStore, DBMStore, LMDBStore, atexit_rmglob)
+                          NestedDirectoryStore, DBMStore, LMDBStore, atexit_rmglob,
+                          LRUStoreCache)
 from zarr.core import Array
 from zarr.compat import PY2, text_type
 from zarr.hierarchy import Group, group, open_group
@@ -945,6 +946,14 @@ def test_chunk_store(self):
         eq(expect, actual)


+class TestGroupWithStoreCache(TestGroup):
+
+    @staticmethod
+    def create_store():
+        store = LRUStoreCache(dict(), max_size=None)
+        return store, None
+
+
 def test_group():
     # test the group() convenience function
