Skip to content

Commit e0ec5ca

Browse files
committed
fix(client): detect HTTP2 connection closures sooner
1 parent 271bba1 commit e0ec5ca

File tree

2 files changed

+86
-7
lines changed

2 files changed

+86
-7
lines changed

src/proto/h2/client.rs

+35-7
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use bytes::IntoBuf;
22
use futures::{Async, Future, Poll, Stream};
33
use futures::future::{self, Either};
4-
use futures::sync::mpsc;
4+
use futures::sync::{mpsc, oneshot};
55
use h2::client::{Builder, Handshake, SendRequest};
66
use tokio_io::{AsyncRead, AsyncWrite};
77

@@ -18,6 +18,10 @@ type ClientRx<B> = ::client::dispatch::Receiver<Request<B>, Response<Body>>;
1818
/// other handles to it have been dropped, so that it can shutdown.
1919
type ConnDropRef = mpsc::Sender<Never>;
2020

21+
/// A oneshot channel watches the `Connection` task, and when it completes,
22+
/// the "dispatch" task will be notified and can shutdown sooner.
23+
type ConnEof = oneshot::Receiver<Never>;
24+
2125
pub(crate) struct Client<T, B>
2226
where
2327
B: Payload,
@@ -29,7 +33,7 @@ where
2933

3034
enum State<T, B> where B: IntoBuf {
3135
Handshaking(Handshake<T, B>),
32-
Ready(SendRequest<B>, ConnDropRef),
36+
Ready(SendRequest<B>, ConnDropRef, ConnEof),
3337
}
3438

3539
impl<T, B> Client<T, B>
@@ -66,14 +70,18 @@ where
6670
// in h2 where dropping all SendRequests won't notify a
6771
// parked Connection.
6872
let (tx, rx) = mpsc::channel(0);
73+
let (cancel_tx, cancel_rx) = oneshot::channel();
6974
let rx = rx.into_future()
7075
.map(|(msg, _)| match msg {
7176
Some(never) => match never {},
7277
None => (),
7378
})
7479
.map_err(|_| -> Never { unreachable!("mpsc cannot error") });
7580
let fut = conn
76-
.inspect(|_| trace!("connection complete"))
81+
.inspect(move |_| {
82+
drop(cancel_tx);
83+
trace!("connection complete")
84+
})
7785
.map_err(|e| debug!("connection error: {}", e))
7886
.select2(rx)
7987
.then(|res| match res {
@@ -92,10 +100,21 @@ where
92100
Err(Either::B((never, _))) => match never {},
93101
});
94102
self.executor.execute(fut)?;
95-
State::Ready(request_tx, tx)
103+
State::Ready(request_tx, tx, cancel_rx)
96104
},
97-
State::Ready(ref mut tx, ref conn_dropper) => {
98-
try_ready!(tx.poll_ready().map_err(::Error::new_h2));
105+
State::Ready(ref mut tx, ref conn_dropper, ref mut cancel_rx) => {
106+
match tx.poll_ready() {
107+
Ok(Async::Ready(())) => (),
108+
Ok(Async::NotReady) => return Ok(Async::NotReady),
109+
Err(err) => {
110+
return if err.reason() == Some(::h2::Reason::NO_ERROR) {
111+
trace!("connection gracefully shutdown");
112+
Ok(Async::Ready(Dispatched::Shutdown))
113+
} else {
114+
Err(::Error::new_h2(err))
115+
};
116+
}
117+
}
99118
match self.rx.poll() {
100119
Ok(Async::Ready(Some((req, cb)))) => {
101120
// check that future hasn't been canceled already
@@ -157,7 +176,16 @@ where
157176
continue;
158177
},
159178

160-
Ok(Async::NotReady) => return Ok(Async::NotReady),
179+
Ok(Async::NotReady) => {
180+
match cancel_rx.poll() {
181+
Ok(Async::Ready(never)) => match never {},
182+
Ok(Async::NotReady) => return Ok(Async::NotReady),
183+
Err(_conn_is_eof) => {
184+
trace!("connection task is closed, closing dispatch task");
185+
return Ok(Async::Ready(Dispatched::Shutdown));
186+
}
187+
}
188+
},
161189

162190
Ok(Async::Ready(None)) => {
163191
trace!("client::dispatch::Sender dropped");

tests/client.rs

+51
Original file line numberDiff line numberDiff line change
@@ -2095,6 +2095,57 @@ mod conn {
20952095
assert_eq!(vec, b"bar=foo");
20962096
}
20972097

2098+
2099+
#[test]
2100+
fn http2_detect_conn_eof() {
2101+
use futures::future;
2102+
use hyper::{Response, Server};
2103+
use hyper::service::service_fn_ok;
2104+
use tokio::timer::Delay;
2105+
2106+
let _ = pretty_env_logger::try_init();
2107+
2108+
let mut rt = Runtime::new().unwrap();
2109+
2110+
let server = Server::bind(&([127, 0, 0, 1], 0).into())
2111+
.http2_only(true)
2112+
.serve(|| service_fn_ok(|_req| {
2113+
Response::new(Body::empty())
2114+
}));
2115+
let addr = server.local_addr();
2116+
let (shdn_tx, shdn_rx) = oneshot::channel();
2117+
rt.spawn(server.with_graceful_shutdown(shdn_rx).map_err(|e| panic!("server error: {:?}", e)));
2118+
2119+
let io = rt.block_on(tcp_connect(&addr)).expect("tcp connect");
2120+
let (mut client, conn) = rt.block_on(
2121+
conn::Builder::new().http2_only(true).handshake::<_, Body>(io)
2122+
).expect("http handshake");
2123+
rt.spawn(conn.map_err(|e| panic!("client conn error: {:?}", e)));
2124+
2125+
2126+
// Sanity check that client is ready
2127+
rt.block_on(future::poll_fn(|| client.poll_ready())).expect("client poll ready sanity");
2128+
2129+
let req = Request::builder()
2130+
.uri(format!("http://{}/", addr))
2131+
.body(Body::empty())
2132+
.expect("request builder");
2133+
2134+
rt.block_on(client.send_request(req)).expect("req1 send");
2135+
2136+
// Sanity check that client is STILL ready
2137+
rt.block_on(future::poll_fn(|| client.poll_ready())).expect("client poll ready after");
2138+
2139+
// Trigger the server shutdown...
2140+
let _ = shdn_tx.send(());
2141+
2142+
// Allow time for graceful shutdown roundtrips...
2143+
rt.block_on(Delay::new(::std::time::Instant::now() + Duration::from_millis(100)).map_err(|e| panic!("delay error: {:?}", e))).expect("delay");
2144+
2145+
// After graceful shutdown roundtrips, the client should be closed...
2146+
rt.block_on(future::poll_fn(|| client.poll_ready())).expect_err("client should be closed");
2147+
}
2148+
20982149
struct DebugStream {
20992150
tcp: TcpStream,
21002151
shutdown_called: bool,

0 commit comments

Comments
 (0)