Closed
Description
Hey I found an issue with this.
The FutureConvertersImpl throws an exception when toCompletableFuture is called:
override def toCompletableFuture(): CompletableFuture[T] ={
throw new UnsupportedOperationException("this CompletionStage represents a read-only Scala Future")
}
Which is fine. However, if you do something like this:
@Test
public void testToJavaThenComposeWithToJavaThenAccept() throws InterruptedException,
ExecutionException, TimeoutException {
final Promise<String> p1 = promise();
final CompletableFuture future = new CompletableFuture();
final CompletionStage<String> second =
CompletableFuture.supplyAsync(() -> "Hello").
thenCompose(x -> toJava(p1.future()));
second.handle((x, t) -> {
if(t!=null){
t.printStackTrace();
future.completeExceptionally(t);
}else{
future.complete(x);
}
return null;
});
p1.success("Hello");
assertEquals("Hello", future.get(1000, MILLISECONDS));
}
thenCompose calls doThenCompose:
public <U> CompletableFuture<U> thenCompose
(Function<? super T, ? extends CompletionStage<U>> fn) {
return doThenCompose(fn, null);
}
And if there aren't any exceptions, then it calls toCompletableFuture which causes failure.
Removing the exception causes everything to complete just fine.
Is there another way this can be handled other than throwing an exception?
if (ex == null) {
if (e != null) {
if (dst == null)
dst = new CompletableFuture<U>();
execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
}
else {
try {
CompletionStage<U> cs = fn.apply(t);
if (cs == null ||
(dst = cs.toCompletableFuture()) == null)
ex = new NullPointerException();
} catch (Throwable rex) {
ex = rex;
}
}
}
private <U> CompletableFuture<U> doThenCompose
(Function<? super T, ? extends CompletionStage<U>> fn,
Executor e) {
if (fn == null) throw new NullPointerException();
CompletableFuture<U> dst = null;
ThenCompose<T,U> d = null;
Object r;
if ((r = result) == null) {
dst = new CompletableFuture<U>();
CompletionNode p = new CompletionNode
(d = new ThenCompose<T,U>(this, fn, dst, e));
while ((r = result) == null) {
if (UNSAFE.compareAndSwapObject
(this, COMPLETIONS, p.next = completions, p))
break;
}
}
if (r != null && (d == null || d.compareAndSet(0, 1))) {
T t; Throwable ex;
if (r instanceof AltResult) {
ex = ((AltResult)r).ex;
t = null;
}
else {
ex = null;
@SuppressWarnings("unchecked") T tr = (T) r;
t = tr;
}
if (ex == null) {
if (e != null) {
if (dst == null)
dst = new CompletableFuture<U>();
execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
}
else {
try {
CompletionStage<U> cs = fn.apply(t);
if (cs == null ||
(dst = cs.toCompletableFuture()) == null)
ex = new NullPointerException();
} catch (Throwable rex) {
ex = rex;
}
}
}
if (dst == null)
dst = new CompletableFuture<U>();
if (ex != null)
dst.internalComplete(null, ex);
}
helpPostComplete();
dst.helpPostComplete();
return dst;
}