Skip to content

Predicates in DataLoaders #130

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/main/java/org/dataloader/DataLoaderHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ Object getCacheKeyWithContext(K key, Object context) {

DispatchResult<V> dispatch() {
boolean batchingEnabled = loaderOptions.batchingEnabled();
if (!loaderOptions.getDispatchPredicate().test(null, dataLoader)) {
return new DispatchResult<>(CompletableFuture.completedFuture(emptyList()), loaderQueue.size(), false);
}
//
// we copy the pre-loaded set of futures ready for dispatch
final List<K> keys = new ArrayList<>();
Expand All @@ -177,7 +180,7 @@ DispatchResult<V> dispatch() {
lastDispatchTime.set(now());
}
if (!batchingEnabled || keys.isEmpty()) {
return new DispatchResult<>(completedFuture(emptyList()), 0);
return new DispatchResult<>(completedFuture(emptyList()), 0, false);
}
final int totalEntriesHandled = keys.size();
//
Expand All @@ -198,7 +201,7 @@ DispatchResult<V> dispatch() {
} else {
futureList = dispatchQueueBatch(keys, callContexts, queuedFutures);
}
return new DispatchResult<>(futureList, totalEntriesHandled);
return new DispatchResult<>(futureList, totalEntriesHandled, true);
}

private CompletableFuture<List<V>> sliceIntoBatchesOfBatches(List<K> keys, List<CompletableFuture<V>> queuedFutures, List<Object> callContexts, int maxBatchSize) {
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/org/dataloader/DataLoaderOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.dataloader.annotations.PublicApi;
import org.dataloader.impl.Assertions;
import org.dataloader.registries.DispatchPredicate;
import org.dataloader.scheduler.BatchLoaderScheduler;
import org.dataloader.stats.NoOpStatisticsCollector;
import org.dataloader.stats.StatisticsCollector;
Expand All @@ -43,6 +44,7 @@ public class DataLoaderOptions {
private CacheKey<?> cacheKeyFunction;
private CacheMap<?, ?> cacheMap;
private ValueCache<?, ?> valueCache;
private DispatchPredicate dispatchPredicate;
private int maxBatchSize;
private Supplier<StatisticsCollector> statisticsCollector;
private BatchLoaderContextProvider environmentProvider;
Expand All @@ -60,6 +62,7 @@ public DataLoaderOptions() {
statisticsCollector = NoOpStatisticsCollector::new;
environmentProvider = NULL_PROVIDER;
valueCacheOptions = ValueCacheOptions.newOptions();
dispatchPredicate = DispatchPredicate.dispatchAlways();
batchLoaderScheduler = null;
}

Expand Down Expand Up @@ -290,6 +293,25 @@ public DataLoaderOptions setValueCache(ValueCache<?, ?> valueCache) {
return this;
}

/**
* @return the dispatch predicate of these options
*/
public DispatchPredicate getDispatchPredicate() {
return dispatchPredicate;
}

/**
* Sets the {@link DispatchPredicate} to use.
*
* @param dispatchPredicate a non-null DispatchPredicate to use
*
* @return the data loader options for fluent coding
*/
public DataLoaderOptions dispatchPredicate(DispatchPredicate dispatchPredicate) {
this.dispatchPredicate = nonNull(dispatchPredicate);
return this;
}

/**
* @return the {@link ValueCacheOptions} that control how the {@link ValueCache} will be used
*/
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/org/dataloader/DispatchResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@
public class DispatchResult<T> {
private final CompletableFuture<List<T>> futureList;
private final int keysCount;
private final boolean wasDispatched;

public DispatchResult(CompletableFuture<List<T>> futureList, int keysCount) {
this(futureList, keysCount, true);
}

public DispatchResult(CompletableFuture<List<T>> futureList, int keysCount, boolean wasDispatched) {
this.futureList = futureList;
this.keysCount = keysCount;
this.wasDispatched = wasDispatched;
}

public CompletableFuture<List<T>> getPromisedResults() {
Expand All @@ -28,4 +34,16 @@ public CompletableFuture<List<T>> getPromisedResults() {
public int getKeysCount() {
return keysCount;
}

/**
* If the {@link org.dataloader.registries.DispatchPredicate} associated with the dataloader
* returns false, then the call to dispatch was not performed and this will return false.
* <p>
* Similarly, if the set the loaded keys was empty or the batching is not enabled them this will return false
*
* @return true of the dispatch call was actually made or false if it was not
*/
public boolean wasDispatched() {
return wasDispatched;
}
}
32 changes: 26 additions & 6 deletions src/main/java/org/dataloader/registries/DispatchPredicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
import java.util.Objects;

/**
* A predicate class used by {@link ScheduledDataLoaderRegistry} to decide whether to dispatch or not
* A predicate class used by {@link ScheduledDataLoaderRegistry}s as well as by individual
* {@link DataLoader}s to decide whether to dispatch or not.
*/
@FunctionalInterface
public interface DispatchPredicate {
/**
* This predicate tests whether the data loader should be dispatched or not.
* This predicate tests whether the data loader should be dispatched or not. If the predicate is associated direct to a {@link DataLoader}
* then the dataLoaderKey parameter will be null.
*
* @param dataLoaderKey the key of the data loader when registered
* @param dataLoaderKey the key of the data loader when registered or null if this is a predicate associated direct with a {@link DataLoader}
* @param dataLoader the dataloader to dispatch
*
* @return true if the data loader SHOULD be dispatched
Expand Down Expand Up @@ -68,7 +70,7 @@ default DispatchPredicate or(DispatchPredicate other) {
*
* @param duration the length of time to check
*
* @return true if the data loader has not been dispatched in duration time
* @return a predicate that returns true if the data loader has not been dispatched in duration time
*/
static DispatchPredicate dispatchIfLongerThan(Duration duration) {
return (dataLoaderKey, dataLoader) -> {
Expand All @@ -79,14 +81,32 @@ static DispatchPredicate dispatchIfLongerThan(Duration duration) {

/**
* This predicate will return true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth.
*
* <p>
* This will act as minimum batch size. There must be more than `depth` items queued for the predicate to return true.
*
* @param depth the value to be greater than
*
* @return true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth.
* @return a predicate that returns true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth.
*/
static DispatchPredicate dispatchIfDepthGreaterThan(int depth) {
return (dataLoaderKey, dataLoader) -> dataLoader.dispatchDepth() > depth;
}

/**
* This predicate will return true always
*
* @return a predicate that returns true always
*/
static DispatchPredicate dispatchAlways() {
return (dataLoaderKey, dataLoader) -> true;
}

/**
* This predicate will never return true
*
* @return a predicate that never returns true
*/
static DispatchPredicate dispatchNever() {
return (dataLoaderKey, dataLoader) -> false;
}
}
119 changes: 119 additions & 0 deletions src/test/java/org/dataloader/DataLoaderPredicateTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package org.dataloader;

import org.dataloader.registries.DispatchPredicate;
import org.dataloader.stats.SimpleStatisticsCollector;
import org.dataloader.stats.Statistics;
import org.junit.Test;

import java.util.concurrent.CompletableFuture;

import static java.util.Arrays.asList;
import static org.dataloader.DataLoaderFactory.newDataLoader;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;

/**
* Tests related to dispatching predicates.
*/
public class DataLoaderPredicateTest {

@Test
public void the_predicate_will_prevent_loading() {
BatchLoader<String, String> batchLoader = CompletableFuture::completedFuture;
DataLoader<String, String> loader = newDataLoader(batchLoader,
DataLoaderOptions.newOptions().setStatisticsCollector(SimpleStatisticsCollector::new)
.dispatchPredicate(DispatchPredicate.dispatchNever())
);

loader.load("A");
loader.load("B");
loader.loadMany(asList("C", "D"));

Statistics stats = loader.getStatistics();
assertThat(stats.getLoadCount(), equalTo(4L));
assertThat(stats.getBatchInvokeCount(), equalTo(0L));
assertThat(stats.getBatchLoadCount(), equalTo(0L));
assertThat(stats.getCacheHitCount(), equalTo(0L));

DispatchResult<String> dispatchResult = loader.dispatchWithCounts();
assertThat(dispatchResult.wasDispatched(), equalTo(false));
assertThat(dispatchResult.getKeysCount(), equalTo(4));

stats = loader.getStatistics();
assertThat(stats.getLoadCount(), equalTo(4L));
assertThat(stats.getBatchInvokeCount(), equalTo(0L));
assertThat(stats.getBatchLoadCount(), equalTo(0L));


loader.load("A");
loader.load("B");

dispatchResult = loader.dispatchWithCounts();
assertThat(dispatchResult.wasDispatched(), equalTo(false));
assertThat(dispatchResult.getKeysCount(), equalTo(4));

stats = loader.getStatistics();
assertThat(stats.getLoadCount(), equalTo(6L));
assertThat(stats.getBatchInvokeCount(), equalTo(0L));
assertThat(stats.getBatchLoadCount(), equalTo(0L));
}

@Test
public void the_predicate_will_allow_loading_by_default() {
BatchLoader<String, String> batchLoader = CompletableFuture::completedFuture;
DataLoader<String, String> loader = newDataLoader(batchLoader,
DataLoaderOptions.newOptions().setStatisticsCollector(SimpleStatisticsCollector::new)
.dispatchPredicate(DispatchPredicate.dispatchAlways())
);

loader.load("A");
loader.load("B");
loader.loadMany(asList("C", "D"));


DispatchResult<String> dispatchResult = loader.dispatchWithCounts();
assertThat(dispatchResult.wasDispatched(), equalTo(true));
assertThat(dispatchResult.getKeysCount(), equalTo(4));

Statistics stats = loader.getStatistics();
assertThat(stats.getLoadCount(), equalTo(4L));
assertThat(stats.getBatchInvokeCount(), equalTo(1L));
assertThat(stats.getBatchLoadCount(), equalTo(4L));


loader.load("E");
loader.load("F");

dispatchResult = loader.dispatchWithCounts();
assertThat(dispatchResult.wasDispatched(), equalTo(true));
assertThat(dispatchResult.getKeysCount(), equalTo(2));

stats = loader.getStatistics();
assertThat(stats.getLoadCount(), equalTo(6L));
assertThat(stats.getBatchInvokeCount(), equalTo(2L));
assertThat(stats.getBatchLoadCount(), equalTo(6L));
}

@Test
public void dataloader_options_have_a_default_which_is_always_on() {
BatchLoader<String, String> batchLoader = CompletableFuture::completedFuture;
DataLoaderOptions dataLoaderOptions = DataLoaderOptions.newOptions();

DispatchPredicate defaultPredicate = dataLoaderOptions.getDispatchPredicate();
assertThat(defaultPredicate, notNullValue());
assertThat(defaultPredicate.test(null, null), equalTo(true));


DataLoader<String, String> loader = newDataLoader(batchLoader, dataLoaderOptions);

loader.load("A");
loader.load("B");
loader.loadMany(asList("C", "D"));

DispatchResult<String> dispatchResult = loader.dispatchWithCounts();
assertThat(dispatchResult.wasDispatched(), equalTo(true));
assertThat(dispatchResult.getKeysCount(), equalTo(4));

}
}