inconsistent RejectedExecutionException behavior with Future.sequence #12902

Open
@achyan

Reproduction steps

Scala version: 2.13.12

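import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}

// One worker thread and a queue of capacity 1: at most two tasks are accepted at a time.
val threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue[Runnable](1))
// The failure reporter rethrows whatever is reported to it.
val threadPool: ExecutionContext = ExecutionContext.fromExecutor(threadPoolExecutor, t => throw t)

val future1 = Future { Thread.sleep(9999999) }(threadPool) // occupies the single thread
val future2 = Future { Thread.sleep(9999999) }(threadPool) // occupies the single queue slot
val future3 = Future { Thread.sleep(9999999) }(threadPool) // Future(Failure(RejectedExecutionException)): the pool rejects the task
implicit val implicitThreadpool: ExecutionContext = threadPool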

Future.sequence(Seq(future1)) // Future(<not completed>) -- fine
Future.sequence(Seq(future3)) // Future(Failure(java.util.concurrent.RejectedExecutionException: Task Future(<not completed>) rejected from java.util.concurrent.ThreadPoolExecutor@416a4275[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0])) -- fine

Future.sequence(Seq(future3, future1)) // Future(Failure(java.util.concurrent.RejectedExecutionException: Task Future(<not completed>) rejected from java.util.concurrent.ThreadPoolExecutor@416a4275[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0])) -- fine

Future.sequence(Seq(future1, future3)) // throws instead of returning a failed Future; stack trace:
/*
java.util.concurrent.RejectedExecutionException: Task Future(<not completed>) rejected from java.util.concurrent.ThreadPoolExecutor@416a4275[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
  at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
  at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
  at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
  at scala.concurrent.impl.ExecutionContextImpl.execute(ExecutionContextImpl.scala:21)
  at scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:429)
  at scala.concurrent.impl.Promise$DefaultPromise.submitWithValue(Promise.scala:338)
  at scala.concurrent.impl.Promise$DefaultPromise.dispatchOrAddCallbacks(Promise.scala:312)
  at scala.concurrent.impl.Promise$DefaultPromise.onComplete(Promise.scala:216)
  at scala.concurrent.impl.Promise$DefaultPromise.zipWith(Promise.scala:164)
  at scala.concurrent.Future$.$anonfun$sequence$1(Future.scala:720)
  at scala.collection.IterableOnceOps.foldLeft(IterableOnce.scala:676)
  at scala.collection.IterableOnceOps.foldLeft$(IterableOnce.scala:670)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1300)
  at scala.concurrent.Future$.sequence(Future.scala:720)
  */

Problem

I would have expected Future.sequence(Seq(future3, future1)) and Future.sequence(Seq(future1, future3)) to behave the same way: either both throw the exception or both return a failed Future.

Also, with Scala 2.12, the RejectedExecutionException was thrown as soon as we submitted the third future, which exceeded the thread pool's queue capacity. With Scala 2.13, creating that future returns Future(Failure(ex)) instead of throwing. Given that change, I would have expected Future.sequence not to throw either.
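
The uniform behavior described above can be approximated at the call site by converting any exception thrown while building the combined future into a failed Future. The following is a minimal sketch of that idea, not a fix for Future.sequence itself. `SequenceCompat`, `failedOnThrow`, and `sequenceSafely` are hypothetical names introduced here for illustration and are not part of the standard library.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

// Hypothetical helper object (an assumption for illustration, not a library API).
object SequenceCompat {

  // Evaluates `body`; if building the Future throws a non-fatal exception
  // (RejectedExecutionException is one), returns a failed Future instead.
  def failedOnThrow[A](body: => Future[A]): Future[A] =
    try body
    catch { case NonFatal(e) => Future.failed(e) }

  // Future.sequence, but a synchronous throw becomes a failed Future,
  // so the caller sees a Future regardless of element order.
  def sequenceSafely[A](futures: Seq[Future[A]])(implicit ec: ExecutionContext): Future[Seq[A]] =
    failedOnThrow[Seq[A]](Future.sequence(futures))
}
```

With the reproduction above, `SequenceCompat.sequenceSafely(Seq(future1, future3))` should yield a failed Future rather than throwing, matching the `Seq(future3, future1)` case. This only hides the difference from the caller; it does not change how Future.sequence behaves internally.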
