Skip to content

Async combinators #262

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 14 commits into from
5 changes: 4 additions & 1 deletion src/collections/binary_heap/extend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ impl<T: Ord> stream::Extend<T> for BinaryHeap<T> {

self.reserve(stream.size_hint().0);

Box::pin(stream.for_each(move |item| self.push(item)))
Box::pin(stream.for_each(move |item| {
self.push(item);
async {}
}))
}
}
1 change: 1 addition & 0 deletions src/collections/btree_map/extend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ impl<K: Ord, V> stream::Extend<(K, V)> for BTreeMap<K, V> {
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
Box::pin(stream.into_stream().for_each(move |(k, v)| {
self.insert(k, v);
async {}
}))
}
}
1 change: 1 addition & 0 deletions src/collections/btree_set/extend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ impl<T: Ord> stream::Extend<T> for BTreeSet<T> {
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
Box::pin(stream.into_stream().for_each(move |item| {
self.insert(item);
async {}
}))
}
}
1 change: 1 addition & 0 deletions src/collections/hash_map/extend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ where

Box::pin(stream.for_each(move |(k, v)| {
self.insert(k, v);
async {}
}))
}
}
1 change: 1 addition & 0 deletions src/collections/hash_set/extend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ where

Box::pin(stream.for_each(move |item| {
self.insert(item);
async {}
}))
}
}
5 changes: 4 additions & 1 deletion src/collections/linked_list/extend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ impl<T> stream::Extend<T> for LinkedList<T> {
stream: S,
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
let stream = stream.into_stream();
Box::pin(stream.for_each(move |item| self.push_back(item)))
Box::pin(stream.for_each(move |item| {
self.push_back(item);
async {}
}))
}
}
5 changes: 4 additions & 1 deletion src/collections/vec_deque/extend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ impl<T> stream::Extend<T> for VecDeque<T> {

self.reserve(stream.size_hint().0);

Box::pin(stream.for_each(move |item| self.push_back(item)))
Box::pin(stream.for_each(move |item| {
self.push_back(item);
async {}
}))
}
}
3 changes: 2 additions & 1 deletion src/io/buf_read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ extension_trait! {

let cursor = io::Cursor::new(b"lorem-ipsum-dolor");

let mut split_iter = cursor.split(b'-').map(|l| l.unwrap());
let split_iter = cursor.split(b'-').map(|l| async move { l.unwrap() });
pin_utils::pin_mut!(split_iter);
assert_eq!(split_iter.next().await, Some(b"lorem".to_vec()));
assert_eq!(split_iter.next().await, Some(b"ipsum".to_vec()));
assert_eq!(split_iter.next().await, Some(b"dolor".to_vec()));
Expand Down
2 changes: 1 addition & 1 deletion src/option/from_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ where
false
}
})
.filter_map(identity)
.filter_map(|a| async move { identity(a) })
.collect()
.await;

Expand Down
7 changes: 4 additions & 3 deletions src/option/product.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ where
use async_std::stream;

let v = stream::from_iter(vec![1, 2, 4]);
let prod: Option<i32> = v.map(|x|
let prod: Option<i32> = v.map(|x| async move {
if x < 0 {
None
} else {
Some(x)
}).product().await;
}
}).product().await;
assert_eq!(prod, Some(8));
#
# }) }
Expand All @@ -53,7 +54,7 @@ where
false
}
})
.filter_map(identity),
.filter_map(|a| async move { identity(a) }),
)
.await;

Expand Down
4 changes: 2 additions & 2 deletions src/option/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ where
use async_std::stream;

let words = stream::from_iter(vec!["have", "a", "great", "day"]);
let total: Option<usize> = words.map(|w| w.find('a')).sum().await;
let total: Option<usize> = words.map(|w| async move { w.find('a') }).sum().await;
assert_eq!(total, Some(5));
#
# }) }
Expand All @@ -48,7 +48,7 @@ where
false
}
})
.filter_map(identity),
.filter_map(|a| async move { identity(a) }),
)
.await;

