Skip to content

Commit 8576dbe

Browse files
committed
Replaces waker function with loop, reomves Unpin trait bounds
1 parent 8adc18e commit 8576dbe

File tree

1 file changed

+41
-43
lines changed

1 file changed

+41
-43
lines changed

src/stream/stream/partial_cmp.rs

+41-43
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ pub struct PartialCmpFuture<L: Stream, R: Stream> {
1717
r_cache: Option<R::Item>,
1818
}
1919

20-
impl<L: Stream + Unpin, R: Stream + Unpin> Unpin for PartialCmpFuture<L, R> {}
21-
2220
impl<L: Stream, R: Stream> PartialCmpFuture<L, R> {
2321
pin_utils::unsafe_pinned!(l: Fuse<L>);
2422
pin_utils::unsafe_pinned!(r: Fuse<R>);
23+
pin_utils::unsafe_unpinned!(l_cache: Option<L::Item>);
24+
pin_utils::unsafe_unpinned!(r_cache: Option<R::Item>);
2525

2626
pub(super) fn new(l: L, r: R) -> Self {
2727
PartialCmpFuture {
@@ -35,59 +35,57 @@ impl<L: Stream, R: Stream> PartialCmpFuture<L, R> {
3535

3636
impl<L: Stream, R: Stream> Future for PartialCmpFuture<L, R>
3737
where
38-
L: Stream + Unpin + Sized,
39-
R: Stream + Unpin + Sized,
38+
L: Stream + Sized,
39+
R: Stream + Sized,
4040
L::Item: PartialOrd<R::Item>
4141
{
4242
type Output = Option<Ordering>;
4343

4444
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
45-
// Short circuit logic
46-
// Stream that completes earliest can be considered Less, etc
47-
let l_complete = self.l.done && self.as_mut().l_cache.is_none();
48-
let r_complete = self.r.done && self.as_mut().r_cache.is_none();
45+
loop {
46+
// Short circuit logic
47+
// Stream that completes earliest can be considered Less, etc
48+
let l_complete = self.l.done && self.as_mut().l_cache.is_none();
49+
let r_complete = self.r.done && self.as_mut().r_cache.is_none();
4950

50-
if l_complete && r_complete {
51-
return Poll::Ready(Some(Ordering::Equal))
52-
} else if l_complete {
53-
return Poll::Ready(Some(Ordering::Less))
54-
} else if r_complete {
55-
return Poll::Ready(Some(Ordering::Greater))
56-
}
51+
if l_complete && r_complete {
52+
return Poll::Ready(Some(Ordering::Equal))
53+
} else if l_complete {
54+
return Poll::Ready(Some(Ordering::Less))
55+
} else if r_complete {
56+
return Poll::Ready(Some(Ordering::Greater))
57+
}
5758

58-
// Get next value if possible and necesary
59-
if !self.l.done && self.as_mut().l_cache.is_none() {
60-
let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx));
61-
if let Some(item) = l_next {
62-
self.as_mut().l_cache = Some(item);
59+
// Get next value if possible and necesary
60+
if !self.l.done && self.as_mut().l_cache.is_none() {
61+
let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx));
62+
if let Some(item) = l_next {
63+
*self.as_mut().l_cache() = Some(item);
64+
}
6365
}
64-
}
6566

66-
if !self.r.done && self.as_mut().r_cache.is_none() {
67-
let r_next = futures_core::ready!(self.as_mut().r().poll_next(cx));
68-
if let Some(item) = r_next {
69-
self.as_mut().r_cache = Some(item);
67+
if !self.r.done && self.as_mut().r_cache.is_none() {
68+
let r_next = futures_core::ready!(self.as_mut().r().poll_next(cx));
69+
if let Some(item) = r_next {
70+
*self.as_mut().r_cache() = Some(item);
71+
}
7072
}
71-
}
7273

73-
// Compare if both values are available.
74-
if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() {
75-
let l_value = self.as_mut().l_cache.take().unwrap();
76-
let r_value = self.as_mut().r_cache.take().unwrap();
77-
let result = l_value.partial_cmp(&r_value);
74+
// Compare if both values are available.
75+
if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() {
76+
let l_value = self.as_mut().l_cache().take().unwrap();
77+
let r_value = self.as_mut().r_cache().take().unwrap();
78+
let result = l_value.partial_cmp(&r_value);
7879

79-
if let Some(Ordering::Equal) = result {
80-
// Reset cache to prepare for next comparison
81-
self.as_mut().l_cache = None;
82-
self.as_mut().r_cache = None;
83-
} else {
84-
// Return non equal value
85-
return Poll::Ready(result);
86-
}
80+
if let Some(Ordering::Equal) = result {
81+
// Reset cache to prepare for next comparison
82+
*self.as_mut().l_cache() = None;
83+
*self.as_mut().r_cache() = None;
84+
} else {
85+
// Return non equal value
86+
return Poll::Ready(result);
87+
}
88+
}
8789
}
88-
89-
// wakes task to pull the next item from stream
90-
cx.waker().wake_by_ref();
91-
Poll::Pending
9290
}
9391
}

0 commit comments

Comments
 (0)