Skip to content

Commit 81f475b

Browse files
committed
Adds caching logic
1 parent 22afa47 commit 81f475b

File tree

1 file changed

+63
-26
lines changed

1 file changed

+63
-26
lines changed

src/stream/stream/partial_cmp.rs

+63-26
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,36 @@
11
use std::cmp::Ordering;
22
use std::pin::Pin;
33

4+
use super::fuse::Fuse;
5+
use crate::prelude::*;
46
use crate::future::Future;
57
use crate::stream::Stream;
68
use crate::task::{Context, Poll};
79

810
// Lexicographically compares the elements of this `Stream` with those
911
// of another.
1012
#[derive(Debug)]
11-
pub struct PartialCmpFuture<L: Stream, R> {
12-
l: L,
13-
r: R,
13+
pub struct PartialCmpFuture<L: Stream, R: Stream> {
14+
l: Fuse<L>,
15+
r: Fuse<R>,
16+
l_cache: Option<L::Item>,
17+
r_cache: Option<R::Item>,
1418
}
1519

16-
impl<L: Stream + Unpin, R: Unpin> Unpin for PartialCmpFuture<L, R> {}
20+
impl<L: Stream + Unpin, R: Stream + Unpin> Unpin for PartialCmpFuture<L, R> {}
1721

18-
impl<L: Stream, R> PartialCmpFuture<L, R> {
19-
pub(crate) fn new(l: L, r: R) -> Self {
22+
impl<L: Stream, R: Stream> PartialCmpFuture<L, R> {
23+
pin_utils::unsafe_pinned!(l: Fuse<L>);
24+
pin_utils::unsafe_pinned!(r: Fuse<R>);
25+
26+
pub(super) fn new(l: L, r: R) -> Self {
2027
PartialCmpFuture {
21-
l,
22-
r,
28+
l: l.fuse(),
29+
r: r.fuse(),
30+
l_cache: None,
31+
r_cache: None,
2332
}
2433
}
25-
26-
pin_utils::unsafe_pinned!(l: L);
27-
pin_utils::unsafe_pinned!(r: R);
2834
}
2935

3036
impl<L: Stream, R: Stream> Future for PartialCmpFuture<L, R>
@@ -35,22 +41,53 @@ where
3541
{
3642
type Output = Option<Ordering>;
3743

38-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
39-
let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx));
40-
let r_next = futures_core::ready!(self.as_mut().r().poll_next(cx));
41-
42-
match (l_next, r_next) {
43-
(None, None) => Poll::Ready(Some(Ordering::Equal)),
44-
(None, _ ) => Poll::Ready(Some(Ordering::Less)),
45-
(_, None) => Poll::Ready(Some(Ordering::Greater)),
46-
(Some(x), Some(y)) => match x.partial_cmp(&y) {
47-
Some(Ordering::Equal) => {
48-
// wakes task to pull the next item from stream
49-
cx.waker().wake_by_ref();
50-
Poll::Pending
51-
},
52-
non_eq => Poll::Ready(non_eq),
44+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
45+
// Short circuit logic
46+
// Stream that completes earliest can be considered Less, etc
47+
let l_complete = self.l.done && self.as_mut().l_cache.is_none();
48+
let r_complete = self.r.done && self.as_mut().r_cache.is_none();
49+
50+
if l_complete && r_complete {
51+
return Poll::Ready(Some(Ordering::Equal))
52+
} else if l_complete {
53+
return Poll::Ready(Some(Ordering::Less))
54+
} else if r_complete {
55+
return Poll::Ready(Some(Ordering::Greater))
56+
}
57+
58+
// Get next value if possible and necesary
59+
if !self.l.done && self.as_mut().l_cache.is_none() {
60+
let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx));
61+
if let Some(item) = l_next {
62+
self.as_mut().l_cache = Some(item);
63+
}
64+
}
65+
66+
if !self.r.done && self.as_mut().r_cache.is_none() {
67+
let r_next = futures_core::ready!(self.as_mut().r().poll_next(cx));
68+
if let Some(item) = r_next {
69+
self.as_mut().r_cache = Some(item);
5370
}
5471
}
72+
73+
// Compare if both values are available.
74+
if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() {
75+
let l_value = self.as_mut().l_cache.take().unwrap();
76+
let r_value = self.as_mut().r_cache.take().unwrap();
77+
let result = l_value.partial_cmp(&r_value);
78+
79+
if let Some(Ordering::Equal) = result {
80+
// Reset cache to prepare for next comparison
81+
self.as_mut().l_cache = None;
82+
self.as_mut().r_cache = None;
83+
} else {
84+
// Return non equal value
85+
return Poll::Ready(result);
86+
}
87+
}
88+
89+
// wakes task to pull the next item from stream
90+
cx.waker().wake_by_ref();
91+
Poll::Pending
5592
}
5693
}

0 commit comments

Comments
 (0)