Skip to content

Commit 6530a00

Browse files
committed
fix(http1): reduce closed connections when body is dropped
If a user makes use of `Body::is_end_stream` to optimize so as to not need to do make a final poll just to receive `None`, previously the connection would not have progressed its reading state to a finished body, and so the connection would be closed. Now, upon reading any chunk, the connection state will check if it can know that the body would be finished, and progresses to a body finished state sooner. The integration tests were amplified by adding a naive hyper proxy as a secondary test, which happens to make use of that optimization, and thus caught the issue.
1 parent 05c1179 commit 6530a00

File tree

4 files changed

+163
-37
lines changed

4 files changed

+163
-37
lines changed

src/client/tests.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,14 @@ fn checkout_win_allows_connect_future_to_be_pooled() {
137137
.map(|res| res.into_body().concat2());
138138
let srv1 = poll_fn(|| {
139139
try_ready!(sock1.read(&mut [0u8; 512]));
140-
try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 1\r\n\r\nx"));
140+
// Chunked is used so as to force 2 body reads.
141+
try_ready!(sock1.write(b"\
142+
HTTP/1.1 200 OK\r\n\
143+
transfer-encoding: chunked\r\n\
144+
\r\n\
145+
1\r\nx\r\n\
146+
0\r\n\r\n\
147+
"));
141148
Ok(Async::Ready(()))
142149
}).map_err(|e: ::std::io::Error| panic!("srv1 poll_fn error: {}", e));
143150

src/proto/h1/conn.rs

+14-12
Original file line numberDiff line numberDiff line change
@@ -180,29 +180,31 @@ where I: AsyncRead + AsyncWrite,
180180
pub fn read_body(&mut self) -> Poll<Option<Chunk>, io::Error> {
181181
debug_assert!(self.can_read_body());
182182

183-
trace!("Conn::read_body");
184-
185183
let (reading, ret) = match self.state.reading {
186184
Reading::Body(ref mut decoder) => {
187185
match decoder.decode(&mut self.io) {
188186
Ok(Async::Ready(slice)) => {
189-
let (reading, chunk) = if !slice.is_empty() {
190-
return Ok(Async::Ready(Some(Chunk::from(slice))));
191-
} else if decoder.is_eof() {
187+
let (reading, chunk) = if decoder.is_eof() {
192188
debug!("incoming body completed");
193-
(Reading::KeepAlive, None)
194-
} else {
195-
trace!("decode stream unexpectedly ended");
196-
// this should actually be unreachable:
197-
// the decoder will return an UnexpectedEof if there were
198-
// no bytes to read and it isn't eof yet...
189+
(Reading::KeepAlive, if !slice.is_empty() {
190+
Some(Chunk::from(slice))
191+
} else {
192+
None
193+
})
194+
} else if slice.is_empty() {
195+
error!("decode stream unexpectedly ended");
196+
// This should be unreachable, since all 3 decoders
197+
// either set eof=true or return an Err when reading
198+
// an empty slice...
199199
(Reading::Closed, None)
200+
} else {
201+
return Ok(Async::Ready(Some(Chunk::from(slice))));
200202
};
201203
(reading, Ok(Async::Ready(chunk)))
202204
},
203205
Ok(Async::NotReady) => return Ok(Async::NotReady),
204206
Err(e) => {
205-
trace!("decode stream error: {}", e);
207+
debug!("decode stream error: {}", e);
206208
(Reading::Closed, Err(e))
207209
},
208210
}

tests/integration.rs

+44
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,50 @@ t! {
6262
;
6363
}
6464

65+
t! {
66+
get_body_2_keeps_alive,
67+
client:
68+
request:
69+
uri: "/",
70+
;
71+
response:
72+
status: 200,
73+
headers: {
74+
"content-length" => 11,
75+
},
76+
body: "hello world",
77+
;
78+
request:
79+
uri: "/",
80+
;
81+
response:
82+
status: 200,
83+
headers: {
84+
"content-length" => 11,
85+
},
86+
body: "hello world",
87+
;
88+
server:
89+
request:
90+
uri: "/",
91+
;
92+
response:
93+
headers: {
94+
"content-length" => 11,
95+
},
96+
body: "hello world",
97+
;
98+
request:
99+
uri: "/",
100+
;
101+
response:
102+
headers: {
103+
"content-length" => 11,
104+
},
105+
body: "hello world",
106+
;
107+
}
108+
65109
t! {
66110
get_strip_connection_header,
67111
client:

tests/support/mod.rs

+97-24
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,13 @@ pub extern crate futures;
22
pub extern crate hyper;
33
pub extern crate tokio;
44

5+
use std::sync::{Arc, Mutex, atomic::{AtomicUsize, Ordering}};
6+
use std::time::Duration;
7+
8+
use hyper::{Body, Client, Request, Response, Server, Version};
9+
use hyper::client::HttpConnector;
10+
use hyper::service::service_fn;
11+
512
pub use std::net::SocketAddr;
613
pub use self::futures::{future, Future, Stream};
714
pub use self::futures::sync::oneshot;
@@ -44,15 +51,25 @@ macro_rules! t {
4451
));
4552
}
4653

54+
__run_test(__TestConfig {
55+
client_version: 2,
56+
client_msgs: c.clone(),
57+
server_version: 2,
58+
server_msgs: s.clone(),
59+
parallel: true,
60+
connections: 1,
61+
proxy: false,
62+
});
63+
4764
__run_test(__TestConfig {
4865
client_version: 2,
4966
client_msgs: c,
5067
server_version: 2,
5168
server_msgs: s,
5269
parallel: true,
5370
connections: 1,
71+
proxy: true,
5472
});
55-
5673
}
5774
);
5875
(
@@ -104,6 +121,27 @@ macro_rules! t {
104121
server_msgs: s.clone(),
105122
parallel: false,
106123
connections: 1,
124+
proxy: false,
125+
});
126+
127+
__run_test(__TestConfig {
128+
client_version: 2,
129+
client_msgs: c.clone(),
130+
server_version: 2,
131+
server_msgs: s.clone(),
132+
parallel: false,
133+
connections: 1,
134+
proxy: false,
135+
});
136+
137+
__run_test(__TestConfig {
138+
client_version: 1,
139+
client_msgs: c.clone(),
140+
server_version: 1,
141+
server_msgs: s.clone(),
142+
parallel: false,
143+
connections: 1,
144+
proxy: true,
107145
});
108146

109147
__run_test(__TestConfig {
@@ -113,6 +151,7 @@ macro_rules! t {
113151
server_msgs: s,
114152
parallel: false,
115153
connections: 1,
154+
proxy: true,
116155
});
117156
}
118157
);
@@ -185,14 +224,11 @@ pub struct __TestConfig {
185224

186225
pub parallel: bool,
187226
pub connections: usize,
227+
pub proxy: bool,
188228
}
189229

190230
pub fn __run_test(cfg: __TestConfig) {
191231
extern crate pretty_env_logger;
192-
use hyper::{Body, Client, Request, Response, Version};
193-
use hyper::client::HttpConnector;
194-
use std::sync::{Arc, Mutex};
195-
use std::time::Duration;
196232
let _ = pretty_env_logger::try_init();
197233
let mut rt = Runtime::new().expect("new rt");
198234

@@ -254,31 +290,39 @@ pub fn __run_test(cfg: __TestConfig) {
254290
)
255291
.expect("serve_addr");
256292

257-
let addr = serve.incoming_ref().local_addr();
258-
let (shutdown_tx, shutdown_rx) = oneshot::channel();
259-
let (success_tx, success_rx) = oneshot::channel();
293+
let mut addr = serve.incoming_ref().local_addr();
260294
let expected_connections = cfg.connections;
261295
let server = serve
262296
.fold(0, move |cnt, connecting| {
297+
let cnt = cnt + 1;
298+
assert!(
299+
cnt <= expected_connections,
300+
"server expected {} connections, received {}",
301+
expected_connections,
302+
cnt
303+
);
263304
let fut = connecting
264305
.map_err(|never| -> hyper::Error { match never {} })
265306
.flatten()
266307
.map_err(|e| panic!("server connection error: {}", e));
267308
::tokio::spawn(fut);
268-
Ok::<_, hyper::Error>(cnt + 1)
269-
})
270-
.map(move |cnt| {
271-
assert_eq!(cnt, expected_connections);
272-
})
273-
.map_err(|e| panic!("serve error: {}", e))
274-
.select2(shutdown_rx)
275-
.map(move |_| {
276-
let _ = success_tx.send(());
309+
Ok::<_, hyper::Error>(cnt)
277310
})
278-
.map_err(|_| panic!("shutdown not ok"));
311+
.map(|_| ())
312+
.map_err(|e| panic!("serve error: {}", e));
279313

280314
rt.spawn(server);
281315

316+
if cfg.proxy {
317+
let (proxy_addr, proxy) = naive_proxy(ProxyConfig {
318+
connections: cfg.connections,
319+
dst: addr,
320+
version: cfg.server_version,
321+
});
322+
rt.spawn(proxy);
323+
addr = proxy_addr;
324+
}
325+
282326

283327
let make_request = Arc::new(move |client: &Client<HttpConnector>, creq: __CReq, cres: __CRes| {
284328
let uri = format!("http://{}{}", addr, creq.uri);
@@ -335,12 +379,41 @@ pub fn __run_test(cfg: __TestConfig) {
335379
Box::new(client_futures.map(|_| ()))
336380
};
337381

338-
let client_futures = client_futures.map(move |_| {
339-
let _ = shutdown_tx.send(());
340-
});
341-
rt.spawn(client_futures);
342-
rt.block_on(success_rx
343-
.map_err(|_| "something panicked"))
382+
let client_futures = client_futures.map(|_| ());
383+
rt.block_on(client_futures)
344384
.expect("shutdown succeeded");
345385
}
346386

387+
struct ProxyConfig {
388+
connections: usize,
389+
dst: SocketAddr,
390+
version: usize,
391+
}
392+
393+
fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future<Item = (), Error = ()>) {
394+
let client = Client::builder()
395+
.keep_alive_timeout(Duration::from_secs(10))
396+
.http2_only(cfg.version == 2)
397+
.build_http::<Body>();
398+
399+
let dst_addr = cfg.dst;
400+
let max_connections = cfg.connections;
401+
let counter = AtomicUsize::new(0);
402+
403+
let srv = Server::bind(&([127, 0, 0, 1], 0).into())
404+
.serve(move || {
405+
let prev = counter.fetch_add(1, Ordering::Relaxed);
406+
assert!(max_connections >= prev + 1, "proxy max connections");
407+
let client = client.clone();
408+
service_fn(move |mut req| {
409+
let uri = format!("http://{}{}", dst_addr, req.uri().path())
410+
.parse()
411+
.expect("proxy new uri parse");
412+
*req.uri_mut() = uri;
413+
client.request(req)
414+
})
415+
416+
});
417+
let proxy_addr = srv.local_addr();
418+
(proxy_addr, srv.map_err(|err| panic!("proxy error: {}", err)))
419+
}

0 commit comments

Comments
 (0)