Skip to content

Commit 42c5efc

Browse files
committed
fix(http2): send a GOAWAY when the user's Service::poll_ready errors
The `Error::source()` is searched for an `h2::Error` to allow sending different error codes in the GOAWAY. If none is found, it defaults to `INTERNAL_ERROR`.
1 parent c7a046c commit 42c5efc

File tree

3 files changed

+110
-34
lines changed

3 files changed

+110
-34
lines changed

src/proto/h2/server.rs

+53-31
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ where
3838
B: Payload,
3939
{
4040
conn: Connection<T, SendBuf<B::Data>>,
41+
closing: Option<::Error>,
4142
}
4243

4344

@@ -46,7 +47,6 @@ where
4647
T: AsyncRead + AsyncWrite,
4748
S: Service<ReqBody=Body, ResBody=B>,
4849
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
49-
//S::Future: Send + 'static,
5050
B: Payload,
5151
E: H2Exec<S::Future, B>,
5252
{
@@ -66,7 +66,9 @@ where
6666
// fall-through, to replace state with Closed
6767
},
6868
State::Serving(ref mut srv) => {
69-
srv.conn.graceful_shutdown();
69+
if srv.closing.is_none() {
70+
srv.conn.graceful_shutdown();
71+
}
7072
return;
7173
},
7274
State::Closed => {
@@ -82,7 +84,6 @@ where
8284
T: AsyncRead + AsyncWrite,
8385
S: Service<ReqBody=Body, ResBody=B>,
8486
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
85-
//S::Future: Send + 'static,
8687
B: Payload,
8788
E: H2Exec<S::Future, B>,
8889
{
@@ -95,7 +96,8 @@ where
9596
State::Handshaking(ref mut h) => {
9697
let conn = try_ready!(h.poll().map_err(::Error::new_h2));
9798
State::Serving(Serving {
98-
conn: conn,
99+
conn,
100+
closing: None,
99101
})
100102
},
101103
State::Serving(ref mut srv) => {
@@ -127,37 +129,57 @@ where
127129
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
128130
E: H2Exec<S::Future, B>,
129131
{
130-
loop {
131-
// At first, polls the readiness of supplied service.
132-
match service.poll_ready() {
133-
Ok(Async::Ready(())) => (),
134-
Ok(Async::NotReady) => {
135-
// use `poll_close` instead of `poll`, in order to avoid accepting a request.
136-
try_ready!(self.conn.poll_close().map_err(::Error::new_h2));
137-
trace!("incoming connection complete");
138-
return Ok(Async::Ready(()));
139-
}
140-
Err(err) => {
141-
trace!("service closed");
142-
return Err(::Error::new_user_service(err));
132+
if self.closing.is_none() {
133+
loop {
134+
// At first, polls the readiness of supplied service.
135+
match service.poll_ready() {
136+
Ok(Async::Ready(())) => (),
137+
Ok(Async::NotReady) => {
138+
// use `poll_close` instead of `poll`, in order to avoid accepting a request.
139+
try_ready!(self.conn.poll_close().map_err(::Error::new_h2));
140+
trace!("incoming connection complete");
141+
return Ok(Async::Ready(()));
142+
}
143+
Err(err) => {
144+
let err = ::Error::new_user_service(err);
145+
debug!("service closed: {}", err);
146+
147+
let reason = err.h2_reason();
148+
if reason == h2::Reason::NO_ERROR {
149+
// NO_ERROR is only used for graceful shutdowns...
150+
trace!("interpretting NO_ERROR user error as graceful_shutdown");
151+
self.conn.graceful_shutdown();
152+
} else {
153+
trace!("abruptly shutting down with {:?}", reason);
154+
self.conn.abrupt_shutdown(reason);
155+
}
156+
self.closing = Some(err);
157+
break;
158+
}
143159
}
144-
}
145160

146-
// When the service is ready, accepts an incoming request.
147-
if let Some((req, respond)) = try_ready!(self.conn.poll().map_err(::Error::new_h2)) {
148-
trace!("incoming request");
149-
let content_length = content_length_parse_all(req.headers());
150-
let req = req.map(|stream| {
151-
::Body::h2(stream, content_length)
152-
});
153-
let fut = H2Stream::new(service.call(req), respond);
154-
exec.execute_h2stream(fut)?;
155-
} else {
156-
// no more incoming streams...
157-
trace!("incoming connection complete");
158-
return Ok(Async::Ready(()))
161+
// When the service is ready, accepts an incoming request.
162+
if let Some((req, respond)) = try_ready!(self.conn.poll().map_err(::Error::new_h2)) {
163+
trace!("incoming request");
164+
let content_length = content_length_parse_all(req.headers());
165+
let req = req.map(|stream| {
166+
::Body::h2(stream, content_length)
167+
});
168+
let fut = H2Stream::new(service.call(req), respond);
169+
exec.execute_h2stream(fut)?;
170+
} else {
171+
// no more incoming streams...
172+
trace!("incoming connection complete");
173+
return Ok(Async::Ready(()))
174+
}
159175
}
160176
}
177+
178+
debug_assert!(self.closing.is_some(), "poll_server broke loop without closing");
179+
180+
try_ready!(self.conn.poll_close().map_err(::Error::new_h2));
181+
182+
Err(self.closing.take().expect("polled after error"))
161183
}
162184
}
163185

src/server/conn.rs

-3
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,6 @@ where
555555
match polled {
556556
Ok(x) => return Ok(x),
557557
Err(e) => {
558-
debug!("error polling connection protocol without shutdown: {}", e);
559558
match *e.kind() {
560559
Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => {
561560
self.upgrade_h2();
@@ -644,7 +643,6 @@ where
644643
}
645644
})),
646645
Err(e) => {
647-
debug!("error polling connection protocol: {}", e);
648646
match *e.kind() {
649647
Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => {
650648
self.upgrade_h2();
@@ -962,7 +960,6 @@ mod upgrades {
962960
return Ok(Async::Ready(()));
963961
},
964962
Err(e) => {
965-
debug!("error polling connection protocol: {}", e);
966963
match *e.kind() {
967964
Kind::Parse(Parse::VersionH2) if self.inner.fallback.to_h2() => {
968965
self.inner.upgrade_h2();

tests/server.rs

+57
Original file line numberDiff line numberDiff line change
@@ -1630,6 +1630,63 @@ fn http2_body_user_error_sends_reset_reason() {
16301630
assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY));
16311631
}
16321632

1633+
#[test]
1634+
fn http2_service_poll_ready_error_sends_goaway() {
1635+
use std::error::Error;
1636+
1637+
struct Svc;
1638+
1639+
impl hyper::service::Service for Svc {
1640+
type ReqBody = hyper::Body;
1641+
type ResBody = hyper::Body;
1642+
type Error = h2::Error;
1643+
type Future = Box<dyn Future<
1644+
Item = hyper::Response<Self::ResBody>,
1645+
Error = Self::Error
1646+
> + Send + Sync>;
1647+
1648+
fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> {
1649+
Err(h2::Error::from(h2::Reason::INADEQUATE_SECURITY))
1650+
}
1651+
1652+
fn call(&mut self, _: hyper::Request<Self::ResBody>) -> Self::Future {
1653+
unreachable!("poll_ready error should have shutdown conn");
1654+
}
1655+
}
1656+
1657+
let _ = pretty_env_logger::try_init();
1658+
1659+
let server = hyper::Server::bind(&([127, 0, 0, 1], 0).into())
1660+
.http2_only(true)
1661+
.serve(|| Ok::<_, BoxError>(Svc));
1662+
1663+
let addr_str = format!("http://{}", server.local_addr());
1664+
1665+
1666+
let mut rt = Runtime::new().expect("runtime new");
1667+
1668+
rt.spawn(server.map_err(|e| unreachable!("server shouldn't error: {:?}", e)));
1669+
1670+
let err = rt.block_on(hyper::rt::lazy(move || {
1671+
let client = Client::builder()
1672+
.http2_only(true)
1673+
.build_http::<hyper::Body>();
1674+
let uri = addr_str.parse().expect("server addr should parse");
1675+
1676+
client
1677+
.get(uri)
1678+
})).unwrap_err();
1679+
1680+
// client request should have gotten the specific GOAWAY error...
1681+
let h2_err = err
1682+
.source()
1683+
.expect("source")
1684+
.downcast_ref::<h2::Error>()
1685+
.expect("downcast");
1686+
1687+
assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY));
1688+
}
1689+
16331690
// -------------------------------------------------
16341691
// the Server that is used to run all the tests with
16351692
// -------------------------------------------------

0 commit comments

Comments
 (0)