1
1
import pickle
2
2
from typing import Any
3
+ from unittest .mock import AsyncMock
3
4
4
5
import numpy as np
5
6
import numpy .typing as npt
9
10
import zarr .api
10
11
import zarr .api .asynchronous
11
12
from zarr import Array
12
- from zarr .abc .store import Store
13
+ from zarr .abc .store import RangeByteRequest , Store , SuffixByteRequest
13
14
from zarr .codecs import (
14
15
BloscCodec ,
15
16
ShardingCodec ,
@@ -264,17 +265,24 @@ async def get_with_latency(*args: Any, get_latency: float, **kwargs: Any) -> Any
264
265
265
266
@pytest .mark .parametrize ("index_location" , ["start" , "end" ])
266
267
@pytest .mark .parametrize ("store" , ["local" , "memory" , "zip" ], indirect = ["store" ])
268
+ @pytest .mark .parametrize ("coalesce_reads" , [True , False ])
267
269
def test_sharding_multiple_chunks_partial_shard_read (
268
- store : Store , index_location : ShardingCodecIndexLocation
270
+ store : Store , index_location : ShardingCodecIndexLocation , coalesce_reads : bool
269
271
) -> None :
270
- array_shape = (8 , 64 )
271
- shard_shape = (4 , 32 )
272
+ array_shape = (16 , 64 )
273
+ shard_shape = (8 , 32 )
272
274
chunk_shape = (2 , 4 )
273
-
274
275
data = np .arange (np .prod (array_shape ), dtype = "float32" ).reshape (array_shape )
275
276
277
+ if coalesce_reads :
278
+ # Allow gaps of up to 1 MiB between byte ranges, enough to coalesce all chunks within a shard in this example
279
+ zarr .config .set ({"sharding.read.coalesce_max_gap_bytes" : 2 ** 20 })
280
+ else :
281
+ zarr .config .set ({"sharding.read.coalesce_max_gap_bytes" : - 1 }) # disable coalescing
282
+
283
+ store_mock = AsyncMock (wraps = store , spec = store .__class__ )
276
284
a = zarr .create_array (
277
- StorePath (store ),
285
+ StorePath (store_mock ),
278
286
shape = data .shape ,
279
287
chunks = chunk_shape ,
280
288
shards = {"shape" : shard_shape , "index_location" : index_location },
@@ -284,12 +292,41 @@ def test_sharding_multiple_chunks_partial_shard_read(
284
292
)
285
293
a [:] = data
286
294
287
- # Reads 2.5 (3 full, one partial) chunks each from 2 shards (a subset of both shards)
295
+ store_mock .reset_mock () # ignore store calls during array creation
296
+
297
+ # Reads 3 (2 full, 1 partial) chunks each from 2 shards (a subset of both shards)
298
+ # for a total of 6 chunks accessed
288
299
assert np .allclose (a [0 , 22 :42 ], np .arange (22 , 42 , dtype = "float32" ))
289
300
290
- # Reads 2 chunks from both shards along dimension 0
301
+ if coalesce_reads :
302
+ # 2 shard index requests + 2 coalesced chunk data byte ranges (one for each shard)
303
+ assert store_mock .get .call_count == 4
304
+ else :
305
+ # 2 shard index requests + 6 chunks
306
+ assert store_mock .get .call_count == 8
307
+
308
+ for method , args , kwargs in store_mock .method_calls :
309
+ assert method == "get"
310
+ assert args [0 ].startswith ("c/" ) # get from a chunk
311
+ assert isinstance (kwargs ["byte_range" ], (SuffixByteRequest , RangeByteRequest ))
312
+
313
+ store_mock .reset_mock ()
314
+
315
+ # Reads 4 chunks from each of the 2 shards along dimension 0, for a total of 8 chunks accessed
291
316
assert np .allclose (a [:, 0 ], np .arange (0 , data .size , array_shape [1 ], dtype = "float32" ))
292
317
318
+ if coalesce_reads :
319
+ # 2 shard index requests + 2 coalesced chunk data byte ranges (one for each shard)
320
+ assert store_mock .get .call_count == 4
321
+ else :
322
+ # 2 shard index requests + 8 chunks
323
+ assert store_mock .get .call_count == 10
324
+
325
+ for method , args , kwargs in store_mock .method_calls :
326
+ assert method == "get"
327
+ assert args [0 ].startswith ("c/" ) # get from a chunk
328
+ assert isinstance (kwargs ["byte_range" ], (SuffixByteRequest , RangeByteRequest ))
329
+
293
330
294
331
@pytest .mark .parametrize (
295
332
"array_fixture" ,
0 commit comments