Skip to content

Commit aaf2638

Browse files
committed
Adds latest version of relevant files
1 parent f8bd71c commit aaf2638

File tree

2 files changed

+139
-22
lines changed

2 files changed

+139
-22
lines changed

src/stream/stream/mod.rs

+121-4
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@ mod filter_map;
3030
mod find;
3131
mod find_map;
3232
mod fold;
33+
mod for_each;
3334
mod fuse;
3435
mod inspect;
36+
mod map;
3537
mod min_by;
3638
mod next;
3739
mod nth;
@@ -41,6 +43,7 @@ mod skip;
4143
mod skip_while;
4244
mod step_by;
4345
mod take;
46+
mod try_for_each;
4447
mod zip;
4548

4649
use all::AllFuture;
@@ -50,15 +53,18 @@ use filter_map::FilterMap;
5053
use find::FindFuture;
5154
use find_map::FindMapFuture;
5255
use fold::FoldFuture;
56+
use for_each::ForEachFuture;
5357
use min_by::MinByFuture;
5458
use next::NextFuture;
5559
use nth::NthFuture;
5660
use partial_cmp::PartialCmpFuture;
61+
use try_for_each::TryForEeachFuture;
5762

5863
pub use chain::Chain;
5964
pub use filter::Filter;
6065
pub use fuse::Fuse;
6166
pub use inspect::Inspect;
67+
pub use map::Map;
6268
pub use scan::Scan;
6369
pub use skip::Skip;
6470
pub use skip_while::SkipWhile;
@@ -336,6 +342,37 @@ extension_trait! {
336342
Enumerate::new(self)
337343
}
338344

345+
#[doc = r#"
346+
Takes a closure and creates a stream that calls that closure on every element of this stream.
347+
348+
# Examples
349+
350+
```
351+
# fn main() { async_std::task::block_on(async {
352+
#
353+
use async_std::prelude::*;
354+
use std::collections::VecDeque;
355+
356+
let s: VecDeque<_> = vec![1, 2, 3].into_iter().collect();
357+
let mut s = s.map(|x| 2 * x);
358+
359+
assert_eq!(s.next().await, Some(2));
360+
assert_eq!(s.next().await, Some(4));
361+
assert_eq!(s.next().await, Some(6));
362+
assert_eq!(s.next().await, None);
363+
364+
#
365+
# }) }
366+
```
367+
"#]
368+
fn map<B, F>(self, f: F) -> Map<Self, F, Self::Item, B>
369+
where
370+
Self: Sized,
371+
F: FnMut(Self::Item) -> B,
372+
{
373+
Map::new(self, f)
374+
}
375+
339376
#[doc = r#"
340377
A combinator that does something with each element in the stream, passing the value
341378
on.
@@ -752,6 +789,41 @@ extension_trait! {
752789
FoldFuture::new(self, init, f)
753790
}
754791

792+
#[doc = r#"
793+
Call a closure on each element of the stream.
794+
795+
# Examples
796+
797+
```
798+
# fn main() { async_std::task::block_on(async {
799+
#
800+
use async_std::prelude::*;
801+
use std::collections::VecDeque;
802+
use std::sync::mpsc::channel;
803+
804+
let (tx, rx) = channel();
805+
806+
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
807+
let sum = s.for_each(move |x| tx.clone().send(x).unwrap()).await;
808+
809+
let v: Vec<_> = rx.iter().collect();
810+
811+
assert_eq!(v, vec![1, 2, 3]);
812+
#
813+
# }) }
814+
```
815+
"#]
816+
fn for_each<F>(
817+
self,
818+
f: F,
819+
) -> impl Future<Output = ()> [ForEachFuture<Self, F, Self::Item>]
820+
where
821+
Self: Sized,
822+
F: FnMut(Self::Item),
823+
{
824+
ForEachFuture::new(self, f)
825+
}
826+
755827
#[doc = r#"
756828
Tests if any element of the stream matches a predicate.
757829
@@ -923,6 +995,51 @@ extension_trait! {
923995
Skip::new(self, n)
924996
}
925997

