Skip to content

Allow ValueCache to work with Publisher DataLoader #172

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/main/java/org/dataloader/DataLoaderHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,9 @@ CompletableFuture<List<V>> invokeLoader(List<K> keys, List<Object> keyContexts,
missedKeyIndexes.add(i);
missedKeys.add(keys.get(i));
missedKeyContexts.add(keyContexts.get(i));
missedQueuedFutures.add(queuedFutures.get(i));
} else {
queuedFutures.get(i).complete(cacheGet.get());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is the code queuedFutures.get(i).complete(cacheGet.get()); here???

Before it never used that parameter in this mutative way (eg completing a value if its not a cache get failure)

Is this the bug ??? Does it only happen on Publishers ???

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cant find the test code for this fix

Copy link
Collaborator Author

@AlexandreCarlton AlexandreCarlton Jan 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the bug ??? Does it only happen on Publishers ???

Yes - well, one of them (and it does indeed affect only the publishers). This particular line of code is needed if we have a cache hit in our value cache and want to return that - otherwise, we never complete, and our load returns a CompletableFuture that never completes. The way to check this is to comment out this line and re-run in particular DataLoaderValueCacheTest#batch_caching_works_as_expected.

I am not sure that this is the best way to solve this, but it does make the tests go green and doesn't appear to make anything else red - if this will cause other problems then we should add tests for that too (which I'm happy to do).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the reason it did not work is here AFTER the invokeLoader method is called

    private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<Object> callContexts, List<CompletableFuture<V>> queuedFutures) {
        stats.incrementBatchLoadCountBy(keys.size(), new IncrementBatchLoadCountByStatisticsContext<>(keys, callContexts));
        CompletableFuture<List<V>> batchLoad = invokeLoader(keys, callContexts, queuedFutures, loaderOptions.cachingEnabled());
        return batchLoad
                .thenApply(values -> {
                    assertResultSize(keys, values);
                    if (isPublisher() || isMappedPublisher()) {
                        // We have already completed the queued futures by the time the overall batchLoad future has completed.
                        return values;
                    }

eg if its publisher its considered done while the others continue on to turn Trys into values. But I think where you have fixed it is approporiate

}
}
}
Expand Down
107 changes: 44 additions & 63 deletions src/test/java/org/dataloader/DataLoaderTest.java

Large diffs are not rendered by default.

105 changes: 59 additions & 46 deletions src/test/java/org/dataloader/DataLoaderValueCacheTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import org.dataloader.fixtures.CaffeineValueCache;
import org.dataloader.fixtures.CustomValueCache;
import org.dataloader.fixtures.parameterized.TestDataLoaderFactory;
import org.dataloader.impl.DataLoaderAssertionException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -18,7 +21,6 @@
import static java.util.Collections.singletonList;
import static org.awaitility.Awaitility.await;
import static org.dataloader.DataLoaderOptions.newOptions;
import static org.dataloader.fixtures.TestKit.idLoader;
import static org.dataloader.fixtures.TestKit.snooze;
import static org.dataloader.fixtures.TestKit.sort;
import static org.dataloader.impl.CompletableFutureKit.failedFuture;
Expand All @@ -30,11 +32,12 @@

