Skip to content

Commit 88f0179

Browse files
committed
fix(client): send an Error::Cancel if a queued request is dropped
Adds `Error::Cancel` variant.
1 parent a821a36 commit 88f0179

File tree

3 files changed

+104
-5
lines changed

3 files changed

+104
-5
lines changed

src/client/cancel.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ impl Cancel {
5757
}
5858
}
5959

60+
impl Canceled {
61+
pub fn cancel(&self) {
62+
self.inner.is_canceled.store(true, Ordering::SeqCst);
63+
}
64+
}
65+
6066
impl Future for Canceled {
6167
type Item = ();
6268
type Error = Never;
@@ -87,7 +93,7 @@ impl Future for Canceled {
8793

8894
impl Drop for Canceled {
8995
fn drop(&mut self) {
90-
self.inner.is_canceled.store(true, Ordering::SeqCst);
96+
self.cancel();
9197
}
9298
}
9399

src/client/dispatch.rs

+49-4
Original file line numberDiff line numberDiff line change
@@ -69,21 +69,66 @@ impl<T, U> Stream for Receiver<T, U> {
6969
}
7070
}
7171

72-
//TODO: Drop for Receiver should consume inner
72+
impl<T, U> Drop for Receiver<T, U> {
73+
fn drop(&mut self) {
74+
self.canceled.cancel();
75+
self.inner.close();
76+
77+
// This poll() is safe to call in `Drop`, because we've
78+
// called, `close`, which promises that no new messages
79+
// will arrive, and thus, once we reach the end, we won't
80+
// see a `NotReady` (and try to park), but a Ready(None).
81+
//
82+
// All other variants:
83+
// - Ready(None): the end. we want to stop looping
84+
// - NotReady: unreachable
85+
// - Err: unreachable
86+
while let Ok(Async::Ready(Some((_val, cb)))) = self.inner.poll() {
87+
// maybe in future, we pass the value along with the error?
88+
let _ = cb.send(Err(::Error::new_canceled()));
89+
}
90+
}
91+
92+
}
7393

7494
#[cfg(test)]
7595
mod tests {
76-
96+
extern crate pretty_env_logger;
7797
#[cfg(feature = "nightly")]
7898
extern crate test;
7999

100+
use futures::{future, Future};
101+
80102
#[cfg(feature = "nightly")]
81-
use futures::{Future, Stream};
103+
use futures::{Stream};
104+
105+
#[test]
106+
fn drop_receiver_sends_cancel_errors() {
107+
let _ = pretty_env_logger::try_init();
108+
109+
future::lazy(|| {
110+
#[derive(Debug)]
111+
struct Custom(i32);
112+
let (tx, rx) = super::channel::<Custom, ()>();
113+
114+
let promise = tx.send(Custom(43)).unwrap();
115+
drop(rx);
116+
117+
promise.then(|fulfilled| {
118+
let res = fulfilled.expect("fulfilled");
119+
match res.unwrap_err() {
120+
::Error::Cancel(_) => (),
121+
e => panic!("expected Error::Cancel(_), found {:?}", e),
122+
}
123+
124+
Ok::<(), ()>(())
125+
})
126+
}).wait().unwrap();
127+
}
82128

83129
#[cfg(feature = "nightly")]
84130
#[bench]
85131
fn cancelable_queue_throughput(b: &mut test::Bencher) {
86-
87132
let (tx, mut rx) = super::channel::<i32, ()>();
88133

89134
b.iter(move || {

src/error.rs

+48
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use self::Error::{
1717
Status,
1818
Timeout,
1919
Upgrade,
20+
Cancel,
2021
Io,
2122
TooLarge,
2223
Incomplete,
@@ -47,6 +48,8 @@ pub enum Error {
4748
Timeout,
4849
/// A protocol upgrade was encountered, but not yet supported in hyper.
4950
Upgrade,
51+
/// A pending item was dropped before ever being processed.
52+
Cancel(Canceled),
5053
/// An `io::Error` that occurred while trying to read or write to a network stream.
5154
Io(IoError),
5255
/// Parsing a field as string failed
@@ -56,6 +59,45 @@ pub enum Error {
5659
__Nonexhaustive(Void)
5760
}
5861

62+
impl Error {
63+
pub(crate) fn new_canceled() -> Error {
64+
Error::Cancel(Canceled {
65+
_inner: (),
66+
})
67+
}
68+
}
69+
70+
/// A pending item was dropped before ever being processed.
71+
///
72+
/// For example, a `Request` could be queued in the `Client`, *just*
73+
/// as the related connection gets closed by the remote. In that case,
74+
/// when the connection drops, the pending response future will be
75+
/// fulfilled with this error, signaling the `Request` was never started.
76+
pub struct Canceled {
77+
// maybe in the future this contains an optional value of
78+
// what was canceled?
79+
_inner: (),
80+
}
81+
82+
impl Canceled {
83+
fn description(&self) -> &str {
84+
"an operation was canceled internally before starting"
85+
}
86+
}
87+
88+
impl fmt::Debug for Canceled {
89+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
90+
f.debug_struct("Canceled")
91+
.finish()
92+
}
93+
}
94+
95+
impl fmt::Display for Canceled {
96+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
97+
f.pad(self.description())
98+
}
99+
}
100+
59101
#[doc(hidden)]
60102
pub struct Void(());
61103

@@ -87,6 +129,7 @@ impl StdError for Error {
87129
Incomplete => "message is incomplete",
88130
Timeout => "timeout",
89131
Upgrade => "unsupported protocol upgrade",
132+
Cancel(ref e) => e.description(),
90133
Uri(ref e) => e.description(),
91134
Io(ref e) => e.description(),
92135
Utf8(ref e) => e.description(),
@@ -143,6 +186,11 @@ impl From<httparse::Error> for Error {
143186
}
144187
}
145188

189+
#[doc(hidden)]
190+
trait AssertSendSync: Send + Sync + 'static {}
191+
#[doc(hidden)]
192+
impl AssertSendSync for Error {}
193+
146194
#[cfg(test)]
147195
mod tests {
148196
use std::error::Error as StdError;

0 commit comments

Comments
 (0)