Skip to content

Commit 651e561

Browse files
Fix MappedBatchPublisher loaders to work without cache
If we did not cache the futures, then the MappedBatchPublisher DataLoader would not work as we were only completing the last future for a given key.
1 parent eb2b40c commit 651e561

File tree

2 files changed

+14
-12
lines changed

2 files changed

+14
-12
lines changed

src/main/java/org/dataloader/DataLoaderHelper.java

+11-9
Original file line numberDiff line numberDiff line change
@@ -714,7 +714,7 @@ private class DataLoaderMapEntrySubscriber implements Subscriber<Map.Entry<K, V>
714714
private final List<Object> callContexts;
715715
private final List<CompletableFuture<V>> queuedFutures;
716716
private final Map<K, Object> callContextByKey;
717-
private final Map<K, CompletableFuture<V>> queuedFutureByKey;
717+
private final Map<K, List<CompletableFuture<V>>> queuedFuturesByKey;
718718

719719
private final List<K> clearCacheKeys = new ArrayList<>();
720720
private final Map<K, V> completedValuesByKey = new HashMap<>();
@@ -733,13 +733,13 @@ private DataLoaderMapEntrySubscriber(
733733
this.queuedFutures = queuedFutures;
734734

735735
this.callContextByKey = new HashMap<>();
736-
this.queuedFutureByKey = new HashMap<>();
736+
this.queuedFuturesByKey = new HashMap<>();
737737
for (int idx = 0; idx < queuedFutures.size(); idx++) {
738738
K key = keys.get(idx);
739739
Object callContext = callContexts.get(idx);
740740
CompletableFuture<V> queuedFuture = queuedFutures.get(idx);
741741
callContextByKey.put(key, callContext);
742-
queuedFutureByKey.put(key, queuedFuture);
742+
queuedFuturesByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(queuedFuture);
743743
}
744744
}
745745

@@ -756,20 +756,20 @@ public void onNext(Map.Entry<K, V> entry) {
756756
V value = entry.getValue();
757757

758758
Object callContext = callContextByKey.get(key);
759-
CompletableFuture<V> future = queuedFutureByKey.get(key);
759+
List<CompletableFuture<V>> futures = queuedFuturesByKey.get(key);
760760
if (value instanceof Try) {
761761
// we allow the batch loader to return a Try so we can better represent a computation
762762
// that might have worked or not.
763763
Try<V> tryValue = (Try<V>) value;
764764
if (tryValue.isSuccess()) {
765-
future.complete(tryValue.get());
765+
futures.forEach(f -> f.complete(tryValue.get()));
766766
} else {
767767
stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext));
768-
future.completeExceptionally(tryValue.getThrowable());
768+
futures.forEach(f -> f.completeExceptionally(tryValue.getThrowable()));
769769
clearCacheKeys.add(key);
770770
}
771771
} else {
772-
future.complete(value);
772+
futures.forEach(f -> f.complete(value));
773773
}
774774

775775
completedValuesByKey.put(key, value);
@@ -801,9 +801,11 @@ public void onError(Throwable ex) {
801801
// Complete the futures for the remaining keys with the exception.
802802
for (int idx = 0; idx < queuedFutures.size(); idx++) {
803803
K key = keys.get(idx);
804-
CompletableFuture<V> future = queuedFutureByKey.get(key);
804+
List<CompletableFuture<V>> futures = queuedFuturesByKey.get(key);
805805
if (!completedValuesByKey.containsKey(key)) {
806-
future.completeExceptionally(ex);
806+
for (CompletableFuture<V> future : futures) {
807+
future.completeExceptionally(ex);
808+
}
807809
// clear any cached view of this key because they all failed
808810
dataLoader.clear(key);
809811
}

src/test/java/org/dataloader/DataLoaderTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -732,10 +732,10 @@ public void should_work_with_duplicate_keys_when_caching_disabled(TestDataLoader
732732
assertThat(future1.get(), equalTo("A"));
733733
assertThat(future2.get(), equalTo("B"));
734734
assertThat(future3.get(), equalTo("A"));
735-
if (factory instanceof ListDataLoaderFactory || factory instanceof PublisherDataLoaderFactory) {
736-
assertThat(loadCalls, equalTo(singletonList(asList("A", "B", "A"))));
737-
} else {
735+
if (factory instanceof MappedDataLoaderFactory) {
738736
assertThat(loadCalls, equalTo(singletonList(asList("A", "B"))));
737+
} else {
738+
assertThat(loadCalls, equalTo(singletonList(asList("A", "B", "A"))));
739739
}
740740
}
741741

0 commit comments

Comments
 (0)