Skip to content

Automatic widening of numeric types for primitive Streams #66

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,22 +121,27 @@ class Test {

Scala collections gain `seqStream` and `parStream` as extension methods that produce a Java 8 Stream
running sequentially or in parallel, respectively. These are automatically specialized to a primitive
type if possible. For instance, `List(1,2).seqStream` produces an `IntStream`. Maps additionally have
type if possible, including automatically applied widening conversions. For instance, `List(1,2).seqStream`
produces an `IntStream`, and so does `List(1.toShort, 2.toShort).parStream`. Maps additionally have
`seqKeyStream`, `seqValueStream`, `parKeyStream`, and `parValueStream` methods.

Scala collections also gain `accumulate` and `stepper` methods that produce utility collections that
can be useful when working with Java 8 Streams. `accumulate` produces an `Accumulator` or its primitive
counterpart (`DoubleAccumulator`, etc.), which is a low-level collection designed for efficient collection
and dispatching of results to and from Streams. Unlike most collections, it can contain more than
`Int.MaxValue` elements. `stepper` produces a `Stepper` which is a fusion of `Spliterator` and `Iterator`.
`Stepper`s underlie the Scala collections' instances of Java 8 Streams.
`Int.MaxValue` elements.

`stepper` produces a `Stepper` which is a fusion of `Spliterator` and `Iterator`. `Stepper`s underlie the Scala
collections' instances of Java 8 Streams. Steppers are intended as low-level building blocks for streams.
Usually you would not create them directly or call their methods but you can implement them alongside custom
collections to get better performance when streaming from these collections.

Java 8 Streams gain `toScala[Coll]` and `accumulate` methods, to make it easy to produce Scala collections
or Accumulators, respectively, from Java 8 Streams. For instance, `myStream.to[Vector]` will collect the
contents of a Stream into a `scala.collection.immutable.Vector`. Note that standard sequential builders
are used for collections, so this is best done to gather the results of an expensive computation.

Finally, there is a Java class, `ScalaStreamer`, that has a series of `from` methods that can be used to
Finally, there is a Java class, `ScalaStreamSupport`, that has a series of `stream` methods that can be used to
obtain Java 8 Streams from Scala collections from within Java.

#### Performance Considerations
Expand Down Expand Up @@ -218,7 +223,7 @@ def mapToSortedString[A](xs: Vector[A], f: A => String, sep: String) =
#### Java Usage Example

To convert a Scala collection to a Java 8 Stream from within Java, it usually
suffices to call `ScalaStreaming.from(xs)` on your collection `xs`. If `xs` is
suffices to call `ScalaStreamSupport.stream(xs)` on your collection `xs`. If `xs` is
a map, you may wish to get the keys or values alone by using `fromKeys` or
`fromValues`. If the collection has an underlying representation that is not
efficiently parallelized (e.g. `scala.collection.immutable.List`), then
Expand All @@ -237,14 +242,14 @@ Here is an example of conversion of a Scala collection within Java 8:

```java
import scala.collection.mutable.ArrayBuffer;
import scala.compat.java8.ScalaStreaming;
import scala.compat.java8.ScalaStreamSupport;

public class StreamConvertersExample {
public int MakeAndUseArrayBuffer() {
ArrayBuffer<String> ab = new ArrayBuffer<String>();
ab.$plus$eq("salmon");
ab.$plus$eq("herring");
return ScalaStreaming.from(ab).mapToInt(x -> x.length()).sum(); // 6+7 = 13
return ScalaStreamSupport.stream(ab).mapToInt(x -> x.length()).sum(); // 6+7 = 13
}
}
```
88 changes: 86 additions & 2 deletions src/main/scala/scala/compat/java8/StreamConverters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ trait PrimitiveStreamUnboxer[A, S] {
def apply(boxed: Stream[A]): S
}

trait Priority3StreamConverters {
trait Priority4StreamConverters {
// Fallback converters for AnySteppers that cannot be unboxed and widened to primitive streams
implicit class EnrichAnySteppableWithParStream[A, CC](cc: CC)(implicit steppize: CC => MakesStepper[AnyStepper[A] with EfficientSubstep])
extends MakesParallelStream[A, Stream[A]] {
def parStream: Stream[A] = StreamSupport.stream(steppize(cc).stepper.anticipateParallelism, true)
Expand All @@ -25,7 +26,6 @@ trait Priority3StreamConverters {
implicit class EnrichAnyValueSteppableWithParValueStream[V, CC](cc: CC)(implicit steppize: CC => MakesValueStepper[AnyStepper[V] with EfficientSubstep]) {
def parValueStream: Stream[V] = StreamSupport.stream(steppize(cc).valueStepper.anticipateParallelism, true)
}
// Note--conversion is only to make sure implicit conversion priority is lower than alternatives.
implicit class EnrichScalaCollectionWithSeqStream[A, CC](cc: CC)(implicit steppize: CC => MakesStepper[AnyStepper[A]])
extends MakesSequentialStream[A, Stream[A]] {
def seqStream: Stream[A] = StreamSupport.stream(steppize(cc).stepper, false)
Expand All @@ -38,6 +38,90 @@ trait Priority3StreamConverters {
}
}

trait Priority3StreamConverters extends Priority4StreamConverters {
// Prefer to unbox and widen small primitive types over keeping them boxed
implicit class EnrichBoxedFloatSteppableWithParStream[CC](cc: CC)(implicit steppize: CC => MakesStepper[AnyStepper[Float] with EfficientSubstep])
extends MakesParallelStream[java.lang.Double, DoubleStream] {
def parStream: DoubleStream = StreamSupport.doubleStream(new Stepper.WideningFloatStepper(steppize(cc).stepper.anticipateParallelism), true)
}
implicit class EnrichBoxedFloatKeySteppableWithParKeyStream[CC](cc: CC)(implicit steppize: CC => MakesKeyStepper[AnyStepper[Float] with EfficientSubstep]) {
def parKeyStream: DoubleStream = StreamSupport.doubleStream(new Stepper.WideningFloatStepper(steppize(cc).keyStepper.anticipateParallelism), true)
}
implicit class EnrichBoxedFloatValueSteppableWithParValueStream[CC](cc: CC)(implicit steppize: CC => MakesValueStepper[AnyStepper[Float] with EfficientSubstep]) {
def parValueStream: DoubleStream = StreamSupport.doubleStream(new Stepper.WideningFloatStepper(steppize(cc).valueStepper.anticipateParallelism), true)
}
implicit class EnrichBoxedByteSteppableWithParStream[CC](cc: CC)(implicit steppize: CC => MakesStepper[AnyStepper[Byte] with EfficientSubstep])
extends MakesParallelStream[java.lang.Integer, IntStream] {
def parStream: IntStream = StreamSupport.intStream(new Stepper.WideningByteStepper(steppize(cc).stepper.anticipateParallelism), true)
}
implicit class EnrichBoxedByteKeySteppableWithParKeyStream[CC](cc: CC)(implicit steppize: CC => MakesKeyStepper[AnyStepper[Byte] with EfficientSubstep]) {
def parKeyStream: IntStream = StreamSupport.intStream(new Stepper.WideningByteStepper(steppize(cc).keyStepper.anticipateParallelism), true)
}
implicit class EnrichBoxedByteValueSteppableWithParValueStream[CC](cc: CC)(implicit steppize: CC => MakesValueStepper[AnyStepper[Byte] with EfficientSubstep]) {
def parValueStream: IntStream = StreamSupport.intStream(new Stepper.WideningByteStepper(steppize(cc).valueStepper.anticipateParallelism), true)
}
implicit class EnrichBoxedShortSteppableWithParStream[CC](cc: CC)(implicit steppize: CC => MakesStepper[AnyStepper[Short] with EfficientSubstep])
extends MakesParallelStream[java.lang.Integer, IntStream] {
def parStream: IntStream = StreamSupport.intStream(new Stepper.WideningShortStepper(steppize(cc).stepper.anticipateParallelism), true)
}
implicit class EnrichBoxedShortKeySteppableWithParKeyStream[CC](cc: CC)(implicit steppize: CC => MakesKeyStepper[AnyStepper[Short] with EfficientSubstep]) {
def parKeyStream: IntStream = StreamSupport.intStream(new Stepper.WideningShortStepper(steppize(cc).keyStepper.anticipateParallelism), true)
}
implicit class EnrichBoxedShortValueSteppableWithParValueStream[CC](cc: CC)(implicit steppize: CC => MakesValueStepper[AnyStepper[Short] with EfficientSubstep]) {
def parValueStream: IntStream = StreamSupport.intStream(new Stepper.WideningShortStepper(steppize(cc).valueStepper.anticipateParallelism), true)
}
implicit class EnrichBoxedCharSteppableWithParStream[CC](cc: CC)(implicit steppize: CC => MakesStepper[AnyStepper[Char] with EfficientSubstep])
extends MakesParallelStream[java.lang.Integer, IntStream] {
def parStream: IntStream = StreamSupport.intStream(new Stepper.WideningCharStepper(steppize(cc).stepper.anticipateParallelism), true)
}
implicit class EnrichBoxedCharKeySteppableWithParKeyStream[CC](cc: CC)(implicit steppize: CC => MakesKeyStepper[AnyStepper[Char] with EfficientSubstep]) {
def parKeyStream: IntStream = StreamSupport.intStream(new Stepper.WideningCharStepper(steppize(cc).keyStepper.anticipateParallelism), true)
}
implicit class EnrichBoxedCharValueSteppableWithParValueStream[CC](cc: CC)(implicit steppize: CC => MakesValueStepper[AnyStepper[Char] with EfficientSubstep]) {
def parValueStream: IntStream = StreamSupport.intStream(new Stepper.WideningCharStepper(steppize(cc).valueStepper.anticipateParallelism), true)
}
implicit class EnrichBoxedFloatSteppableWithSeqStream[CC](cc: CC)(implicit steppize: CC => MakesStepper[AnyStepper[Float]])
extends MakesSequentialStream[java.lang.Double, DoubleStream] {
def seqStream: DoubleStream = StreamSupport.doubleStream(new Stepper.WideningFloatStepper(steppize(cc).stepper), false)
}
implicit class EnrichBoxedFloatKeySteppableWithSeqKeyStream[CC](cc: CC)(implicit steppize: CC => MakesKeyStepper[AnyStepper[Float]]) {
def seqKeyStream: DoubleStream = StreamSupport.doubleStream(new Stepper.WideningFloatStepper(steppize(cc).keyStepper), false)
}
implicit class EnrichBoxedFloatValueSteppableWithSeqValueStream[CC](cc: CC)(implicit steppize: CC => MakesValueStepper[AnyStepper[Float]]) {
def seqValueStream: DoubleStream = StreamSupport.doubleStream(new Stepper.WideningFloatStepper(steppize(cc).valueStepper), false)
}
implicit class EnrichBoxedByteSteppableWithSeqStream[CC](cc: CC)(implicit steppize: CC => MakesStepper[AnyStepper[Byte]])
extends MakesSequentialStream[java.lang.Integer, IntStream] {
def seqStream: IntStream = StreamSupport.intStream(new Stepper.WideningByteStepper(steppize(cc).stepper), false)
}
implicit class EnrichBoxedByteKeySteppableWithSeqKeyStream[CC](cc: CC)(implicit steppize: CC => MakesKeyStepper[AnyStepper[Byte]]) {
def seqKeyStream: IntStream = StreamSupport.intStream(new Stepper.WideningByteStepper(steppize(cc).keyStepper), false)
}
implicit class EnrichBoxedByteValueSteppableWithSeqValueStream[CC](cc: CC)(implicit steppize: CC => MakesValueStepper[AnyStepper[Byte]]) {
def seqValueStream: IntStream = StreamSupport.intStream(new Stepper.WideningByteStepper(steppize(cc).valueStepper), false)
}
implicit class EnrichBoxedShortSteppableWithSeqStream[CC](cc: CC)(implicit steppize: CC => MakesStepper[AnyStepper[Short]])
extends MakesSequentialStream[java.lang.Integer, IntStream] {
def seqStream: IntStream = StreamSupport.intStream(new Stepper.WideningShortStepper(steppize(cc).stepper), false)
}
implicit class EnrichBoxedShortKeySteppableWithSeqKeyStream[CC](cc: CC)(implicit steppize: CC => MakesKeyStepper[AnyStepper[Short]]) {
def seqKeyStream: IntStream = StreamSupport.intStream(new Stepper.WideningShortStepper(steppize(cc).keyStepper), false)
}
implicit class EnrichBoxedShortValueSteppableWithSeqValueStream[CC](cc: CC)(implicit steppize: CC => MakesValueStepper[AnyStepper[Short]]) {
def seqValueStream: IntStream = StreamSupport.intStream(new Stepper.WideningShortStepper(steppize(cc).valueStepper), false)
}
implicit class EnrichBoxedCharSteppableWithSeqStream[CC](cc: CC)(implicit steppize: CC => MakesStepper[AnyStepper[Char]])
extends MakesSequentialStream[java.lang.Integer, IntStream] {
def seqStream: IntStream = StreamSupport.intStream(new Stepper.WideningCharStepper(steppize(cc).stepper), false)
}
implicit class EnrichBoxedCharKeySteppableWithSeqKeyStream[CC](cc: CC)(implicit steppize: CC => MakesKeyStepper[AnyStepper[Char]]) {
def seqKeyStream: IntStream = StreamSupport.intStream(new Stepper.WideningCharStepper(steppize(cc).keyStepper), false)
}
implicit class EnrichBoxedCharValueSteppableWithSeqValueStream[CC](cc: CC)(implicit steppize: CC => MakesValueStepper[AnyStepper[Char]]) {
def seqValueStream: IntStream = StreamSupport.intStream(new Stepper.WideningCharStepper(steppize(cc).valueStepper), false)
}
}

trait Priority2StreamConverters extends Priority3StreamConverters {
implicit class EnrichDoubleSteppableWithParStream[CC](cc: CC)(implicit steppize: CC => MakesStepper[DoubleStepper with EfficientSubstep])
extends MakesParallelStream[java.lang.Double, DoubleStream] {
Expand Down
45 changes: 41 additions & 4 deletions src/main/scala/scala/compat/java8/collectionImpl/Stepper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -239,24 +239,24 @@ trait AnyStepper[A] extends Stepper[A] with java.util.Iterator[A] with Spliterat
def parStream: java.util.stream.Stream[A] = java.util.stream.StreamSupport.stream(this, true)
}

object AnyStepper {
private[collectionImpl] class BoxedDoubleStepper(st: DoubleStepper) extends AnyStepper[Double] {
private[collectionImpl] object AnyStepper {
final class BoxedDoubleStepper(st: DoubleStepper) extends AnyStepper[Double] {
def hasNext(): Boolean = st.hasNext()
def next(): Double = st.next()
def characteristics(): Int = st.characteristics()
def estimateSize(): Long = st.estimateSize()
def substep(): AnyStepper[Double] = new BoxedDoubleStepper(st.substep())
}

private[collectionImpl] class BoxedIntStepper(st: IntStepper) extends AnyStepper[Int] {
final class BoxedIntStepper(st: IntStepper) extends AnyStepper[Int] {
def hasNext(): Boolean = st.hasNext()
def next(): Int = st.next()
def characteristics(): Int = st.characteristics()
def estimateSize(): Long = st.estimateSize()
def substep(): AnyStepper[Int] = new BoxedIntStepper(st.substep())
}

private[collectionImpl] class BoxedLongStepper(st: LongStepper) extends AnyStepper[Long] {
final class BoxedLongStepper(st: LongStepper) extends AnyStepper[Long] {
def hasNext(): Boolean = st.hasNext()
def next(): Long = st.next()
def characteristics(): Int = st.characteristics()
Expand Down Expand Up @@ -564,4 +564,41 @@ object Stepper {
case _ => new OfLongSpliterator(sp)
}

/* These adapter classes can wrap an AnyStepper of a small numeric type into the appropriately widened
* primitive Stepper type. This provides a basis for more efficient stream processing on unboxed values
* provided that the original source of the data is already boxed. In other cases the widening conversion
* should always be performed directly on the original unboxed values in a custom Stepper implementation
* (see for example StepsWidenedByteArray). */

private[java8] class WideningByteStepper(st: AnyStepper[Byte]) extends IntStepper {
def hasNext(): Boolean = st.hasNext()
def nextInt(): Int = st.next()
def characteristics(): Int = st.characteristics() | NonNull
def estimateSize(): Long = st.estimateSize()
def substep(): IntStepper = new WideningByteStepper(st.substep())
}

private[java8] class WideningCharStepper(st: AnyStepper[Char]) extends IntStepper {
def hasNext(): Boolean = st.hasNext()
def nextInt(): Int = st.next()
def characteristics(): Int = st.characteristics() | NonNull
def estimateSize(): Long = st.estimateSize()
def substep(): IntStepper = new WideningCharStepper(st.substep())
}

private[java8] class WideningShortStepper(st: AnyStepper[Short]) extends IntStepper {
def hasNext(): Boolean = st.hasNext()
def nextInt(): Int = st.next()
def characteristics(): Int = st.characteristics() | NonNull
def estimateSize(): Long = st.estimateSize()
def substep(): IntStepper = new WideningShortStepper(st.substep())
}

private[java8] class WideningFloatStepper(st: AnyStepper[Float]) extends DoubleStepper {
def hasNext(): Boolean = st.hasNext()
def nextDouble(): Double = st.next()
def characteristics(): Int = st.characteristics() | NonNull
def estimateSize(): Long = st.estimateSize()
def substep(): DoubleStepper = new WideningFloatStepper(st.substep())
}
}
Loading