Skip to content

Commit 3d3bf91

Browse files
authored
Merge pull request #562 from felipesere/double_ended_ext
DoubleEndedStream extension
2 parents 63b6a2b + 182fe68 commit 3d3bf91

File tree

10 files changed

+472
-26
lines changed

10 files changed

+472
-26
lines changed

src/stream/double_ended_stream.rs

-24
This file was deleted.

src/stream/double_ended_stream/mod.rs

+241
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
use crate::stream::Stream;
2+
3+
use std::pin::Pin;
4+
use std::task::{Context, Poll};
5+
6+
mod next_back;
7+
mod nth_back;
8+
mod rfind;
9+
mod rfold;
10+
mod try_rfold;
11+
12+
use next_back::NextBackFuture;
13+
use nth_back::NthBackFuture;
14+
use rfind::RFindFuture;
15+
use rfold::RFoldFuture;
16+
use try_rfold::TryRFoldFuture;
17+
18+
/// A stream able to yield elements from both ends.
19+
///
20+
/// Something that implements `DoubleEndedStream` has one extra capability
21+
/// over something that implements [`Stream`]: the ability to also take
22+
/// `Item`s from the back, as well as the front.
23+
///
24+
/// [`Stream`]: trait.Stream.html
25+
#[cfg(feature = "unstable")]
26+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
27+
pub trait DoubleEndedStream: Stream {
28+
#[doc = r#"
29+
Attempts to receive the next item from the back of the stream.
30+
31+
There are several possible return values:
32+
33+
* `Poll::Pending` means this stream's next_back value is not ready yet.
34+
* `Poll::Ready(None)` means this stream has been exhausted.
35+
* `Poll::Ready(Some(item))` means `item` was received out of the stream.
36+
37+
# Examples
38+
39+
```
40+
# fn main() { async_std::task::block_on(async {
41+
#
42+
use std::pin::Pin;
43+
44+
use async_std::prelude::*;
45+
use async_std::stream;
46+
use async_std::task::{Context, Poll};
47+
48+
fn increment(
49+
s: impl DoubleEndedStream<Item = i32> + Unpin,
50+
) -> impl DoubleEndedStream<Item = i32> + Unpin {
51+
struct Increment<S>(S);
52+
53+
impl<S: DoubleEndedStream<Item = i32> + Unpin> Stream for Increment<S> {
54+
type Item = S::Item;
55+
56+
fn poll_next(
57+
mut self: Pin<&mut Self>,
58+
cx: &mut Context<'_>,
59+
) -> Poll<Option<Self::Item>> {
60+
match Pin::new(&mut self.0).poll_next(cx) {
61+
Poll::Pending => Poll::Pending,
62+
Poll::Ready(None) => Poll::Ready(None),
63+
Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)),
64+
}
65+
}
66+
}
67+
68+
impl<S: DoubleEndedStream<Item = i32> + Unpin> DoubleEndedStream for Increment<S> {
69+
fn poll_next_back(
70+
mut self: Pin<&mut Self>,
71+
cx: &mut Context<'_>,
72+
) -> Poll<Option<Self::Item>> {
73+
match Pin::new(&mut self.0).poll_next_back(cx) {
74+
Poll::Pending => Poll::Pending,
75+
Poll::Ready(None) => Poll::Ready(None),
76+
Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)),
77+
}
78+
}
79+
}
80+
81+
Increment(s)
82+
}
83+
84+
let mut s = increment(stream::once(7));
85+
86+
assert_eq!(s.next_back().await, Some(8));
87+
assert_eq!(s.next_back().await, None);
88+
#
89+
# }) }
90+
```
91+
"#]
92+
fn poll_next_back(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
93+
94+
#[doc = r#"
95+
Advances the stream and returns the next value.
96+
97+
Returns [`None`] when iteration is finished. Individual stream implementations may
98+
choose to resume iteration, and so calling `next()` again may or may not eventually
99+
start returning more values.
100+
101+
[`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
102+
103+
# Examples
104+
105+
```
106+
# fn main() { async_std::task::block_on(async {
107+
#
108+
use async_std::stream::double_ended_stream::{self, DoubleEndedStream};
109+
110+
let mut s = double_ended_stream::from_iter(vec![7u8]);
111+
112+
assert_eq!(s.next_back().await, Some(7));
113+
assert_eq!(s.next_back().await, None);
114+
#
115+
# }) }
116+
```
117+
"#]
118+
fn next_back(&mut self) -> NextBackFuture<'_, Self>
119+
where
120+
Self: Unpin,
121+
{
122+
NextBackFuture { stream: self }
123+
}
124+
125+
#[doc = r#"
126+
Returns the nth element from the back of the stream.
127+
128+
# Examples
129+
130+
Basic usage:
131+
132+
```
133+
# fn main() { async_std::task::block_on(async {
134+
#
135+
use async_std::stream::double_ended_stream::{self, DoubleEndedStream};
136+
137+
let mut s = double_ended_stream::from_iter(vec![1u8, 2, 3, 4, 5]);
138+
139+
let second = s.nth_back(1).await;
140+
assert_eq!(second, Some(4));
141+
#
142+
# }) }
143+
```
144+
"#]
145+
fn nth_back(&mut self, n: usize) -> NthBackFuture<'_, Self>
146+
where
147+
Self: Unpin + Sized,
148+
{
149+
NthBackFuture::new(self, n)
150+
}
151+
152+
#[doc = r#"
153+
Returns the the frist element from the right that matches the predicate.
154+
155+
# Examples
156+
157+
Basic usage:
158+
159+
```
160+
# fn main() { async_std::task::block_on(async {
161+
#
162+
use async_std::stream::double_ended_stream::{self, DoubleEndedStream};
163+
164+
let mut s = double_ended_stream::from_iter(vec![1u8, 2, 3, 4, 5]);
165+
166+
let second = s.rfind(|v| v % 2 == 0).await;
167+
assert_eq!(second, Some(4));
168+
#
169+
# }) }
170+
```
171+
"#]
172+
fn rfind<P>(&mut self, p: P) -> RFindFuture<'_, Self, P>
173+
where
174+
Self: Unpin + Sized,
175+
P: FnMut(&Self::Item) -> bool,
176+
{
177+
RFindFuture::new(self, p)
178+
}
179+
180+
#[doc = r#"
181+
# Examples
182+
183+
Basic usage:
184+
185+
```
186+
# fn main() { async_std::task::block_on(async {
187+
#
188+
use async_std::stream::double_ended_stream::{self, DoubleEndedStream};
189+
190+
let s = double_ended_stream::from_iter(vec![1u8, 2, 3, 4, 5]);
191+
192+
let second = s.rfold(0, |acc, v| v + acc).await;
193+
194+
assert_eq!(second, 15);
195+
#
196+
# }) }
197+
```
198+
"#]
199+
fn rfold<B, F>(self, accum: B, f: F) -> RFoldFuture<Self, F, B>
200+
where
201+
Self: Sized,
202+
F: FnMut(B, Self::Item) -> B,
203+
{
204+
RFoldFuture::new(self, accum, f)
205+
}
206+
207+
#[doc = r#"
208+
A combinator that applies a function as long as it returns successfully, producing a single, final value.
209+
Immediately returns the error when the function returns unsuccessfully.
210+
211+
# Examples
212+
213+
Basic usage:
214+
215+
```
216+
# fn main() { async_std::task::block_on(async {
217+
#
218+
use async_std::stream::double_ended_stream::{self, DoubleEndedStream};
219+
220+
let s = double_ended_stream::from_iter(vec![1u8, 2, 3, 4, 5]);
221+
let sum = s.try_rfold(0, |acc, v| {
222+
if (acc+v) % 2 == 1 {
223+
Ok(v+3)
224+
} else {
225+
Err("fail")
226+
}
227+
}).await;
228+
229+
assert_eq!(sum, Err("fail"));
230+
#
231+
# }) }
232+
```
233+
"#]
234+
fn try_rfold<B, F, E>(self, accum: B, f: F) -> TryRFoldFuture<Self, F, B>
235+
where
236+
Self: Sized,
237+
F: FnMut(B, Self::Item) -> Result<B, E>,
238+
{
239+
TryRFoldFuture::new(self, accum, f)
240+
}
241+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use std::pin::Pin;
2+
use std::future::Future;
3+
4+
use crate::stream::DoubleEndedStream;
5+
use crate::task::{Context, Poll};
6+
7+
#[doc(hidden)]
8+
#[allow(missing_debug_implementations)]
9+
pub struct NextBackFuture<'a, T: Unpin + ?Sized> {
10+
pub(crate) stream: &'a mut T,
11+
}
12+
13+
impl<T: DoubleEndedStream + Unpin + ?Sized> Future for NextBackFuture<'_, T> {
14+
type Output = Option<T::Item>;
15+
16+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
17+
Pin::new(&mut *self.stream).poll_next_back(cx)
18+
}
19+
}
+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
use std::future::Future;
2+
use std::pin::Pin;
3+
use std::task::{Context, Poll};
4+
5+
use crate::stream::DoubleEndedStream;
6+
7+
#[doc(hidden)]
8+
#[allow(missing_debug_implementations)]
9+
pub struct NthBackFuture<'a, S> {
10+
stream: &'a mut S,
11+
n: usize,
12+
}
13+
14+
impl<'a, S> NthBackFuture<'a, S> {
15+
pub(crate) fn new(stream: &'a mut S, n: usize) -> Self {
16+
NthBackFuture { stream, n }
17+
}
18+
}
19+
20+
impl<'a, S> Future for NthBackFuture<'a, S>
21+
where
22+
S: DoubleEndedStream + Sized + Unpin,
23+
{
24+
type Output = Option<S::Item>;
25+
26+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
27+
let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next_back(cx));
28+
match next {
29+
Some(v) => match self.n {
30+
0 => Poll::Ready(Some(v)),
31+
_ => {
32+
self.n -= 1;
33+
cx.waker().wake_by_ref();
34+
Poll::Pending
35+
}
36+
},
37+
None => Poll::Ready(None),
38+
}
39+
}
40+
}
41+
+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
use std::task::{Context, Poll};
2+
use std::future::Future;
3+
use std::pin::Pin;
4+
5+
use crate::stream::DoubleEndedStream;
6+
7+
#[doc(hidden)]
8+
#[allow(missing_debug_implementations)]
9+
pub struct RFindFuture<'a, S, P> {
10+
stream: &'a mut S,
11+
p: P,
12+
}
13+
14+
impl<'a, S, P> RFindFuture<'a, S, P> {
15+
pub(super) fn new(stream: &'a mut S, p: P) -> Self {
16+
RFindFuture { stream, p }
17+
}
18+
}
19+
20+
impl<S: Unpin, P> Unpin for RFindFuture<'_, S, P> {}
21+
22+
impl<'a, S, P> Future for RFindFuture<'a, S, P>
23+
where
24+
S: DoubleEndedStream + Unpin + Sized,
25+
P: FnMut(&S::Item) -> bool,
26+
{
27+
type Output = Option<S::Item>;
28+
29+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
30+
let item = futures_core::ready!(Pin::new(&mut *self.stream).poll_next_back(cx));
31+
32+
match item {
33+
Some(v) if (&mut self.p)(&v) => Poll::Ready(Some(v)),
34+
Some(_) => {
35+
cx.waker().wake_by_ref();
36+
Poll::Pending
37+
}
38+
None => Poll::Ready(None),
39+
}
40+
}
41+
}

0 commit comments

Comments
 (0)