public class DataLoaderValueCacheTest {

@Test
public void test_by_default_we_have_no_value_caching() {
List<List<String>> loadCalls = new ArrayList<>();
@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where we "add" the new tests - we are now testing this against PublisherDataLoaderFactory and MappedPublisherDataLoaderFactory.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I get it now - we have tests for non publisher data loaders and publisher data loaders. So its working for the old and the new

public void test_by_default_we_have_no_value_caching(TestDataLoaderFactory factory) {
List<Collection<String>> loadCalls = new ArrayList<>();
DataLoaderOptions options = newOptions();
DataLoader<String, String> identityLoader = idLoader(options, loadCalls);
DataLoader<String, String> identityLoader = factory.idLoader(options, loadCalls);

CompletableFuture<String> fA = identityLoader.load("a");
CompletableFuture<String> fB = identityLoader.load("b");
Expand Down Expand Up @@ -64,12 +67,13 @@ public void test_by_default_we_have_no_value_caching() {
assertThat(loadCalls, equalTo(emptyList()));
}

@Test
public void should_accept_a_remote_value_store_for_caching() {
@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
public void should_accept_a_remote_value_store_for_caching(TestDataLoaderFactory factory) {
CustomValueCache customValueCache = new CustomValueCache();
List<List<String>> loadCalls = new ArrayList<>();
List<Collection<String>> loadCalls = new ArrayList<>();
DataLoaderOptions options = newOptions().setValueCache(customValueCache);
DataLoader<String, String> identityLoader = idLoader(options, loadCalls);
DataLoader<String, String> identityLoader = factory.idLoader(options, loadCalls);

// Fetches as expected

Expand Down Expand Up @@ -108,8 +112,9 @@ public void should_accept_a_remote_value_store_for_caching() {
assertArrayEquals(customValueCache.store.keySet().toArray(), emptyList().toArray());
}

@Test
public void can_use_caffeine_for_caching() {
@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
public void can_use_caffeine_for_caching(TestDataLoaderFactory factory) {
//
// Mostly to prove that some other CACHE library could be used
// as the backing value cache. Not really Caffeine specific.
Expand All @@ -121,9 +126,9 @@ public void can_use_caffeine_for_caching() {

ValueCache<String, Object> caffeineValueCache = new CaffeineValueCache(caffeineCache);

List<List<String>> loadCalls = new ArrayList<>();
List<Collection<String>> loadCalls = new ArrayList<>();
DataLoaderOptions options = newOptions().setValueCache(caffeineValueCache);
DataLoader<String, String> identityLoader = idLoader(options, loadCalls);
DataLoader<String, String> identityLoader = factory.idLoader(options, loadCalls);

// Fetches as expected

Expand All @@ -148,8 +153,9 @@ public void can_use_caffeine_for_caching() {
assertArrayEquals(caffeineCache.asMap().keySet().toArray(), asList("a", "b", "c").toArray());
}

@Test
public void will_invoke_loader_if_CACHE_GET_call_throws_exception() {
@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
public void will_invoke_loader_if_CACHE_GET_call_throws_exception(TestDataLoaderFactory factory) {
CustomValueCache customValueCache = new CustomValueCache() {

@Override
Expand All @@ -163,9 +169,9 @@ public CompletableFuture<Object> get(String key) {
customValueCache.set("a", "Not From Cache");
customValueCache.set("b", "From Cache");

List<List<String>> loadCalls = new ArrayList<>();
List<Collection<String>> loadCalls = new ArrayList<>();
DataLoaderOptions options = newOptions().setValueCache(customValueCache);
DataLoader<String, String> identityLoader = idLoader(options, loadCalls);
DataLoader<String, String> identityLoader = factory.idLoader(options, loadCalls);

CompletableFuture<String> fA = identityLoader.load("a");
CompletableFuture<String> fB = identityLoader.load("b");
Expand All @@ -178,8 +184,9 @@ public CompletableFuture<Object> get(String key) {
assertThat(loadCalls, equalTo(singletonList(singletonList("a"))));
}

@Test
public void will_still_work_if_CACHE_SET_call_throws_exception() {
@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
public void will_still_work_if_CACHE_SET_call_throws_exception(TestDataLoaderFactory factory) {
CustomValueCache customValueCache = new CustomValueCache() {
@Override
public CompletableFuture<Object> set(String key, Object value) {
Expand All @@ -190,9 +197,9 @@ public CompletableFuture<Object> set(String key, Object value) {
}
};

List<List<String>> loadCalls = new ArrayList<>();
List<Collection<String>> loadCalls = new ArrayList<>();
DataLoaderOptions options = newOptions().setValueCache(customValueCache);
DataLoader<String, String> identityLoader = idLoader(options, loadCalls);
DataLoader<String, String> identityLoader = factory.idLoader(options, loadCalls);

CompletableFuture<String> fA = identityLoader.load("a");
CompletableFuture<String> fB = identityLoader.load("b");
Expand All @@ -206,8 +213,9 @@ public CompletableFuture<Object> set(String key, Object value) {
assertArrayEquals(customValueCache.store.keySet().toArray(), singletonList("b").toArray());
}

@Test
public void caching_can_take_some_time_complete() {
@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
public void caching_can_take_some_time_complete(TestDataLoaderFactory factory) {
CustomValueCache customValueCache = new CustomValueCache() {

@Override
Expand All @@ -228,9 +236,9 @@ public CompletableFuture<Object> get(String key) {
};


List<List<String>> loadCalls = new ArrayList<>();
List<Collection<String>> loadCalls = new ArrayList<>();
DataLoaderOptions options = newOptions().setValueCache(customValueCache);
DataLoader<String, String> identityLoader = idLoader(options, loadCalls);
DataLoader<String, String> identityLoader = factory.idLoader(options, loadCalls);

CompletableFuture<String> fA = identityLoader.load("a");
CompletableFuture<String> fB = identityLoader.load("b");
Expand All @@ -247,8 +255,9 @@ public CompletableFuture<Object> get(String key) {
assertThat(loadCalls, equalTo(singletonList(asList("missC", "missD"))));
}

@Test
public void batch_caching_works_as_expected() {
@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
public void batch_caching_works_as_expected(TestDataLoaderFactory factory) {
CustomValueCache customValueCache = new CustomValueCache() {

@Override
Expand All @@ -269,9 +278,9 @@ public CompletableFuture<List<Try<Object>>> getValues(List<String> keys) {
};


List<List<String>> loadCalls = new ArrayList<>();
List<Collection<String>> loadCalls = new ArrayList<>();
DataLoaderOptions options = newOptions().setValueCache(customValueCache);
DataLoader<String, String> identityLoader = idLoader(options, loadCalls);
DataLoader<String, String> identityLoader = factory.idLoader(options, loadCalls);

CompletableFuture<String> fA = identityLoader.load("a");
CompletableFuture<String> fB = identityLoader.load("b");
Expand All @@ -293,8 +302,9 @@ public CompletableFuture<List<Try<Object>>> getValues(List<String> keys) {
assertThat(values, equalTo(asList("missC", "missD")));
}

@Test
public void assertions_will_be_thrown_if_the_cache_does_not_follow_contract() {
@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
public void assertions_will_be_thrown_if_the_cache_does_not_follow_contract(TestDataLoaderFactory factory) {
CustomValueCache customValueCache = new CustomValueCache() {

@Override
Expand All @@ -312,9 +322,9 @@ public CompletableFuture<List<Try<Object>>> getValues(List<String> keys) {
}
};

List<List<String>> loadCalls = new ArrayList<>();
List<Collection<String>> loadCalls = new ArrayList<>();
DataLoaderOptions options = newOptions().setValueCache(customValueCache);
DataLoader<String, String> identityLoader = idLoader(options, loadCalls);
DataLoader<String, String> identityLoader = factory.idLoader(options, loadCalls);

CompletableFuture<String> fA = identityLoader.load("a");
CompletableFuture<String> fB = identityLoader.load("b");
Expand All @@ -335,8 +345,9 @@ private boolean isAssertionException(CompletableFuture<String> fA) {
}


@Test
public void if_caching_is_off_its_never_hit() {
@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
public void if_caching_is_off_its_never_hit(TestDataLoaderFactory factory) {
AtomicInteger getCalls = new AtomicInteger();
CustomValueCache customValueCache = new CustomValueCache() {

Expand All @@ -347,9 +358,9 @@ public CompletableFuture<Object> get(String key) {
}
};

List<List<String>> loadCalls = new ArrayList<>();
List<Collection<String>> loadCalls = new ArrayList<>();
DataLoaderOptions options = newOptions().setValueCache(customValueCache).setCachingEnabled(false);
DataLoader<String, String> identityLoader = idLoader(options, loadCalls);
DataLoader<String, String> identityLoader = factory.idLoader(options, loadCalls);

CompletableFuture<String> fA = identityLoader.load("a");
CompletableFuture<String> fB = identityLoader.load("b");
Expand All @@ -368,8 +379,9 @@ public CompletableFuture<Object> get(String key) {
assertTrue(customValueCache.asMap().isEmpty());
}

@Test
public void if_everything_is_cached_no_batching_happens() {
@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
public void if_everything_is_cached_no_batching_happens(TestDataLoaderFactory factory) {
AtomicInteger getCalls = new AtomicInteger();
AtomicInteger setCalls = new AtomicInteger();
CustomValueCache customValueCache = new CustomValueCache() {
Expand All @@ -390,9 +402,9 @@ public CompletableFuture<List<Object>> setValues(List<String> keys, List<Object>
customValueCache.asMap().put("b", "cachedB");
customValueCache.asMap().put("c", "cachedC");

List<List<String>> loadCalls = new ArrayList<>();
List<Collection<String>> loadCalls = new ArrayList<>();
DataLoaderOptions options = newOptions().setValueCache(customValueCache).setCachingEnabled(true);
DataLoader<String, String> identityLoader = idLoader(options, loadCalls);
DataLoader<String, String> identityLoader = factory.idLoader(options, loadCalls);

CompletableFuture<String> fA = identityLoader.load("a");
CompletableFuture<String> fB = identityLoader.load("b");
Expand All @@ -410,8 +422,9 @@ public CompletableFuture<List<Object>> setValues(List<String> keys, List<Object>
}


@Test
public void if_batching_is_off_it_still_can_cache() {
@ParameterizedTest
@MethodSource("org.dataloader.fixtures.parameterized.TestDataLoaderFactories#get")
public void if_batching_is_off_it_still_can_cache(TestDataLoaderFactory factory) {
AtomicInteger getCalls = new AtomicInteger();
AtomicInteger setCalls = new AtomicInteger();
CustomValueCache customValueCache = new CustomValueCache() {
Expand All @@ -430,9 +443,9 @@ public CompletableFuture<List<Object>> setValues(List<String> keys, List<Object>
};
customValueCache.asMap().put("a", "cachedA");

List<List<String>> loadCalls = new ArrayList<>();
List<Collection<String>> loadCalls = new ArrayList<>();
DataLoaderOptions options = newOptions().setValueCache(customValueCache).setCachingEnabled(true).setBatchingEnabled(false);
DataLoader<String, String> identityLoader = idLoader(options, loadCalls);
DataLoader<String, String> identityLoader = factory.idLoader(options, loadCalls);

CompletableFuture<String> fA = identityLoader.load("a");
CompletableFuture<String> fB = identityLoader.load("b");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the new test that shows the bug happening. I cant see it for all the other changes (and I have merged #171 )

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad for not explaining this. There aren't any "new" tests, as such - but we are now testing the existing tests here against the *PublisherDataLoader factories in addition to the standard ones (see 9a20334#r1901980139).

Expand Down
30 changes: 0 additions & 30 deletions src/test/java/org/dataloader/fixtures/TestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.dataloader.MappedBatchLoader;
import org.dataloader.MappedBatchLoaderWithContext;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -61,43 +60,14 @@ public static <K, V> BatchLoader<K, V> keysAsValues(List<List<K>> loadCalls) {
};
}

public static <K, V> BatchLoader<K, V> keysAsValuesAsync(Duration delay) {
return keysAsValuesAsync(new ArrayList<>(), delay);
}

public static <K, V> BatchLoader<K, V> keysAsValuesAsync(List<List<K>> loadCalls, Duration delay) {
return keys -> CompletableFuture.supplyAsync(() -> {
snooze(delay.toMillis());
List<K> ks = new ArrayList<>(keys);
loadCalls.add(ks);
@SuppressWarnings("unchecked")
List<V> values = keys.stream()
.map(k -> (V) k)
.collect(toList());
return values;
});
}

public static <K, V> DataLoader<K, V> idLoader() {
return idLoader(null, new ArrayList<>());
}

public static <K, V> DataLoader<K, V> idLoader(List<List<K>> loadCalls) {
return idLoader(null, loadCalls);
}

public static <K, V> DataLoader<K, V> idLoader(DataLoaderOptions options, List<List<K>> loadCalls) {
return DataLoaderFactory.newDataLoader(keysAsValues(loadCalls), options);
}

public static <K, V> DataLoader<K, V> idLoaderAsync(Duration delay) {
return idLoaderAsync(null, new ArrayList<>(), delay);
}

public static <K, V> DataLoader<K, V> idLoaderAsync(DataLoaderOptions options, List<List<K>> loadCalls, Duration delay) {
return DataLoaderFactory.newDataLoader(keysAsValuesAsync(loadCalls, delay), options);
}

public static Collection<Integer> listFrom(int i, int max) {
List<Integer> ints = new ArrayList<>();
for (int j = i; j < max; j++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import org.dataloader.DataLoaderOptions;
import org.dataloader.fixtures.TestKit;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static java.util.concurrent.CompletableFuture.completedFuture;
Expand All @@ -21,6 +23,15 @@ public <K> DataLoader<K, K> idLoader(DataLoaderOptions options, List<Collection<
}, options);
}

@Override
public <K> DataLoader<K, K> idLoaderDelayed(DataLoaderOptions options, List<Collection<K>> loadCalls, Duration delay) {
return newDataLoader(keys -> CompletableFuture.supplyAsync(() -> {
TestKit.snooze(delay.toMillis());
loadCalls.add(new ArrayList<>(keys));
return keys;
}));
}

@Override
public <K> DataLoader<K, K> idLoaderBlowsUps(
DataLoaderOptions options, List<Collection<K>> loadCalls) {
Expand Down
Loading
Loading