Skip to content

ISSUE-104: remove unnecessary mapResult calls #105

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 3, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 36 additions & 54 deletions core/src/main/scala/scala/collection/parallel/ParIterableLike.scala
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ self =>
* if this $coll is empty.
*/
def reduce[U >: T](op: (U, U) => U): U = {
tasksupport.executeAndWaitResult(new Reduce(op, splitter) mapResult { _.get })
tasksupport.executeAndWaitResult(new Reduce(op, splitter)).get
}

/** Optionally reduces the elements of this sequence using the specified associative binary operator.
Expand Down Expand Up @@ -464,11 +464,11 @@ self =>
}

def min[U >: T](implicit ord: Ordering[U]): T = {
tasksupport.executeAndWaitResult(new Min(ord, splitter) mapResult { _.get }).asInstanceOf[T]
tasksupport.executeAndWaitResult(new Min(ord, splitter)).get.asInstanceOf[T]
}

def max[U >: T](implicit ord: Ordering[U]): T = {
tasksupport.executeAndWaitResult(new Max(ord, splitter) mapResult { _.get }).asInstanceOf[T]
tasksupport.executeAndWaitResult(new Max(ord, splitter)).get.asInstanceOf[T]
}

def maxBy[S](f: T => S)(implicit cmp: Ordering[S]): T = {
Expand All @@ -484,15 +484,15 @@ self =>
}

def map[S](f: T => S): CC[S] = {
tasksupport.executeAndWaitResult(new Map[S, CC[S]](f, combinerFactory(() => companion.newCombiner[S]), splitter) mapResult { _.resultWithTaskSupport })
tasksupport.executeAndWaitResult(new Map[S, CC[S]](f, combinerFactory(() => companion.newCombiner[S]), splitter)).resultWithTaskSupport
}

def collect[S](pf: PartialFunction[T, S]): CC[S] = {
tasksupport.executeAndWaitResult(new Collect[S, CC[S]](pf, combinerFactory(() => companion.newCombiner[S]), splitter) mapResult { _.resultWithTaskSupport })
tasksupport.executeAndWaitResult(new Collect[S, CC[S]](pf, combinerFactory(() => companion.newCombiner[S]), splitter)).resultWithTaskSupport
}

def flatMap[S](f: T => IterableOnce[S]): CC[S] = {
tasksupport.executeAndWaitResult(new FlatMap[S, CC[S]](f, combinerFactory(() => companion.newCombiner[S]), splitter) mapResult { _.resultWithTaskSupport })
tasksupport.executeAndWaitResult(new FlatMap[S, CC[S]](f, combinerFactory(() => companion.newCombiner[S]), splitter)).resultWithTaskSupport
}

/** Tests whether a predicate holds for all elements of this $coll.
Expand Down Expand Up @@ -572,11 +572,11 @@ self =>
def withFilter(pred: T => Boolean): Repr = filter(pred)

def filter(pred: T => Boolean): Repr = {
tasksupport.executeAndWaitResult(new Filter(pred, combinerFactory, splitter) mapResult { _.resultWithTaskSupport })
tasksupport.executeAndWaitResult(new Filter(pred, combinerFactory, splitter)).resultWithTaskSupport
}

def filterNot(pred: T => Boolean): Repr = {
tasksupport.executeAndWaitResult(new FilterNot(pred, combinerFactory, splitter) mapResult { _.resultWithTaskSupport })
tasksupport.executeAndWaitResult(new FilterNot(pred, combinerFactory, splitter)).resultWithTaskSupport
}

def ++[U >: T](that: IterableOnce[U]): CC[U] = that match {
Expand All @@ -588,10 +588,8 @@ self =>
val othtask = new other.Copy(cfactory, other.splitter)
tasksupport.executeAndWaitResult(othtask)
}
val task = (copythis parallel copythat) { _ combine _ } mapResult {
_.resultWithTaskSupport
}
tasksupport.executeAndWaitResult(task)
val task = (copythis parallel copythat) { _ combine _ }
tasksupport.executeAndWaitResult(task).resultWithTaskSupport
case _ =>
// println("case parallel builder, `that` not parallel")
val copythis = new Copy(combinerFactory(() => companion.newCombiner[U]), splitter)
Expand All @@ -600,15 +598,13 @@ self =>
cb ++= that
cb
}
tasksupport.executeAndWaitResult((copythis parallel copythat) { _ combine _ } mapResult { _.resultWithTaskSupport })
tasksupport.executeAndWaitResult((copythis parallel copythat) { _ combine _ }).resultWithTaskSupport
}

def partition(pred: T => Boolean): (Repr, Repr) = {
tasksupport.executeAndWaitResult(
new Partition(pred, combinerFactory, combinerFactory, splitter) mapResult {
p => (p._1.resultWithTaskSupport, p._2.resultWithTaskSupport)
}
)
val result = tasksupport.executeAndWaitResult(
new Partition(pred, combinerFactory, combinerFactory, splitter))
(result._1.resultWithTaskSupport, result._2.resultWithTaskSupport)
}

def groupBy[K](f: T => K): immutable.ParMap[K, Repr] = {
Expand All @@ -621,9 +617,7 @@ self =>
def take(n: Int): Repr = {
val actualn = if (size > n) n else size
if (actualn < MIN_FOR_COPY) take_sequential(actualn)
else tasksupport.executeAndWaitResult(new Take(actualn, combinerFactory, splitter) mapResult {
_.resultWithTaskSupport
})
else tasksupport.executeAndWaitResult(new Take(actualn, combinerFactory, splitter)).resultWithTaskSupport
}

private def take_sequential(n: Int) = {
Expand All @@ -641,7 +635,7 @@ self =>
def drop(n: Int): Repr = {
val actualn = if (size > n) n else size
if ((size - actualn) < MIN_FOR_COPY) drop_sequential(actualn)
else tasksupport.executeAndWaitResult(new Drop(actualn, combinerFactory, splitter) mapResult { _.resultWithTaskSupport })
else tasksupport.executeAndWaitResult(new Drop(actualn, combinerFactory, splitter)).resultWithTaskSupport
}

private def drop_sequential(n: Int) = {
Expand All @@ -656,7 +650,7 @@ self =>
val from = unc_from min size max 0
val until = unc_until min size max from
if ((until - from) <= MIN_FOR_COPY) slice_sequential(from, until)
else tasksupport.executeAndWaitResult(new Slice(from, until, combinerFactory, splitter) mapResult { _.resultWithTaskSupport })
else tasksupport.executeAndWaitResult(new Slice(from, until, combinerFactory, splitter)).resultWithTaskSupport
}

private def slice_sequential(from: Int, until: Int): Repr = {
Expand All @@ -671,11 +665,9 @@ self =>
}

def splitAt(n: Int): (Repr, Repr) = {
tasksupport.executeAndWaitResult(
new SplitAt(n, combinerFactory, combinerFactory, splitter) mapResult {
p => (p._1.resultWithTaskSupport, p._2.resultWithTaskSupport)
}
)
val result = tasksupport.executeAndWaitResult(
new SplitAt(n, combinerFactory, combinerFactory, splitter))
(result._1.resultWithTaskSupport, result._2.resultWithTaskSupport)
}

/** Computes a prefix scan of the elements of the collection.
Expand All @@ -689,9 +681,7 @@ self =>
*/
def scan[U >: T](z: U)(op: (U, U) => U): CC[U] = {
if (size > 0) tasksupport.executeAndWaitResult(new CreateScanTree(0, size, z, op, splitter) mapResult {
tree => tasksupport.executeAndWaitResult(new FromScanTree(tree, z, op, combinerFactory(() => companion.newCombiner[U])) mapResult {
cb => cb.resultWithTaskSupport
})
tree => tasksupport.executeAndWaitResult(new FromScanTree(tree, z, op, combinerFactory(() => companion.newCombiner[U]))).resultWithTaskSupport
}) else setTaskSupport((companion.newCombiner[U] += z).result(), tasksupport)
}