998+
#[doc = r#"
999+
Applies a falliable function to each element in a stream, stopping at first error and returning it.
1000+
1001+
# Examples
1002+
1003+
```
1004+
# fn main() { async_std::task::block_on(async {
1005+
#
1006+
use std::collections::VecDeque;
1007+
use std::sync::mpsc::channel;
1008+
use async_std::prelude::*;
1009+
1010+
let (tx, rx) = channel();
1011+
1012+
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
1013+
let s = s.try_for_each(|v| {
1014+
if v % 2 == 1 {
1015+
tx.clone().send(v).unwrap();
1016+
Ok(())
1017+
} else {
1018+
Err("even")
1019+
}
1020+
});
1021+
1022+
let res = s.await;
1023+
drop(tx);
1024+
let values: Vec<_> = rx.iter().collect();
1025+
1026+
assert_eq!(values, vec![1]);
1027+
assert_eq!(res, Err("even"));
1028+
#
1029+
# }) }
1030+
```
1031+
"#]
1032+
fn try_for_each<F, E>(
1033+
self,
1034+
f: F,
1035+
) -> impl Future<Output = E> [TryForEeachFuture<Self, F, Self::Item, E>]
1036+
where
1037+
Self: Sized,
1038+
F: FnMut(Self::Item) -> Result<(), E>,
1039+
{
1040+
TryForEeachFuture::new(self, f)
1041+
}
1042+
9261043
#[doc = r#"
9271044
'Zips up' two streams into a single stream of pairs.
9281045
@@ -1064,15 +1181,15 @@ extension_trait! {
10641181
fn partial_cmp<S>(
10651182
self,
10661183
other: S
1067-
) -> impl Future<Output = Option<Ordering>> + '_ [PartialCmpFuture<Self, S>]
1184+
) -> impl Future<Output = Option<Ordering>> [PartialCmpFuture<Self, S>]
10681185
where
10691186
Self: Sized + Stream,
1070-
S: Stream,
1071-
Self::Item: PartialOrd<S::Item>,
1187+
S: Stream,
1188+
<Self as Stream>::Item: PartialOrd<S::Item>,
10721189
{
10731190
PartialCmpFuture::new(self, other)
10741191
}
1075-
1192+
10761193
}
10771194

10781195
impl<S: Stream + Unpin + ?Sized> Stream for Box<S> {

src/stream/stream/partial_cmp.rs

+18-18
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use std::cmp::Ordering;
22
use std::pin::Pin;
33

44
use super::fuse::Fuse;
5-
use crate::prelude::*;
65
use crate::future::Future;
6+
use crate::prelude::*;
77
use crate::stream::Stream;
88
use crate::task::{Context, Poll};
99

@@ -15,7 +15,7 @@ pub struct PartialCmpFuture<L: Stream, R: Stream> {
1515
l: Fuse<L>,
1616
r: Fuse<R>,
1717
l_cache: Option<L::Item>,
18-
r_cache: Option<R::Item>,
18+
r_cache: Option<R::Item>,
1919
}
2020

2121
impl<L: Stream, R: Stream> PartialCmpFuture<L, R> {
@@ -28,36 +28,36 @@ impl<L: Stream, R: Stream> PartialCmpFuture<L, R> {
2828
PartialCmpFuture {
2929
l: l.fuse(),
3030
r: r.fuse(),
31-
l_cache: None,
32-
r_cache: None,
31+
l_cache: None,
32+
r_cache: None,
3333
}
3434
}
3535
}
3636

37-
impl<L: Stream, R: Stream> Future for PartialCmpFuture<L, R>
38-
where
37+
impl<L: Stream, R: Stream> Future for PartialCmpFuture<L, R>
38+
where
3939
L: Stream + Sized,
4040
R: Stream + Sized,
41-
L::Item: PartialOrd<R::Item>
41+
L::Item: PartialOrd<R::Item>,
4242
{
4343
type Output = Option<Ordering>;
4444

45-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
45+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
4646
loop {
47-
// Short circuit logic
47+
// Short circuit logic
4848
// Stream that completes earliest can be considered Less, etc
4949
let l_complete = self.l.done && self.as_mut().l_cache.is_none();
5050
let r_complete = self.r.done && self.as_mut().r_cache.is_none();
5151

5252
if l_complete && r_complete {
53-
return Poll::Ready(Some(Ordering::Equal))
53+
return Poll::Ready(Some(Ordering::Equal));
5454
} else if l_complete {
55-
return Poll::Ready(Some(Ordering::Less))
55+
return Poll::Ready(Some(Ordering::Less));
5656
} else if r_complete {
57-
return Poll::Ready(Some(Ordering::Greater))
57+
return Poll::Ready(Some(Ordering::Greater));
5858
}
5959

60-
// Get next value if possible and necesary
60+
// Get next value if possible and necesary
6161
if !self.l.done && self.as_mut().l_cache.is_none() {
6262
let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx));
6363
if let Some(item) = l_next {
@@ -75,17 +75,17 @@ where
7575
// Compare if both values are available.
7676
if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() {
7777
let l_value = self.as_mut().l_cache().take().unwrap();
78-
let r_value = self.as_mut().r_cache().take().unwrap();
78+
let r_value = self.as_mut().r_cache().take().unwrap();
7979
let result = l_value.partial_cmp(&r_value);
8080

8181
if let Some(Ordering::Equal) = result {
82-
// Reset cache to prepare for next comparison
83-
*self.as_mut().l_cache() = None;
82+
// Reset cache to prepare for next comparison
83+
*self.as_mut().l_cache() = None;
8484
*self.as_mut().r_cache() = None;
8585
} else {
86-
// Return non equal value
86+
// Return non equal value
8787
return Poll::Ready(result);
88-
}
88+
}
8989
}
9090
}
9191
}

0 commit comments

Comments
 (0)