Skip to content

Commit e9bfc2b

Browse files
authored
Merge pull request #158 from graphql-java/reactive-streams-branch-extra-tests-for-reactive
More tests for Publishers on reactive branch
2 parents 5d826b8 + 8b344db commit e9bfc2b

9 files changed

+586
-233
lines changed

src/main/java/org/dataloader/DataLoaderHelper.java

+45-7
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import org.dataloader.annotations.GuardedBy;
44
import org.dataloader.annotations.Internal;
55
import org.dataloader.impl.CompletableFutureKit;
6+
import org.dataloader.impl.DataLoaderAssertionException;
67
import org.dataloader.scheduler.BatchLoaderScheduler;
78
import org.dataloader.stats.StatisticsCollector;
89
import org.dataloader.stats.context.IncrementBatchLoadCountByStatisticsContext;
@@ -624,6 +625,15 @@ private static <T> DispatchResult<T> emptyDispatchResult() {
624625
return (DispatchResult<T>) EMPTY_DISPATCH_RESULT;
625626
}
626627

628+
/**********************************************************************************************
629+
* ********************************************************************************************
630+
* <p>
631+
* The reactive support classes start here
632+
*
633+
* @param <T> for two
634+
**********************************************************************************************
635+
**********************************************************************************************
636+
*/
627637
private abstract class DataLoaderSubscriberBase<T> implements Subscriber<T> {
628638

629639
final CompletableFuture<List<V>> valuesFuture;
@@ -721,6 +731,11 @@ private DataLoaderSubscriber(
721731
public synchronized void onNext(V value) {
722732
super.onNext(value);
723733

734+
if (idx >= keys.size()) {
735+
// hang on they have given us more values than we asked for in keys
736+
// we cant handle this
737+
return;
738+
}
724739
K key = keys.get(idx);
725740
Object callContext = callContexts.get(idx);
726741
CompletableFuture<V> future = queuedFutures.get(idx);
@@ -734,8 +749,16 @@ public synchronized void onNext(V value) {
734749
@Override
735750
public synchronized void onComplete() {
736751
super.onComplete();
737-
assertResultSize(keys, completedValues);
738-
752+
if (keys.size() != completedValues.size()) {
753+
// we have more or less values than promised
754+
// we will go through all the outstanding promises and mark those that
755+
// have not finished as failed
756+
for (CompletableFuture<V> queuedFuture : queuedFutures) {
757+
if (!queuedFuture.isDone()) {
758+
queuedFuture.completeExceptionally(new DataLoaderAssertionException("The size of the promised values MUST be the same size as the key list"));
759+
}
760+
}
761+
}
739762
possiblyClearCacheEntriesOnExceptions(clearCacheKeys);
740763
valuesFuture.complete(completedValues);
741764
}
@@ -748,9 +771,11 @@ public synchronized void onError(Throwable ex) {
748771
for (int i = idx; i < queuedFutures.size(); i++) {
749772
K key = keys.get(i);
750773
CompletableFuture<V> future = queuedFutures.get(i);
751-
future.completeExceptionally(ex);
752-
// clear any cached view of this key because they all failed
753-
dataLoader.clear(key);
774+
if (! future.isDone()) {
775+
future.completeExceptionally(ex);
776+
// clear any cached view of this key because it failed
777+
dataLoader.clear(key);
778+
}
754779
}
755780
valuesFuture.completeExceptionally(ex);
756781
}
@@ -790,11 +815,14 @@ public synchronized void onNext(Map.Entry<K, V> entry) {
790815
V value = entry.getValue();
791816

792817
Object callContext = callContextByKey.get(key);
793-
List<CompletableFuture<V>> futures = queuedFuturesByKey.get(key);
818+
List<CompletableFuture<V>> futures = queuedFuturesByKey.getOrDefault(key, List.of());
794819

795820
onNextValue(key, value, callContext, futures);
796821

797-
completedValuesByKey.put(key, value);
822+
// did we have an actual key for this value - ignore it if they send us one outside the key set
823+
if (!futures.isEmpty()) {
824+
completedValuesByKey.put(key, value);
825+
}
798826
}
799827

800828
@Override
@@ -806,6 +834,16 @@ public synchronized void onComplete() {
806834
for (K key : keys) {
807835
V value = completedValuesByKey.get(key);
808836
values.add(value);
837+
838+
List<CompletableFuture<V>> futures = queuedFuturesByKey.getOrDefault(key, List.of());
839+
for (CompletableFuture<V> future : futures) {
840+
if (! future.isDone()) {
841+
// we have a future that never came back for that key
842+
// but the publisher is done sending in data - it must be null
843+
// e.g. for key X when found no value
844+
future.complete(null);
845+
}
846+
}
809847
}
810848
valuesFuture.complete(values);
811849
}

0 commit comments

Comments
 (0)