Expand All @@ -711,15 +701,12 @@ self =>
val cbf = combinerFactory
if (cbf.doesShareCombiners) {
val parseqspan = toSeq.takeWhile(pred)
tasksupport.executeAndWaitResult(new Copy(combinerFactory, parseqspan.splitter) mapResult {
_.resultWithTaskSupport
})
tasksupport.executeAndWaitResult(new Copy(combinerFactory, parseqspan.splitter)).resultWithTaskSupport
} else {
val cntx = new DefaultSignalling with AtomicIndexFlag
cntx.setIndexFlag(Int.MaxValue)
tasksupport.executeAndWaitResult(new TakeWhile(0, pred, combinerFactory, splitter assign cntx) mapResult {
_._1.resultWithTaskSupport
})
val result = tasksupport.executeAndWaitResult(new TakeWhile(0, pred, combinerFactory, splitter assign cntx))
result._1.resultWithTaskSupport
}
}

Expand All @@ -736,18 +723,18 @@ self =>
val cbf = combinerFactory
if (cbf.doesShareCombiners) {
val (xs, ys) = toSeq.span(pred)
val copyxs = new Copy(combinerFactory, xs.splitter) mapResult { _.resultWithTaskSupport }
val copyys = new Copy(combinerFactory, ys.splitter) mapResult { _.resultWithTaskSupport }
val copyxs = new Copy(combinerFactory, xs.splitter)
val copyys = new Copy(combinerFactory, ys.splitter)
val copyall = (copyxs parallel copyys) {
(xr, yr) => (xr, yr)
}
tasksupport.executeAndWaitResult(copyall)
val result = tasksupport.executeAndWaitResult(copyall)
(result._1.resultWithTaskSupport, result._2.resultWithTaskSupport)
} else {
val cntx = new DefaultSignalling with AtomicIndexFlag
cntx.setIndexFlag(Int.MaxValue)
tasksupport.executeAndWaitResult(new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx) mapResult {
p => (p._1.resultWithTaskSupport, p._2.resultWithTaskSupport)
})
val result = tasksupport.executeAndWaitResult(new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx))
(result._1.resultWithTaskSupport, result._2.resultWithTaskSupport)
}
}

