15
15
*/
16
16
package org .springframework .data .redis .connection ;
17
17
18
+ import lombok .AccessLevel ;
19
+ import lombok .EqualsAndHashCode ;
20
+ import lombok .Getter ;
21
+ import lombok .RequiredArgsConstructor ;
22
+
18
23
import java .util .*;
19
24
import java .util .Map .Entry ;
20
25
import java .util .concurrent .ExecutionException ;
21
26
import java .util .concurrent .Future ;
27
+ import java .util .stream .Collectors ;
22
28
23
29
import org .springframework .beans .factory .DisposableBean ;
24
30
import org .springframework .core .task .AsyncTaskExecutor ;
@@ -202,7 +208,6 @@ public <S, T> MultiNodeResult<T> executeCommandAsyncOnNodes(ClusterCommandCallba
202
208
203
209
Map <NodeExecution , Future <NodeResult <T >>> futures = new LinkedHashMap <>();
204
210
for (RedisClusterNode node : resolvedRedisClusterNodes ) {
205
-
206
211
futures .put (new NodeExecution (node ), executor .submit (() -> executeCommandOnSingleNode (callback , node )));
207
212
}
208
213
@@ -225,23 +230,30 @@ private <T> MultiNodeResult<T> collectResults(Map<NodeExecution, Future<NodeResu
225
230
if (!entry .getValue ().isDone () && !entry .getValue ().isCancelled ()) {
226
231
done = false ;
227
232
} else {
233
+
234
+ NodeExecution execution = entry .getKey ();
228
235
try {
229
236
230
237
String futureId = ObjectUtils .getIdentityHexString (entry .getValue ());
231
238
if (!saveGuard .contains (futureId )) {
232
- result .add (entry .getValue ().get ());
239
+
240
+ if (execution .isPositional ()) {
241
+ result .add (execution .getPositionalKey (), entry .getValue ().get ());
242
+ } else {
243
+ result .add (entry .getValue ().get ());
244
+ }
233
245
saveGuard .add (futureId );
234
246
}
235
247
} catch (ExecutionException e ) {
236
248
237
249
RuntimeException ex = convertToDataAccessException ((Exception ) e .getCause ());
238
- exceptions .put (entry . getKey () .getNode (), ex != null ? ex : e .getCause ());
250
+ exceptions .put (execution .getNode (), ex != null ? ex : e .getCause ());
239
251
} catch (InterruptedException e ) {
240
252
241
253
Thread .currentThread ().interrupt ();
242
254
243
255
RuntimeException ex = convertToDataAccessException ((Exception ) e .getCause ());
244
- exceptions .put (entry . getKey () .getNode (), ex != null ? ex : e .getCause ());
256
+ exceptions .put (execution .getNode (), ex != null ? ex : e .getCause ());
245
257
break ;
246
258
}
247
259
}
@@ -271,29 +283,28 @@ private <T> MultiNodeResult<T> collectResults(Map<NodeExecution, Future<NodeResu
271
283
public <S , T > MultiNodeResult <T > executeMultiKeyCommand (MultiKeyClusterCommandCallback <S , T > cmd ,
272
284
Iterable <byte []> keys ) {
273
285
274
- Map <RedisClusterNode , Set < byte []> > nodeKeyMap = new HashMap <>();
286
+ Map <RedisClusterNode , PositionalKeys > nodeKeyMap = new HashMap <>();
275
287
288
+ int index = 0 ;
276
289
for (byte [] key : keys ) {
277
290
for (RedisClusterNode node : getClusterTopology ().getKeyServingNodes (key )) {
278
291
279
292
if (nodeKeyMap .containsKey (node )) {
280
- nodeKeyMap .get (node ).add ( key );
293
+ nodeKeyMap .get (node ).append ( PositionalKey . of ( key , index ++) );
281
294
} else {
282
- Set <byte []> keySet = new LinkedHashSet <>();
283
- keySet .add (key );
284
- nodeKeyMap .put (node , keySet );
295
+ nodeKeyMap .put (node , PositionalKeys .of (PositionalKey .of (key , index ++)));
285
296
}
286
297
}
287
298
}
288
299
289
300
Map <NodeExecution , Future <NodeResult <T >>> futures = new LinkedHashMap <>();
290
301
291
- for (Entry <RedisClusterNode , Set < byte []> > entry : nodeKeyMap .entrySet ()) {
302
+ for (Entry <RedisClusterNode , PositionalKeys > entry : nodeKeyMap .entrySet ()) {
292
303
293
304
if (entry .getKey ().isMaster ()) {
294
- for (byte [] key : entry .getValue ()) {
305
+ for (PositionalKey key : entry .getValue ()) {
295
306
futures .put (new NodeExecution (entry .getKey (), key ),
296
- executor .submit (() -> executeMultiKeyCommandOnSingleNode (cmd , entry .getKey (), key )));
307
+ executor .submit (() -> executeMultiKeyCommandOnSingleNode (cmd , entry .getKey (), key . getBytes () )));
297
308
}
298
309
}
299
310
}
@@ -326,6 +337,7 @@ private ClusterTopology getClusterTopology() {
326
337
return this .topologyProvider .getTopology ();
327
338
}
328
339
340
+ @ Nullable
329
341
private DataAccessException convertToDataAccessException (Exception e ) {
330
342
return exceptionTranslationStrategy .translate (e );
331
343
}
@@ -384,17 +396,22 @@ public interface MultiKeyClusterCommandCallback<T, S> {
384
396
* keys, involved.
385
397
*
386
398
* @author Christoph Strobl
399
+ * @author Mark Paluch
387
400
* @since 1.7
388
401
*/
389
402
private static class NodeExecution {
390
403
391
- private RedisClusterNode node ;
392
- private Object [] args ;
404
+ private final RedisClusterNode node ;
405
+ private final @ Nullable PositionalKey positionalKey ;
406
+
407
+ NodeExecution (RedisClusterNode node ) {
408
+ this (node , null );
409
+ }
393
410
394
- NodeExecution (RedisClusterNode node , Object ... args ) {
411
+ NodeExecution (RedisClusterNode node , @ Nullable PositionalKey positionalKey ) {
395
412
396
413
this .node = node ;
397
- this .args = args ;
414
+ this .positionalKey = positionalKey ;
398
415
}
399
416
400
417
/**
@@ -404,45 +421,18 @@ RedisClusterNode getNode() {
404
421
return node ;
405
422
}
406
423
407
- /*
408
- * (non-Javadoc)
409
- * @see java.lang.Object#hashCode()
424
+ /**
425
+ * Get the {@link PositionalKey} of this execution.
426
+ *
427
+ * @since 2.0.3
410
428
*/
411
- @ Override
412
- public int hashCode () {
413
-
414
- int result = ObjectUtils .nullSafeHashCode (node );
415
- return result + ObjectUtils .nullSafeHashCode (args );
429
+ PositionalKey getPositionalKey () {
430
+ return positionalKey ;
416
431
}
417
432
418
- /*
419
- * (non-Javadoc)
420
- * @see java.lang.Object#equals(java.lang.Object)
421
- */
422
- @ Override
423
- public boolean equals (Object obj ) {
424
-
425
- if (this == obj ) {
426
- return true ;
427
- }
428
-
429
- if (obj == null ) {
430
- return false ;
431
- }
432
-
433
- if (!(obj instanceof NodeExecution )) {
434
- return false ;
435
- }
436
-
437
- NodeExecution that = (NodeExecution ) obj ;
438
-
439
- if (!ObjectUtils .nullSafeEquals (this .node , that .node )) {
440
- return false ;
441
- }
442
-
443
- return ObjectUtils .nullSafeEquals (this .args , that .args );
433
+ boolean isPositional () {
434
+ return positionalKey != null ;
444
435
}
445
-
446
436
}
447
437
448
438
/**
@@ -515,17 +505,25 @@ public byte[] getKey() {
515
505
* {@link MultiNodeResult} holds all {@link NodeResult} of a command executed on multiple {@link RedisClusterNode}.
516
506
*
517
507
* @author Christoph Strobl
508
+ * @author Mark Paluch
518
509
* @param <T>
519
510
* @since 1.7
520
511
*/
521
512
public static class MultiNodeResult <T > {
522
513
523
514
List <NodeResult <T >> nodeResults = new ArrayList <>();
515
+ Map <PositionalKey , NodeResult <T >> positionalResults = new LinkedHashMap <>();
524
516
525
517
private void add (NodeResult <T > result ) {
526
518
nodeResults .add (result );
527
519
}
528
520
521
+ private void add (PositionalKey key , NodeResult <T > result ) {
522
+
523
+ positionalResults .put (key , result );
524
+ add (result );
525
+ }
526
+
529
527
/**
530
528
* @return never {@literal null}.
531
529
*/
@@ -551,15 +549,23 @@ public List<T> resultsAsList() {
551
549
*/
552
550
public List <T > resultsAsListSortBy (byte []... keys ) {
553
551
554
- ArrayList <NodeResult <T >> clone = new ArrayList <>(nodeResults );
555
- clone .sort (new ResultByReferenceKeyPositionComparator (keys ));
552
+ if (positionalResults .isEmpty ()) {
553
+
554
+ List <NodeResult <T >> clone = new ArrayList <>(nodeResults );
555
+ clone .sort (new ResultByReferenceKeyPositionComparator (keys ));
556
+
557
+ return toList (clone );
558
+ }
559
+
560
+ Map <PositionalKey , NodeResult <T >> result = new TreeMap <>(new ResultByKeyPositionComparator (keys ));
561
+ result .putAll (positionalResults );
556
562
557
- return toList (clone );
563
+ return result . values (). stream (). map ( tNodeResult -> tNodeResult . value ). collect ( Collectors . toList () );
558
564
}
559
565
560
566
/**
561
567
* @param returnValue can be {@literal null}.
562
- * @return can be {@litearl null}.
568
+ * @return can be {@literal null}.
563
569
*/
564
570
@ Nullable
565
571
public T getFirstNonNullNotEmptyOrDefault (@ Nullable T returnValue ) {
@@ -609,5 +615,114 @@ public int compare(NodeResult<?> o1, NodeResult<?> o2) {
609
615
return Integer .compare (reference .indexOf (o1 .key ), reference .indexOf (o2 .key ));
610
616
}
611
617
}
618
+
619
+ /**
620
+ * {@link Comparator} for sorting {@link PositionalKey} by external {@link PositionalKeys}.
621
+ *
622
+ * @author Mark Paluch
623
+ * @since 2.0.3
624
+ */
625
+ private static class ResultByKeyPositionComparator implements Comparator <PositionalKey > {
626
+
627
+ PositionalKeys reference ;
628
+
629
+ ResultByKeyPositionComparator (byte []... keys ) {
630
+ reference = PositionalKeys .of (keys );
631
+ }
632
+
633
+ @ Override
634
+ public int compare (PositionalKey o1 , PositionalKey o2 ) {
635
+ return Integer .compare (reference .indexOf (o1 ), reference .indexOf (o2 ));
636
+ }
637
+ }
638
+ }
639
+
640
+ /**
641
+ * Value object representing a Redis key at a particular command position.
642
+ *
643
+ * @author Mark Paluch
644
+ * @since 2.0.3
645
+ */
646
+ @ Getter
647
+ @ EqualsAndHashCode
648
+ @ RequiredArgsConstructor (access = AccessLevel .PRIVATE )
649
+ static class PositionalKey {
650
+
651
+ private final ByteArrayWrapper key ;
652
+ private final int position ;
653
+
654
+ public static PositionalKey of (byte [] key , int index ) {
655
+ return new PositionalKey (new ByteArrayWrapper (key ), index );
656
+ }
657
+
658
+ /**
659
+ * @return binary key.
660
+ */
661
+ public byte [] getBytes () {
662
+ return key .getArray ();
663
+ }
664
+ }
665
+
666
+ /**
667
+ * Mutable data structure to represent multiple {@link PositionalKey}s.
668
+ *
669
+ * @author Mark Paluch
670
+ * @since 2.0.3
671
+ */
672
+ @ RequiredArgsConstructor (access = AccessLevel .PRIVATE )
673
+ static class PositionalKeys implements Iterable <PositionalKey > {
674
+
675
+ private final List <PositionalKey > keys ;
676
+
677
+ /**
678
+ * Create an empty {@link PositionalKeys}.
679
+ */
680
+ public static PositionalKeys empty () {
681
+ return new PositionalKeys (new ArrayList <>());
682
+ }
683
+
684
+ /**
685
+ * Create an {@link PositionalKeys} from {@code keys}.
686
+ */
687
+ public static PositionalKeys of (byte []... keys ) {
688
+
689
+ List <PositionalKey > result = new ArrayList <>(keys .length );
690
+
691
+ for (int i = 0 ; i < keys .length ; i ++) {
692
+ result .add (PositionalKey .of (keys [i ], i ));
693
+ }
694
+
695
+ return new PositionalKeys (result );
696
+ }
697
+
698
+ /**
699
+ * Create an {@link PositionalKeys} from {@link PositionalKey}s.
700
+ */
701
+ public static PositionalKeys of (PositionalKey ... keys ) {
702
+
703
+ PositionalKeys result = PositionalKeys .empty ();
704
+ result .append (keys );
705
+
706
+ return result ;
707
+ }
708
+
709
+ /**
710
+ * Append {@link PositionalKey}s to this object.
711
+ */
712
+ public void append (PositionalKey ... keys ) {
713
+ this .keys .addAll (Arrays .asList (keys ));
714
+ }
715
+
716
+ /**
717
+ * @return index of the {@link PositionalKey}.
718
+ */
719
+ public int indexOf (PositionalKey key ) {
720
+ return keys .indexOf (key );
721
+ }
722
+
723
+ @ Override
724
+ public Iterator <PositionalKey > iterator () {
725
+ return keys .iterator ();
726
+ }
612
727
}
613
728
}
0 commit comments