-
Notifications
You must be signed in to change notification settings - Fork 93
A PR for reactive streams support #151
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
**Note**: This commit, as-is, is not (yet) intended for merge. It is created to provide a proof-of-concept and gauge interest as polishing/testing this requires a non-trivial amount of effort. Motivation ========== The current DataLoader mechanism completes the corresponding `CompletableFuture` for a given key when the corresponding value is returned. However, DataLoader's `BatchLoader` assumes that the underlying batch function can only return all of its requested items at once (as an example, a SQL database query). However, the batch function may be a service that can return items progressively using a subscription-like architecture. Some examples include: - Project Reactor's [Subscriber](https://www.reactive-streams.org/reactive-streams-1.0.4-javadoc/org/reactivestreams/Subscriber.html). - gRPC's [StreamObserver](https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/StreamObserver.html). - RX Java's [Flowable](https://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html). Streaming results in this fashion offers several advantages: - Certain values may be returned earlier than others (for example, the batch function may have cached values it can return early). - Memory load is lessened on the batch function (which may be an external service), as it does not need to keep hold of the retrieved values before it can send them out at once. - We are able to save the need to stream individual error values by providing an `onError` function to terminate the stream early. Proposal ======== We provide two new `BatchLoader`s and support for them in `java-dataloader`: - `ObserverBatchLoader`, with a load function that accepts: - a list of keys. - a `BatchObserver` intended as a delegate for publisher-like structures found in Project Reactor and Rx Java. This obviates the need to depend on external libraries. - `MappedObserverBatchLoader`, similar to `ObserverBatchLoader` but with an `onNext` that accepts a key _and_ value (to allow for early termination of streams without needing to process `null`s). - `*WithContext` variants for the above. The key value-add is that the implementation of `BatchObserver` (provided to the load functions) will immediately complete the queued future for a given key when `onNext` is called with a value. This means that if we have a batch function that can deliver values progressively, we can continue evaluating the query as the values arrive. As an arbitrary example, let's have a batch function that serves both the reporter and project fields on a Jira issue: ```graphql query { issue { project { issueTypes { ... } } reporter { ... } } } ``` If the batch function can return a `project` immediately but is delayed in when it can `reporter`, then our batch loader can return `project` and start evaluating the `issueTypes` immediately while we load the `reporter` in parallel. This would provide a more performant query evaluation. As mentioned above, this is not in a state to be merged - this is intended to gauge whether this is something the maintainers would be interested in owning. Should this be the case, the author is willing to test/polish this pull request so that it may be merged.
…anch # Conflicts: # build.gradle
…er-proof-of-concept * origin/master: Bump to Java 11
`reactive-streams` has become the de-facto standard for reactive frameworks; we thus use this as a base to allow seamless interop (rather than prompt an extra adapter layer).
This gives us more workable exceptions.
Passing an exception into `onNext` is not typically done in reactive-land - we would instead call `onError(Throwable)`. We can thus avoid handling this case.
This is keeping in line with the other methods found in `DataLoaderFactory`.
Given the large number of existing tests, we copy across this existing set for our publisher tests. What this really indicates is that we should invest in parameterised testing, but this is a bit painful in JUnit 4 - so we'll bump to JUnit 5 independently and parameterise when we have this available. This is important because re-using the existing test suite reveals a failure that we'll need to address.
This keeps in line with the original suggestion (because yours truly couldn't read, apparently). We also purge any remaining mention of 'observer', which was the first swing at this code.
Multiple threads may call `onNext` - we thus (lazily) chuck a `synchronized` to ensure correctness at the cost of speed. In future, we should examine how we should manage this concurrency better.
…roof-of-concept Add a proof-of-concept for "Observer-like" batch loading
…anch # Conflicts: # build.gradle
We now have the same coverage but with less code. Note that: - this is currently failing on 'duplicate keys when caching disabled'. - we still need to add tests that only make sense for the Publisher variants (e.g. half-completed keys).
If we did not cache the futures, then the MappedBatchPublisher DataLoader would not work as we were only completing the last future for a given key.
Migrate publisher tests
…active-streams-common-publisher-impl # Conflicts: # src/main/java/org/dataloader/DataLoaderHelper.java # src/test/java/org/dataloader/DataLoaderBatchPublisherTest.java # src/test/java/org/dataloader/DataLoaderMappedBatchPublisherTest.java
…lisher-impl Making the Subscribers use a common base class
…anch-extra-tests-for-reactive
…ra-tests-for-reactive More tests for Publishers on reactive branch
…aderHelper is way too big
…e-reactive-classes-out-of-dataloader-helper Reactive streams branch move reactive classes out of dataloader helper
This is more symmetric with `MappedbatchLoader` and preserves efficiency; we do not need to emit a `Map.Entry` for duplicate keys (given the strong intention that this will be used to create a `Map`).
…ublishers Have MappedBatchPublisher take in a Set<K> keys (and add README sections)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This PR is a long running branch with work to allow "reactive" publishers to complete works progressively as results arrive.
A normal BatchLoader gathers ALL the value futures (given a set of keys) and completes them in one go.
The use of reactive Publisher / Subscribers means that keys can complete progressively as each result arrives.
This may mean that processing will happen quicker depending on whether further sub processing occurs