Skip to content

Commit 50b6d0b

Browse files
authored
Merge pull request #269 from montekki/fs-stream-try-for-each
Adds try_for_each combinator
2 parents 98c79f4 + 35ab65f commit 50b6d0b

File tree

2 files changed

+101
-0
lines changed

2 files changed

+101
-0
lines changed

src/stream/stream/mod.rs

+47
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ mod skip;
4141
mod skip_while;
4242
mod step_by;
4343
mod take;
44+
mod try_for_each;
4445
mod zip;
4546

4647
use all::AllFuture;
@@ -54,6 +55,7 @@ use for_each::ForEachFuture;
5455
use min_by::MinByFuture;
5556
use next::NextFuture;
5657
use nth::NthFuture;
58+
use try_for_each::TryForEeachFuture;
5759

5860
pub use chain::Chain;
5961
pub use filter::Filter;
@@ -958,6 +960,51 @@ extension_trait! {
958960
Skip::new(self, n)
959961
}
960962

963+
#[doc = r#"
964+
Applies a falliable function to each element in a stream, stopping at first error and returning it.
965+
966+
# Examples
967+
968+
```
969+
# fn main() { async_std::task::block_on(async {
970+
#
971+
use std::collections::VecDeque;
972+
use std::sync::mpsc::channel;
973+
use async_std::prelude::*;
974+
975+
let (tx, rx) = channel();
976+
977+
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
978+
let s = s.try_for_each(|v| {
979+
if v % 2 == 1 {
980+
tx.clone().send(v).unwrap();
981+
Ok(())
982+
} else {
983+
Err("even")
984+
}
985+
});
986+
987+
let res = s.await;
988+
drop(tx);
989+
let values: Vec<_> = rx.iter().collect();
990+
991+
assert_eq!(values, vec![1]);
992+
assert_eq!(res, Err("even"));
993+
#
994+
# }) }
995+
```
996+
"#]
997+
fn try_for_each<F, E>(
998+
self,
999+
f: F,
1000+
) -> impl Future<Output = E> [TryForEeachFuture<Self, F, Self::Item, E>]
1001+
where
1002+
Self: Sized,
1003+
F: FnMut(Self::Item) -> Result<(), E>,
1004+
{
1005+
TryForEeachFuture::new(self, f)
1006+
}
1007+
9611008
#[doc = r#"
9621009
'Zips up' two streams into a single stream of pairs.
9631010

src/stream/stream/try_for_each.rs

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use std::marker::PhantomData;
2+
use std::pin::Pin;
3+
4+
use crate::future::Future;
5+
use crate::stream::Stream;
6+
use crate::task::{Context, Poll};
7+
8+
#[doc(hidden)]
9+
#[allow(missing_debug_implementations)]
10+
pub struct TryForEeachFuture<S, F, T, R> {
11+
stream: S,
12+
f: F,
13+
__from: PhantomData<T>,
14+
__to: PhantomData<R>,
15+
}
16+
17+
impl<S, F, T, R> TryForEeachFuture<S, F, T, R> {
18+
pin_utils::unsafe_pinned!(stream: S);
19+
pin_utils::unsafe_unpinned!(f: F);
20+
21+
pub(crate) fn new(stream: S, f: F) -> Self {
22+
TryForEeachFuture {
23+
stream,
24+
f,
25+
__from: PhantomData,
26+
__to: PhantomData,
27+
}
28+
}
29+
}
30+
31+
impl<S, F, E> Future for TryForEeachFuture<S, F, S::Item, E>
32+
where
33+
S: Stream,
34+
S::Item: std::fmt::Debug,
35+
F: FnMut(S::Item) -> Result<(), E>,
36+
{
37+
type Output = Result<(), E>;
38+
39+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
40+
loop {
41+
let item = futures_core::ready!(self.as_mut().stream().poll_next(cx));
42+
43+
match item {
44+
None => return Poll::Ready(Ok(())),
45+
Some(v) => {
46+
let res = (self.as_mut().f())(v);
47+
if let Err(e) = res {
48+
return Poll::Ready(Err(e));
49+
}
50+
}
51+
}
52+
}
53+
}
54+
}

0 commit comments

Comments
 (0)