Expand All @@ -765,10 +752,7 @@ self =>
val cntx = new DefaultSignalling with AtomicIndexFlag
cntx.setIndexFlag(Int.MaxValue)
tasksupport.executeAndWaitResult(
new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx) mapResult {
_._2.resultWithTaskSupport
}
)
new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx))._2.resultWithTaskSupport
}

def copyToArray[U >: T](xs: Array[U]): Unit = copyToArray(xs, 0)
Expand All @@ -785,7 +769,7 @@ self =>
def zip[U >: T, S](that: ParIterable[S]): CC[(U, S)] = {
that match {
case thatseq: ParSeq[S] =>
tasksupport.executeAndWaitResult(new Zip(combinerFactory(() => companion.newCombiner[(U, S)]), splitter, thatseq.splitter) mapResult { _.resultWithTaskSupport })
tasksupport.executeAndWaitResult(new Zip(combinerFactory(() => companion.newCombiner[(U, S)]), splitter, thatseq.splitter)).resultWithTaskSupport
case _ =>
(companion.newBuilder[(U, S)] ++= setTaskSupport(seq.zip(that.seq), tasksupport)).result()
}
Expand All @@ -806,18 +790,16 @@ self =>
def zipAll[S, U >: T](that: ParIterable[S], thisElem: U, thatElem: S): CC[(U, S)] = {
val thatseq = that.toSeq
tasksupport.executeAndWaitResult(
new ZipAll(size max thatseq.length, thisElem, thatElem, combinerFactory(() => companion.newCombiner[(U, S)]), splitter, thatseq.splitter) mapResult {
_.resultWithTaskSupport
}
)
new ZipAll(size max thatseq.length, thisElem, thatElem, combinerFactory(() => companion.newCombiner[(U, S)]), splitter, thatseq.splitter)
).resultWithTaskSupport
}

protected def toParCollection[U >: T, That](cbf: () => Combiner[U, That]): That = {
tasksupport.executeAndWaitResult(new ToParCollection(combinerFactory(cbf), splitter) mapResult { _.resultWithTaskSupport })
tasksupport.executeAndWaitResult(new ToParCollection(combinerFactory(cbf), splitter)).resultWithTaskSupport
}

protected def toParMap[K, V, That](cbf: () => Combiner[(K, V), That])(implicit ev: T <:< (K, V)): That = {
tasksupport.executeAndWaitResult(new ToParMap(combinerFactory(cbf), splitter)(ev) mapResult { _.resultWithTaskSupport })
tasksupport.executeAndWaitResult(new ToParMap(combinerFactory(cbf), splitter)(ev)).resultWithTaskSupport
}

def toArray[U >: T: ClassTag]: Array[U] = {
Expand Down