Skip to content

Commit 1637a97

Browse files
authored
Fit cached FSs better with async backends (#1429)
* Fit cached FSs better with async backends * Add test
1 parent 2942485 commit 1637a97

File tree

3 files changed

+40
-1
lines changed

3 files changed

+40
-1
lines changed

fsspec/implementations/cached.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,7 @@ def __getattribute__(self, item):
393393
"open",
394394
"cat",
395395
"cat_file",
396+
"cat_ranges",
396397
"get",
397398
"read_block",
398399
"tail",
@@ -715,6 +716,17 @@ def save_cache(self):
715716
def load_cache(self):
716717
pass
717718

719+
def cat_ranges(
720+
self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
721+
):
722+
lpaths = [self._check_file(p) for p in paths]
723+
rpaths = [p for l, p in zip(lpaths, paths) if l is False]
724+
lpaths = [l for l, p in zip(lpaths, paths) if l is False]
725+
self.fs.get(rpaths, lpaths)
726+
return super().cat_ranges(
727+
paths, starts, ends, max_gap=max_gap, on_error=on_error, **kwargs
728+
)
729+
718730
def _open(self, path, mode="rb", **kwargs):
719731
path = self._strip_protocol(path)
720732

fsspec/implementations/reference.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -624,7 +624,7 @@ def __init__(
624624
**(ref_storage_args or target_options or {}), protocol=target_protocol
625625
)
626626
ref_fs, fo2 = fsspec.core.url_to_fs(fo, **dic)
627-
if ref_fs.isfile(fo):
627+
if ref_fs.isfile(fo2):
628628
# text JSON
629629
with fsspec.open(fo, "rb", **dic) as f:
630630
logger.info("Read reference from URL %s", fo)

fsspec/implementations/tests/test_reference.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,3 +621,30 @@ def test_mapping_getitems(m):
621621
fs = fsspec.filesystem("reference", fo=refs, fs=h)
622622
mapping = fs.get_mapper("")
623623
assert mapping.getitems(["b", "a"]) == {"a": b"A", "b": b"B"}
624+
625+
626+
def test_cached(m, tmpdir):
627+
fn = f"{tmpdir}/ref.json"
628+
629+
m.pipe({"a": b"A", "b": b"B"})
630+
m.pipe("ref.json", b"""{"a": ["a"], "b": ["b"]}""")
631+
632+
fs = fsspec.filesystem(
633+
"reference",
634+
fo="simplecache::memory://ref.json",
635+
fs=m,
636+
target_options={"cache_storage": str(tmpdir), "same_names": True},
637+
)
638+
assert fs.cat("a") == b"A"
639+
assert os.path.exists(fn)
640+
641+
# truncate original file to show we are loading from the cached version
642+
m.pipe("ref.json", b"")
643+
fs = fsspec.filesystem(
644+
"reference",
645+
fo="simplecache::memory://ref.json",
646+
fs=m,
647+
target_options={"cache_storage": str(tmpdir), "same_names": True},
648+
skip_instance_cache=True,
649+
)
650+
assert fs.cat("a") == b"A"

0 commit comments

Comments
 (0)