Skip to content

Commit 66d38f7

Browse files
committed
Adds try_for_each combinator
1 parent 33d2191 commit 66d38f7

File tree

3 files changed

+106
-0
lines changed

3 files changed

+106
-0
lines changed

src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
#![doc(test(attr(allow(unused_extern_crates, unused_variables))))]
4848
#![doc(html_logo_url = "https://async.rs/images/logo--hero.svg")]
4949
#![recursion_limit = "1024"]
50+
#![feature(try_trait)]
5051

5152
use cfg_if::cfg_if;
5253

src/stream/stream/mod.rs

+49
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ mod skip;
4040
mod skip_while;
4141
mod step_by;
4242
mod take;
43+
mod try_for_each;
4344
mod zip;
4445

4546
use all::AllFuture;
@@ -52,6 +53,7 @@ use fold::FoldFuture;
5253
use min_by::MinByFuture;
5354
use next::NextFuture;
5455
use nth::NthFuture;
56+
use try_for_each::TryForEeachFuture;
5557

5658
pub use chain::Chain;
5759
pub use filter::Filter;
@@ -66,6 +68,7 @@ pub use zip::Zip;
6668

6769
use std::cmp::Ordering;
6870
use std::marker::PhantomData;
71+
use std::ops::Try;
6972

7073
use cfg_if::cfg_if;
7174

@@ -921,6 +924,52 @@ extension_trait! {
921924
Skip::new(self, n)
922925
}
923926

927+
#[doc = r#"
928+
Applies a falliable function to each element in a stream, stopping at first error and returning it.
929+
930+
# Examples
931+
932+
```
933+
# fn main() { async_std::task::block_on(async {
934+
#
935+
use std::collections::VecDeque;
936+
use std::sync::mpsc::channel;
937+
use async_std::prelude::*;
938+
939+
let (tx, rx) = channel();
940+
941+
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
942+
let s = s.try_for_each(|v| {
943+
if v % 2 == 1 {
944+
tx.clone().send(v).unwrap();
945+
Ok(())
946+
} else {
947+
Err("even")
948+
}
949+
});
950+
951+
let res = s.await;
952+
drop(tx);
953+
let values: Vec<_> = rx.iter().collect();
954+
955+
assert_eq!(values, vec![1]);
956+
assert_eq!(res, Err("even"));
957+
#
958+
# }) }
959+
```
960+
"#]
961+
fn try_for_each<F, R>(
962+
self,
963+
f: F,
964+
) -> impl Future<Output = R> [TryForEeachFuture<Self, F, Self::Item, R>]
965+
where
966+
Self: Sized,
967+
F: FnMut(Self::Item) -> R,
968+
R: Try<Ok = ()>,
969+
{
970+
TryForEeachFuture::new(self, f)
971+
}
972+
924973
#[doc = r#"
925974
'Zips up' two streams into a single stream of pairs.
926975

src/stream/stream/try_for_each.rs

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
use std::marker::PhantomData;
2+
use std::ops::Try;
3+
use std::pin::Pin;
4+
5+
use crate::future::Future;
6+
use crate::stream::Stream;
7+
use crate::task::{Context, Poll};
8+
9+
#[doc(hidden)]
10+
#[allow(missing_debug_implementations)]
11+
pub struct TryForEeachFuture<S, F, T, R> {
12+
stream: S,
13+
f: F,
14+
__from: PhantomData<T>,
15+
__to: PhantomData<R>,
16+
}
17+
18+
impl<S, F, T, R> TryForEeachFuture<S, F, T, R> {
19+
pin_utils::unsafe_pinned!(stream: S);
20+
pin_utils::unsafe_unpinned!(f: F);
21+
22+
pub(crate) fn new(stream: S, f: F) -> Self {
23+
TryForEeachFuture {
24+
stream,
25+
f,
26+
__from: PhantomData,
27+
__to: PhantomData,
28+
}
29+
}
30+
}
31+
32+
impl<S, F, R> Future for TryForEeachFuture<S, F, S::Item, R>
33+
where
34+
S: Stream,
35+
S::Item: std::fmt::Debug,
36+
F: FnMut(S::Item) -> R,
37+
R: Try<Ok = ()>,
38+
{
39+
type Output = R;
40+
41+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
42+
loop {
43+
let item = futures_core::ready!(self.as_mut().stream().poll_next(cx));
44+
45+
match item {
46+
None => return Poll::Ready(R::from_ok(())),
47+
Some(v) => {
48+
let res = (self.as_mut().f())(v);
49+
if let Err(e) = res.into_result() {
50+
return Poll::Ready(R::from_error(e));
51+
}
52+
}
53+
}
54+
}
55+
}
56+
}

0 commit comments

Comments
 (0)