Expand Down
11 changes: 7 additions & 4 deletions src/result/from_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ where
/// use async_std::stream;
///
/// let v = stream::from_iter(vec![1, 2]);
/// let res: Result<Vec<u32>, &'static str> = v.map(|x: u32|
/// let res: Result<Vec<u32>, &'static str> = v.map(|x: u32| async move {
/// x.checked_add(1).ok_or("Overflow!")
/// ).collect().await;
/// }).collect().await;
/// assert_eq!(res, Ok(vec![2, 3]));
/// #
/// # }) }
Expand All @@ -48,12 +48,15 @@ where
true
})
})
.filter_map(|elem| match elem {
.filter_map(|elem| {
let res = match elem {
Ok(value) => Some(value),
Err(err) => {
found_error = Some(err);
None
}
},
};
async { res }
})
.collect()
.await;
Expand Down
26 changes: 15 additions & 11 deletions src/result/product.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ where
use async_std::stream;

let v = stream::from_iter(vec![1, 2, 4]);
let res: Result<i32, &'static str> = v.map(|x|
if x < 0 {
Err("Negative element found")
} else {
Ok(x)
let res: Result<i32, &'static str> = v.map(|x| async move {
if x < 0 {
Err("Negative element found")
} else {
Ok(x)
}
}).product().await;
assert_eq!(res, Ok(8));
#
Expand All @@ -55,12 +56,15 @@ where
true
})
})
.filter_map(|elem| match elem {
Ok(value) => Some(value),
Err(err) => {
found_error = Some(err);
None
}
.filter_map(|elem| {
let res = match elem {
Ok(value) => Some(value),
Err(err) => {
found_error = Some(err);
None
}
};
async { res }
}),
)
.await;
Expand Down
26 changes: 15 additions & 11 deletions src/result/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ where
use async_std::stream;

let v = stream::from_iter(vec![1, 2]);
let res: Result<i32, &'static str> = v.map(|x|
if x < 0 {
Err("Negative element found")
} else {
Ok(x)
let res: Result<i32, &'static str> = v.map(|x| async move {
if x < 0 {
Err("Negative element found")
} else {
Ok(x)
}
}).sum().await;
assert_eq!(res, Ok(3));
#
Expand All @@ -55,12 +56,15 @@ where
true
})
})
.filter_map(|elem| match elem {
Ok(value) => Some(value),
Err(err) => {
found_error = Some(err);
None
}
.filter_map(|elem| {
let res = match elem {
Ok(value) => Some(value),
Err(err) => {
found_error = Some(err);
None
}
};
async { res }
}),
)
.await;
Expand Down
2 changes: 1 addition & 1 deletion src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@
//! # use async_std::prelude::*;
//! # use async_std::stream;
//! let v = stream::repeat(1u8).take(5);
//! v.map(|x| println!("{}", x));
//! v.map(|x| async move { println!("{}", x)});
//! #
//! # Ok(()) }) }
//! ```
Expand Down
9 changes: 4 additions & 5 deletions src/stream/product.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ pub trait Product<A = Self>: Sized {
S: Stream<Item = A> + 'a;
}

use core::ops::Mul;
use core::num::Wrapping;
use crate::stream::stream::StreamExt;

Expand All @@ -34,15 +33,15 @@ macro_rules! integer_product {
where
S: Stream<Item = $a> + 'a,
{
Box::pin(async move { stream.fold($one, Mul::mul).await } )
Box::pin(async move { stream.fold($one, |a, b| async move { a * b }).await } )
}
}
impl<'a> Product<&'a $a> for $a {
fn product<'b, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'b>>
where
S: Stream<Item = &'a $a> + 'b,
{
Box::pin(async move { stream.fold($one, Mul::mul).await } )
Box::pin(async move { stream.fold($one, |a, b| async move { a * b }).await } )
}
}
)*);
Expand All @@ -58,14 +57,14 @@ macro_rules! float_product {
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self>+ 'a>>
where S: Stream<Item = $a> + 'a,
{
Box::pin(async move { stream.fold(1.0, |a, b| a * b).await } )
Box::pin(async move { stream.fold(1.0, |a, b| async move { a * b }).await } )
}
}
impl<'a> Product<&'a $a> for $a {
fn product<'b, S>(stream: S) -> Pin<Box<dyn Future<Output = Self>+ 'b>>
where S: Stream<Item = &'a $a> + 'b,
{
Box::pin(async move { stream.fold(1.0, |a, b| a * b).await } )
Box::pin(async move { stream.fold(1.0, |a, b| async move { a * b }).await } )
}
}
)*);
Expand Down
52 changes: 34 additions & 18 deletions src/stream/stream/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,47 +7,63 @@ use crate::task::{Context, Poll};

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct AllFuture<'a, S, F, T> {
pub struct AllFuture<'a, S, F, Fut, T> {
pub(crate) stream: &'a mut S,
pub(crate) f: F,
pub(crate) result: bool,
pub(crate) future: Option<Fut>,
pub(crate) _marker: PhantomData<T>,
}

impl<'a, S, F, T> AllFuture<'a, S, F, T> {
impl<'a, S, F, Fut, T> AllFuture<'a, S, F, Fut, T> {
pin_utils::unsafe_pinned!(stream: &'a mut S);
pin_utils::unsafe_unpinned!(f: F);
pin_utils::unsafe_pinned!(future: Option<Fut>);
}

impl<'a, S, Fut, F, T> AllFuture<'a, S, F, Fut, T> {
pub(crate) fn new(stream: &'a mut S, f: F) -> Self {
Self {
stream,
f,
result: true,
future: None,
_marker: PhantomData,
}
}
}

impl<S: Unpin, F, T> Unpin for AllFuture<'_, S, F, T> {}

impl<S, F> Future for AllFuture<'_, S, F, S::Item>
impl<S, F, Fut> Future for AllFuture<'_, S, F, Fut, S::Item>
where
S: Stream + Unpin + Sized,
F: FnMut(S::Item) -> bool,
F: FnMut(S::Item) -> Fut,
Fut: Future<Output = bool>,
{
type Output = bool;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx));

match next {
Some(v) => {
let result = (&mut self.f)(v);
loop {
match self.future.is_some() {
false => {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
match next {
Some(v) => {
let fut = (self.as_mut().f())(v);
self.as_mut().future().set(Some(fut));
}
None => return Poll::Ready(self.result),
}
}
true => {
let res =
futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));

if result {
// don't forget to wake this task again to pull the next item from stream
cx.waker().wake_by_ref();
Poll::Pending
} else {
Poll::Ready(false)
self.as_mut().future().set(None);
if !res {
return Poll::Ready(false);
}
}
}
None => Poll::Ready(true),
}
}
}
Loading