1
1
import pickle
2
2
from typing import Any
3
+ from unittest .mock import AsyncMock
3
4
4
5
import numpy as np
5
6
import numpy .typing as npt
9
10
import zarr .api
10
11
import zarr .api .asynchronous
11
12
from zarr import Array
12
- from zarr .abc .store import Store
13
+ from zarr .abc .store import RangeByteRequest , Store , SuffixByteRequest
13
14
from zarr .codecs import (
14
15
BloscCodec ,
15
16
ShardingCodec ,
@@ -249,17 +250,24 @@ def test_partial_shard_read_performance(store: Store) -> None:
249
250
250
251
@pytest .mark .parametrize ("index_location" , ["start" , "end" ])
251
252
@pytest .mark .parametrize ("store" , ["local" , "memory" , "zip" ], indirect = ["store" ])
253
+ @pytest .mark .parametrize ("coalesce_reads" , [True , False ])
252
254
def test_sharding_multiple_chunks_partial_shard_read (
253
- store : Store , index_location : ShardingCodecIndexLocation
255
+ store : Store , index_location : ShardingCodecIndexLocation , coalesce_reads : bool
254
256
) -> None :
255
- array_shape = (8 , 64 )
256
- shard_shape = (4 , 32 )
257
+ array_shape = (16 , 64 )
258
+ shard_shape = (8 , 32 )
257
259
chunk_shape = (2 , 4 )
258
-
259
260
data = np .arange (np .prod (array_shape ), dtype = "float32" ).reshape (array_shape )
260
261
262
+ if coalesce_reads :
263
+ # 1MiB, enough to coalesce all chunks within a shard in this example
264
+ zarr .config .set ({"sharding.read.coalesce_max_gap_bytes" : 2 ** 20 })
265
+ else :
266
+ zarr .config .set ({"sharding.read.coalesce_max_gap_bytes" : - 1 }) # disable coalescing
267
+
268
+ store_mock = AsyncMock (wraps = store , spec = store .__class__ )
261
269
a = zarr .create_array (
262
- StorePath (store ),
270
+ StorePath (store_mock ),
263
271
shape = data .shape ,
264
272
chunks = chunk_shape ,
265
273
shards = {"shape" : shard_shape , "index_location" : index_location },
@@ -269,12 +277,41 @@ def test_sharding_multiple_chunks_partial_shard_read(
269
277
)
270
278
a [:] = data
271
279
272
- # Reads 2.5 (3 full, one partial) chunks each from 2 shards (a subset of both shards)
280
+ store_mock .reset_mock () # ignore store calls during array creation
281
+
282
+ # Reads 3 (2 full, 1 partial) chunks each from 2 shards (a subset of both shards)
283
+ # for a total of 6 chunks accessed
273
284
assert np .allclose (a [0 , 22 :42 ], np .arange (22 , 42 , dtype = "float32" ))
274
285
275
- # Reads 2 chunks from both shards along dimension 0
286
+ if coalesce_reads :
287
+ # 2 shard index requests + 2 coalesced chunk data byte ranges (one for each shard)
288
+ assert store_mock .get .call_count == 4
289
+ else :
290
+ # 2 shard index requests + 6 chunks
291
+ assert store_mock .get .call_count == 8
292
+
293
+ for method , args , kwargs in store_mock .method_calls :
294
+ assert method == "get"
295
+ assert args [0 ].startswith ("c/" ) # get from a chunk
296
+ assert isinstance (kwargs ["byte_range" ], (SuffixByteRequest , RangeByteRequest ))
297
+
298
+ store_mock .reset_mock ()
299
+
300
+ # Reads 4 chunks from both shards along dimension 0 for a total of 8 chunks accessed
276
301
assert np .allclose (a [:, 0 ], np .arange (0 , data .size , array_shape [1 ], dtype = "float32" ))
277
302
303
+ if coalesce_reads :
304
+ # 2 shard index requests + 2 coalesced chunk data byte ranges (one for each shard)
305
+ assert store_mock .get .call_count == 4
306
+ else :
307
+ # 2 shard index requests + 8 chunks
308
+ assert store_mock .get .call_count == 10
309
+
310
+ for method , args , kwargs in store_mock .method_calls :
311
+ assert method == "get"
312
+ assert args [0 ].startswith ("c/" ) # get from a chunk
313
+ assert isinstance (kwargs ["byte_range" ], (SuffixByteRequest , RangeByteRequest ))
314
+
278
315
279
316
@pytest .mark .parametrize (
280
317
"array_fixture" ,
0 commit comments