1
1
import pickle
2
2
from typing import Any
3
+ from unittest.mock import AsyncMock
3
4
4
5
import numpy as np
5
6
import numpy.typing as npt
9
10
import zarr.api
10
11
import zarr.api.asynchronous
11
12
from zarr import Array
12
- from zarr.abc.store import Store
13
+ from zarr.abc.store import RangeByteRequest, Store, SuffixByteRequest
13
14
from zarr.codecs import (
14
15
BloscCodec,
15
16
ShardingCodec,
@@ -249,17 +250,24 @@ def test_partial_shard_read_performance(store: Store) -> None:
249
250
250
251
@pytest.mark.parametrize("index_location", ["start", "end"])
251
252
@pytest.mark.parametrize("store", ["local", "memory", "zip"], indirect=["store"])
253
+ @pytest.mark.parametrize("coalesce_reads", [True, False])
252
254
def test_sharding_multiple_chunks_partial_shard_read(
253
- store: Store, index_location: ShardingCodecIndexLocation
255
+ store: Store, index_location: ShardingCodecIndexLocation, coalesce_reads: bool
254
256
) -> None:
255
- array_shape = (8 , 64)
256
- shard_shape = (4 , 32)
257
+ array_shape = (16 , 64)
258
+ shard_shape = (8 , 32)
257
259
chunk_shape = (2, 4)
258
-
259
260
data = np.arange(np.prod(array_shape), dtype="float32").reshape(array_shape)
260
261
262
+ if coalesce_reads:
263
+ # 1MiB, enough to coalesce all chunks within a shard in this example
264
+ zarr.config.set({"sharding.read.coalesce_max_gap_bytes": 2**20})
265
+ else:
266
+ zarr.config.set({"sharding.read.coalesce_max_gap_bytes": -1}) # disable coalescing
267
+
268
+ store_mock = AsyncMock(wraps=store, spec=store.__class__)
261
269
a = zarr.create_array(
262
- StorePath(store ),
270
+ StorePath(store_mock ),
263
271
shape=data.shape,
264
272
chunks=chunk_shape,
265
273
shards={"shape": shard_shape, "index_location": index_location},
@@ -269,12 +277,41 @@ def test_sharding_multiple_chunks_partial_shard_read(
269
277
)
270
278
a[:] = data
271
279
272
- # Reads 2.5 (3 full, one partial) chunks each from 2 shards (a subset of both shards)
280
+ store_mock.reset_mock() # ignore store calls during array creation
281
+
282
+ # Reads 3 (2 full, 1 partial) chunks each from 2 shards (a subset of both shards)
283
+ # for a total of 6 chunks accessed
273
284
assert np.allclose(a[0, 22:42], np.arange(22, 42, dtype="float32"))
274
285
275
- # Reads 2 chunks from both shards along dimension 0
286
+ if coalesce_reads:
287
+ # 2 shard index requests + 2 coalesced chunk data byte ranges (one for each shard)
288
+ assert store_mock.get.call_count == 4
289
+ else:
290
+ # 2 shard index requests + 6 chunks
291
+ assert store_mock.get.call_count == 8
292
+
293
+ for method, args, kwargs in store_mock.method_calls:
294
+ assert method == "get"
295
+ assert args[0].startswith("c/") # get from a chunk
296
+ assert isinstance(kwargs["byte_range"], (SuffixByteRequest, RangeByteRequest))
297
+
298
+ store_mock.reset_mock()
299
+
300
+ # Reads 4 chunks from both shards along dimension 0 for a total of 8 chunks accessed
276
301
assert np.allclose(a[:, 0], np.arange(0, data.size, array_shape[1], dtype="float32"))
277
302
303
+ if coalesce_reads:
304
+ # 2 shard index requests + 2 coalesced chunk data byte ranges (one for each shard)
305
+ assert store_mock.get.call_count == 4
306
+ else:
307
+ # 2 shard index requests + 8 chunks
308
+ assert store_mock.get.call_count == 10
309
+
310
+ for method, args, kwargs in store_mock.method_calls:
311
+ assert method == "get"
312
+ assert args[0].startswith("c/") # get from a chunk
313
+ assert isinstance(kwargs["byte_range"], (SuffixByteRequest, RangeByteRequest))
314
+
278
315
279
316
@pytest.mark.parametrize(
280
317
"array_fixture",
0 commit comments