Ticker mode on ScheduledDataLoaderRegistry #131

Merged
merged 8 commits into from
Oct 17, 2023
89 changes: 89 additions & 0 deletions README.md
@@ -583,6 +583,95 @@ since it was last dispatched".
The above acts as a kind of minimum batch depth, with a time overload. It won't dispatch if the loader depth is less
than or equal to 10 but if 200ms pass it will dispatch.
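
As a rough illustration of the "depth or time" rule described above, here is a minimal JDK-only sketch. It does not use the library's `DispatchPredicate`; the class `DepthOrTimeSketch` and its fields are hypothetical stand-ins, and treating exactly 200ms as "not yet elapsed" is an assumption.

```java
import java.time.Duration;
import java.util.function.BiPredicate;

public class DepthOrTimeSketch {
    // Hypothetical stand-ins: inputs are the loader depth and the time since the last dispatch.
    static final BiPredicate<Integer, Duration> DEPTH_GREATER_THAN_10 =
            (depth, sinceLastDispatch) -> depth > 10;
    // Assumption: "longer than" is strict, so exactly 200ms does not trigger a dispatch.
    static final BiPredicate<Integer, Duration> LONGER_THAN_200_MS =
            (depth, sinceLastDispatch) -> sinceLastDispatch.compareTo(Duration.ofMillis(200)) > 0;
    // Dispatch when either condition holds.
    static final BiPredicate<Integer, Duration> DEPTH_OR_TIME =
            DEPTH_GREATER_THAN_10.or(LONGER_THAN_200_MS);

    static boolean shouldDispatch(int depth, long millisSinceLastDispatch) {
        return DEPTH_OR_TIME.test(depth, Duration.ofMillis(millisSinceLastDispatch));
    }

    public static void main(String[] args) {
        System.out.println(shouldDispatch(10, 50));  // false: depth too low, time not elapsed
        System.out.println(shouldDispatch(11, 50));  // true: depth is greater than 10
        System.out.println(shouldDispatch(3, 250));  // true: more than 200ms have passed
    }
}
```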

## Chaining DataLoader calls

It's natural to want to have chained `DataLoader` calls.

```java
CompletableFuture<Object> chainedCalls = dataLoaderA.load("user1")
        .thenCompose(userAsKey -> dataLoaderB.load(userAsKey));
```

However, the challenge here is how to be efficient in batching terms.

This is discussed in detail in the https://github.com/graphql-java/java-dataloader/issues/54 issue.

Since `CompletableFuture`s are async and can complete at some time in the future, when is the best time to call
`dispatch` again after a load call has completed, so as to maximize batching?

The most naive approach is to immediately dispatch the second chained call as follows:

```java
CompletableFuture<Object> chainedWithImmediateDispatch = dataLoaderA.load("user1")
        .thenCompose(userAsKey -> {
            CompletableFuture<Object> loadB = dataLoaderB.load(userAsKey);
            dataLoaderB.dispatch();
            return loadB;
        });
```

The above will work; however, the window for batching together multiple calls to `dataLoaderB` will be very small, so
it will likely result in batch sizes of 1.
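
To make the batch-size effect concrete, here is a small JDK-only sketch. `MiniLoader` is a hypothetical, heavily simplified stand-in for a `DataLoader` (it echoes keys back as values and runs synchronously), so it illustrates the idea only and is not how the library is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ChainedBatchingSketch {
    /** Hypothetical, minimal stand-in for a DataLoader; values are the keys echoed back. */
    static final class MiniLoader {
        private final List<String> keys = new ArrayList<>();
        private final List<CompletableFuture<String>> futures = new ArrayList<>();
        final List<Integer> batchSizes = new ArrayList<>();

        CompletableFuture<String> load(String key) {
            CompletableFuture<String> future = new CompletableFuture<>();
            keys.add(key);
            futures.add(future);
            return future;
        }

        void dispatch() {
            if (keys.isEmpty()) {
                return;
            }
            // Snapshot and clear first, because completing a future may trigger new load calls.
            List<String> batchKeys = new ArrayList<>(keys);
            List<CompletableFuture<String>> batchFutures = new ArrayList<>(futures);
            keys.clear();
            futures.clear();
            batchSizes.add(batchKeys.size());
            for (int i = 0; i < batchKeys.size(); i++) {
                batchFutures.get(i).complete(batchKeys.get(i));
            }
        }
    }

    /** Returns the batch sizes seen by the second loader. */
    static List<Integer> run(boolean immediateDispatch) {
        MiniLoader a = new MiniLoader();
        MiniLoader b = new MiniLoader();
        List<CompletableFuture<String>> chained = new ArrayList<>();
        for (String key : List.of("user1", "user2", "user3")) {
            chained.add(a.load(key).thenCompose(user -> {
                CompletableFuture<String> loadB = b.load(user);
                if (immediateDispatch) {
                    b.dispatch();
                }
                return loadB;
            }));
        }
        a.dispatch(); // completes the first-level loads, which enqueue keys on b
        b.dispatch(); // a later dispatch; a no-op if b was already dispatched per key
        chained.forEach(CompletableFuture::join);
        return b.batchSizes;
    }

    public static void main(String[] args) {
        System.out.println("immediate dispatch: " + run(true));  // [1, 1, 1]
        System.out.println("deferred dispatch:  " + run(false)); // [3]
    }
}
```

In this toy model the deferred variant only batches well because something calls `b.dispatch()` at the right moment; deciding when that moment is remains the hard part.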

This is a very difficult problem to solve because you have to balance two competing design ideals: maximizing the
batching window of secondary calls, and keeping that window small so your customer requests don't take longer than necessary.

* If the batching window is wide you will maximize the number of keys presented to a `BatchLoader` but your request latency will increase.

* If the batching window is narrow you will reduce your request latency, but also you will reduce the number of keys presented to a `BatchLoader`.


### ScheduledDataLoaderRegistry ticker mode

The `ScheduledDataLoaderRegistry` offers one solution to this called "ticker mode" where it will continually reschedule secondary
`DataLoader` calls after the initial `dispatch()` call is made.

The batch window of time is controlled by the schedule duration set when the `ScheduledDataLoaderRegistry` is created.

```java
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry()
        .register("a", dataLoaderA)
        .register("b", dataLoaderB)
        .scheduledExecutorService(executorService)
        .schedule(Duration.ofMillis(10))
        .tickerMode(true) // ticker mode is on
        .build();

CompletableFuture<Object> chainedCalls = dataLoaderA.load("user1")
        .thenCompose(userAsKey -> dataLoaderB.load(userAsKey));

```
When ticker mode is on, the chained dataloader calls will complete, but the batching window size will depend on how quickly
the first level of `DataLoader` calls returned compared to the `schedule` of the `ScheduledDataLoaderRegistry`.

If you use ticker mode, then you MUST call `registry.close()` on the `ScheduledDataLoaderRegistry` at the end of the request (say), otherwise
it will continue to reschedule tasks to the `ScheduledExecutorService` associated with the registry.

You will want to look at sharing the `ScheduledExecutorService` in some way between requests when creating the `ScheduledDataLoaderRegistry`,
otherwise you will be creating a thread per `ScheduledDataLoaderRegistry` instance created, and with enough concurrent requests
you may create too many threads.
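
The following JDK-only sketch illustrates both points: several per-request objects sharing one `ScheduledExecutorService`, and rescheduling that stops only once `close()` is called. `RequestTicker` is a hypothetical stand-in for a ticker-mode registry and is not the library's class; the 5ms delay and the helper names are assumptions made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedTickerSketch {
    /** Hypothetical per-request object that keeps rescheduling itself until closed. */
    static final class RequestTicker implements AutoCloseable {
        private final ScheduledExecutorService executor;
        private final CountDownLatch twoTicks = new CountDownLatch(2);
        final AtomicInteger ticks = new AtomicInteger();
        private volatile boolean closed;

        RequestTicker(ScheduledExecutorService sharedExecutor) {
            this.executor = sharedExecutor;
        }

        void start() {
            reschedule();
        }

        private void reschedule() {
            if (!closed) {
                executor.schedule(this::tick, 5, TimeUnit.MILLISECONDS);
            }
        }

        private void tick() {
            if (closed) {
                return;
            }
            ticks.incrementAndGet();
            twoTicks.countDown();
            reschedule(); // like ticker mode: always schedule the next tick
        }

        boolean awaitTwoTicks() throws InterruptedException {
            return twoTicks.await(5, TimeUnit.SECONDS);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Returns true if both tickers ran on the shared executor and stopped once closed. */
    static boolean tickersStopAfterClose() {
        // One executor (one thread) shared by every "request".
        ScheduledExecutorService shared = Executors.newSingleThreadScheduledExecutor();
        try {
            RequestTicker first = new RequestTicker(shared);
            RequestTicker second = new RequestTicker(shared);
            first.start();
            second.start();
            boolean ticked = first.awaitTwoTicks() && second.awaitTwoTicks();
            first.close();
            second.close();
            // Barrier: on a single-threaded executor this runs after any in-flight tick.
            shared.submit(() -> { }).get();
            int firstCount = first.ticks.get();
            int secondCount = second.ticks.get();
            Thread.sleep(50);
            return ticked && firstCount == first.ticks.get() && secondCount == second.ticks.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            shared.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("stopped after close: " + tickersStopAfterClose());
    }
}
```

If `close()` were never called, each ticker would keep placing tasks on the shared executor indefinitely, which is the leak the paragraph above warns about.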

### ScheduledDataLoaderRegistry dispatching algorithm

When ticker mode is **false** the `ScheduledDataLoaderRegistry` algorithm is as follows:

* Nothing starts scheduled - some code must call `registry.dispatchAll()` a first time
* Then for every `DataLoader` in the registry
  * The `DispatchPredicate` is called to test if the data loader should be dispatched
    * If it returns **false** then a task is scheduled to re-evaluate this specific dataloader in the near future
    * If it returns **true**, then `dataLoader.dispatch()` is called and the dataloader is not rescheduled again
* The re-evaluation tasks are run periodically according to the `registry.getScheduleDuration()`

When ticker mode is **true** the `ScheduledDataLoaderRegistry` algorithm is as follows:

* Nothing starts scheduled - some code must call `registry.dispatchAll()` a first time
* Then for every `DataLoader` in the registry
  * The `DispatchPredicate` is called to test if the data loader should be dispatched
    * If it returns **false** then a task is scheduled to re-evaluate this specific dataloader in the near future
    * If it returns **true**, then `dataLoader.dispatch()` is called **and** a task is scheduled to re-evaluate this specific dataloader in the near future
* The re-evaluation tasks are run periodically according to the `registry.getScheduleDuration()`
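
The difference between the two modes comes down to a single reschedule decision. The sketch below models it without real timers or the library: `FakeLoader`, `rescheduleAfter` and `simulate` are hypothetical names, ticks are simulated by a loop, and the predicate always returns true (similar in spirit to an "always dispatch" predicate).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class TickerModeSketch {
    /** Hypothetical stand-in for a DataLoader: it only queues keys and records batches. */
    static final class FakeLoader {
        final Queue<String> pending = new ArrayDeque<>();
        final List<List<String>> batches = new ArrayList<>();

        void load(String key) {
            pending.add(key);
        }

        void dispatch() {
            if (pending.isEmpty()) {
                return;
            }
            batches.add(new ArrayList<>(pending));
            pending.clear();
        }
    }

    /** Reschedule when the predicate said no, or always when ticker mode is on. */
    static boolean rescheduleAfter(boolean tickerMode, boolean shouldDispatch) {
        return tickerMode || !shouldDispatch;
    }

    /** Simulates up to three evaluation ticks and returns how many batches were dispatched. */
    static int simulate(boolean tickerMode) {
        FakeLoader loader = new FakeLoader();
        loader.load("a");
        boolean alwaysDispatch = true; // assumption: the predicate always returns true
        boolean scheduled = true;      // stands in for the first dispatchAll() call
        for (int tick = 0; tick < 3 && scheduled; tick++) {
            if (alwaysDispatch) {
                loader.dispatch();
            }
            scheduled = rescheduleAfter(tickerMode, alwaysDispatch);
            if (tick == 0) {
                loader.load("b"); // a chained key arrives after the first dispatch
            }
        }
        return loader.batches.size();
    }

    public static void main(String[] args) {
        System.out.println("ticker off: " + simulate(false) + " batch(es)"); // 1: key "b" is never dispatched
        System.out.println("ticker on:  " + simulate(true) + " batch(es)");  // 2: key "b" goes out on the next tick
    }
}
```

With ticker mode off, the chained key `"b"` stays pending until some code calls dispatch again; with ticker mode on, the next tick picks it up.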

## Other information sources

@@ -25,9 +25,31 @@
* This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case
* no rescheduling will occur, and you will need to call dispatch again to restart the process.
* <p>
* In the default mode, when {@link #tickerMode} is false, the registry will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case
* no rescheduling will occur, and you will need to call dispatch again to restart the process.
* <p>
* However, when {@link #tickerMode} is true, the registry will always reschedule continuously after the first ever call to {@link #dispatchAll()}.
* <p>
* This will allow you to chain together {@link DataLoader} load calls like this:
* <pre>{@code
* CompletableFuture<String> future = dataLoaderA.load("A")
* .thenCompose(value -> dataLoaderB.load(value));
* }</pre>
* <p>
* However, it may mean your batching will not be as efficient as it might be. In environments
* like graphql this might mean you are too eager in fetching. The {@link DispatchPredicate} still runs to decide if
* dispatch should happen; however, in ticker mode it will be continuously rescheduled.
* <p>
* When {@link #tickerMode} is true, you really SHOULD close the registry, say at the end of a request, otherwise you will leave a job
* on the {@link ScheduledExecutorService} that is continuously dispatching.
* <p>
* If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and
* call {@link #rescheduleNow()}.
* <p>
* By default, it uses a {@link Executors#newSingleThreadScheduledExecutor()} to schedule the tasks. However, if you
* are creating a {@link ScheduledDataLoaderRegistry} per request you will want to look at sharing this {@link ScheduledExecutorService}
* to avoid creating a new thread per registry created.
* <p>
* This code is currently marked as {@link ExperimentalApi}
*/
@ExperimentalApi
@@ -37,13 +59,15 @@ public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements A
private final DispatchPredicate dispatchPredicate;
private final ScheduledExecutorService scheduledExecutorService;
private final Duration schedule;
private final boolean tickerMode;
private volatile boolean closed;

private ScheduledDataLoaderRegistry(Builder builder) {
super();
this.dataLoaders.putAll(builder.dataLoaders);
this.scheduledExecutorService = builder.scheduledExecutorService;
this.schedule = builder.schedule;
this.tickerMode = builder.tickerMode;
this.closed = false;
this.dispatchPredicate = builder.dispatchPredicate;
this.dataLoaderPredicates.putAll(builder.dataLoaderPredicates);
@@ -64,6 +88,13 @@ public Duration getScheduleDuration() {
return schedule;
}

/**
* @return true if the registry is in ticker mode or false otherwise
*/
public boolean isTickerMode() {
return tickerMode;
}

/**
* This will combine all the current data loaders in this registry and all the data loaders from the specified registry
* and return a new combined registry
@@ -127,25 +158,6 @@ public ScheduledDataLoaderRegistry register(String key, DataLoader<?, ?> dataLoa
return this;
}

/**
* Returns true if the dataloader has a predicate which returned true, OR the overall
* registry predicate returned true.
*
* @param dataLoaderKey the key in the dataloader map
* @param dataLoader the dataloader
*
* @return true if it should dispatch
*/
private boolean shouldDispatch(String dataLoaderKey, DataLoader<?, ?> dataLoader) {
DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader);
if (dispatchPredicate != null) {
if (dispatchPredicate.test(dataLoaderKey, dataLoader)) {
return true;
}
}
return this.dispatchPredicate.test(dataLoaderKey, dataLoader);
}

@Override
public void dispatchAll() {
dispatchAllWithCount();
@@ -157,11 +169,7 @@ public int dispatchAllWithCount() {
for (Map.Entry<String, DataLoader<?, ?>> entry : dataLoaders.entrySet()) {
DataLoader<?, ?> dataLoader = entry.getValue();
String key = entry.getKey();
if (shouldDispatch(key, dataLoader)) {
sum += dataLoader.dispatchWithCounts().getKeysCount();
} else {
reschedule(key, dataLoader);
}
sum += dispatchOrReschedule(key, dataLoader);
}
return sum;
}
@@ -196,19 +204,42 @@ public void rescheduleNow() {
dataLoaders.forEach(this::reschedule);
}

/**
* Returns true if the dataloader has a predicate which returned true, OR the overall
* registry predicate returned true.
*
* @param dataLoaderKey the key in the dataloader map
* @param dataLoader the dataloader
*
* @return true if it should dispatch
*/
private boolean shouldDispatch(String dataLoaderKey, DataLoader<?, ?> dataLoader) {
DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader);
if (dispatchPredicate != null) {
if (dispatchPredicate.test(dataLoaderKey, dataLoader)) {
return true;
}
}
return this.dispatchPredicate.test(dataLoaderKey, dataLoader);
}

private void reschedule(String key, DataLoader<?, ?> dataLoader) {
if (!closed) {
Runnable runThis = () -> dispatchOrReschedule(key, dataLoader);
scheduledExecutorService.schedule(runThis, schedule.toMillis(), TimeUnit.MILLISECONDS);
}
}

private void dispatchOrReschedule(String key, DataLoader<?, ?> dataLoader) {
if (shouldDispatch(key, dataLoader)) {
dataLoader.dispatch();
} else {
private int dispatchOrReschedule(String key, DataLoader<?, ?> dataLoader) {
int sum = 0;
boolean shouldDispatch = shouldDispatch(key, dataLoader);
if (shouldDispatch) {
sum = dataLoader.dispatchWithCounts().getKeysCount();
}
if (tickerMode || !shouldDispatch) {
reschedule(key, dataLoader);
}
return sum;
}

/**
@@ -228,6 +259,7 @@ public static class Builder {
private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS;
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private Duration schedule = Duration.ofMillis(10);
private boolean tickerMode = false;

public Builder scheduledExecutorService(ScheduledExecutorService executorService) {
this.scheduledExecutorService = nonNull(executorService);
@@ -298,6 +330,20 @@ public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) {
return this;
}

/**
* This sets ticker mode on the registry. When ticker mode is true the registry will
* continuously reschedule the data loaders for possible dispatching after the first call
* to dispatchAll.
*
* @param tickerMode true or false
*
* @return this builder for a fluent pattern
*/
public Builder tickerMode(boolean tickerMode) {
this.tickerMode = tickerMode;
return this;
}

/**
* @return the newly built {@link ScheduledDataLoaderRegistry}
*/
36 changes: 35 additions & 1 deletion src/test/java/ReadmeExamples.java
@@ -19,11 +19,14 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;

@@ -304,7 +307,7 @@ public <K, V> CompletionStage<Map<K, V>> scheduleMappedBatchLoader(ScheduledMapp
};
}

private void ScheduledDispatche() {
private void ScheduledDispatcher() {
DispatchPredicate depthOrTimePredicate = DispatchPredicate.dispatchIfDepthGreaterThan(10)
.or(DispatchPredicate.dispatchIfLongerThan(Duration.ofMillis(200)));

@@ -314,4 +317,35 @@ private void ScheduledDispatche() {
.register("users", userDataLoader)
.build();
}


DataLoader<String, User> dataLoaderA = DataLoaderFactory.newDataLoader(userBatchLoader);
DataLoader<User, Object> dataLoaderB = DataLoaderFactory.newDataLoader(keys -> {
return CompletableFuture.completedFuture(Collections.singletonList(1L));
});

private void ScheduledDispatcherChained() {
CompletableFuture<Object> chainedCalls = dataLoaderA.load("user1")
.thenCompose(userAsKey -> dataLoaderB.load(userAsKey));


CompletableFuture<Object> chainedWithImmediateDispatch = dataLoaderA.load("user1")
.thenCompose(userAsKey -> {
CompletableFuture<Object> loadB = dataLoaderB.load(userAsKey);
dataLoaderB.dispatch();
return loadB;
});


ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry()
.register("a", dataLoaderA)
.register("b", dataLoaderB)
.scheduledExecutorService(executorService)
.schedule(Duration.ofMillis(10))
.tickerMode(true) // ticker mode is on
.build();

}
}
28 changes: 27 additions & 1 deletion src/test/java/org/dataloader/fixtures/TestKit.java
@@ -8,6 +8,7 @@
import org.dataloader.MappedBatchLoader;
import org.dataloader.MappedBatchLoaderWithContext;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -61,6 +62,23 @@ public static <K, V> BatchLoader<K, V> keysAsValues(List<List<K>> loadCalls) {
};
}

public static <K, V> BatchLoader<K, V> keysAsValuesAsync(Duration delay) {
return keysAsValuesAsync(new ArrayList<>(), delay);
}

public static <K, V> BatchLoader<K, V> keysAsValuesAsync(List<List<K>> loadCalls, Duration delay) {
return keys -> CompletableFuture.supplyAsync(() -> {
snooze(delay.toMillis());
List<K> ks = new ArrayList<>(keys);
loadCalls.add(ks);
@SuppressWarnings("unchecked")
List<V> values = keys.stream()
.map(k -> (V) k)
.collect(toList());
return values;
});
}

public static <K, V> DataLoader<K, V> idLoader() {
return idLoader(null, new ArrayList<>());
}
@@ -73,6 +91,14 @@ public static <K, V> DataLoader<K, V> idLoader(DataLoaderOptions options, List<L
return DataLoaderFactory.newDataLoader(keysAsValues(loadCalls), options);
}

public static <K, V> DataLoader<K, V> idLoaderAsync(Duration delay) {
return idLoaderAsync(null, new ArrayList<>(), delay);
}

public static <K, V> DataLoader<K, V> idLoaderAsync(DataLoaderOptions options, List<List<K>> loadCalls, Duration delay) {
return DataLoaderFactory.newDataLoader(keysAsValuesAsync(loadCalls, delay), options);
}

public static Collection<Integer> listFrom(int i, int max) {
List<Integer> ints = new ArrayList<>();
for (int j = i; j < max; j++) {
@@ -85,7 +111,7 @@ public static <V> CompletableFuture<V> futureError() {
return failedFuture(new IllegalStateException("Error"));
}

public static void snooze(int millis) {
public static void snooze(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {