Skip to content

Commit a0a0fcd

Browse files
lnicolaseanmonstar
authored andcommitted
feat(body): make Body know about incoming Content-Length
When getting a `Body` from hyper, such as in a client response, the method `Body::content_length()` now returns a value if the header was present. Closes #1545
1 parent 396fe80 commit a0a0fcd

File tree

5 files changed

+108
-20
lines changed

5 files changed

+108
-20
lines changed

src/body/body.rs

+19-6
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ pub struct Body {
3535
enum Kind {
3636
Once(Option<Chunk>),
3737
Chan {
38+
content_length: Option<u64>,
3839
abort_rx: oneshot::Receiver<()>,
3940
rx: mpsc::Receiver<Result<Chunk, ::Error>>,
4041
},
@@ -85,6 +86,11 @@ impl Body {
8586
/// Useful when wanting to stream chunks from another thread.
8687
#[inline]
8788
pub fn channel() -> (Sender, Body) {
89+
Self::new_channel(None)
90+
}
91+
92+
#[inline]
93+
pub(crate) fn new_channel(content_length: Option<u64>) -> (Sender, Body) {
8894
let (tx, rx) = mpsc::channel(0);
8995
let (abort_tx, abort_rx) = oneshot::channel();
9096

@@ -93,8 +99,9 @@ impl Body {
9399
tx: tx,
94100
};
95101
let rx = Body::new(Kind::Chan {
96-
abort_rx: abort_rx,
97-
rx: rx,
102+
content_length,
103+
abort_rx,
104+
rx,
98105
});
99106

100107
(tx, rx)
@@ -188,13 +195,19 @@ impl Body {
188195
fn poll_inner(&mut self) -> Poll<Option<Chunk>, ::Error> {
189196
match self.kind {
190197
Kind::Once(ref mut val) => Ok(Async::Ready(val.take())),
191-
Kind::Chan { ref mut rx, ref mut abort_rx } => {
198+
Kind::Chan { content_length: ref mut len, ref mut rx, ref mut abort_rx } => {
192199
if let Ok(Async::Ready(())) = abort_rx.poll() {
193200
return Err(::Error::new_body_write("body write aborted"));
194201
}
195202

196203
match rx.poll().expect("mpsc cannot error") {
197-
Async::Ready(Some(Ok(chunk))) => Ok(Async::Ready(Some(chunk))),
204+
Async::Ready(Some(Ok(chunk))) => {
205+
if let Some(ref mut len) = *len {
206+
debug_assert!(*len >= chunk.len() as u64);
207+
*len = *len - chunk.len() as u64;
208+
}
209+
Ok(Async::Ready(Some(chunk)))
210+
}
198211
Async::Ready(Some(Err(err))) => Err(err),
199212
Async::Ready(None) => Ok(Async::Ready(None)),
200213
Async::NotReady => Ok(Async::NotReady),
@@ -243,7 +256,7 @@ impl Payload for Body {
243256
fn is_end_stream(&self) -> bool {
244257
match self.kind {
245258
Kind::Once(ref val) => val.is_none(),
246-
Kind::Chan { .. } => false,
259+
Kind::Chan { content_length: len, .. } => len == Some(0),
247260
Kind::H2(ref h2) => h2.is_end_stream(),
248261
Kind::Wrapped(..) => false,
249262
}
@@ -253,7 +266,7 @@ impl Payload for Body {
253266
match self.kind {
254267
Kind::Once(Some(ref val)) => Some(val.len() as u64),
255268
Kind::Once(None) => Some(0),
256-
Kind::Chan { .. } => None,
269+
Kind::Chan { content_length: len, .. } => len,
257270
Kind::H2(..) => None,
258271
Kind::Wrapped(..) => None,
259272
}

src/proto/h1/conn.rs

+12-10
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ where I: AsyncRead + AsyncWrite,
114114
read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
115115
}
116116

117-
pub fn read_head(&mut self) -> Poll<Option<(MessageHead<T::Incoming>, bool)>, ::Error> {
117+
pub fn read_head(&mut self) -> Poll<Option<(MessageHead<T::Incoming>, Option<BodyLength>)>, ::Error> {
118118
debug_assert!(self.can_read_head());
119119
trace!("Conn::read_head");
120120

@@ -162,7 +162,6 @@ where I: AsyncRead + AsyncWrite,
162162
continue;
163163
}
164164
};
165-
166165
debug!("incoming body is {}", decoder);
167166

168167
self.state.busy();
@@ -172,20 +171,23 @@ where I: AsyncRead + AsyncWrite,
172171
}
173172
let wants_keep_alive = msg.keep_alive;
174173
self.state.keep_alive &= wants_keep_alive;
175-
let (body, reading) = if decoder.is_eof() {
176-
(false, Reading::KeepAlive)
177-
} else {
178-
(true, Reading::Body(decoder))
179-
};
174+
175+
let content_length = decoder.content_length();
176+
180177
if let Reading::Closed = self.state.reading {
181178
// actually want an `if not let ...`
182179
} else {
183-
self.state.reading = reading;
180+
self.state.reading = if content_length.is_none() {
181+
Reading::KeepAlive
182+
} else {
183+
Reading::Body(decoder)
184+
};
184185
}
185-
if !body {
186+
if content_length.is_none() {
186187
self.try_keep_alive();
187188
}
188-
return Ok(Async::Ready(Some((head, body))));
189+
190+
return Ok(Async::Ready(Some((head, content_length))));
189191
}
190192
}
191193

src/proto/h1/decode.rs

+11
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use futures::{Async, Poll};
77
use bytes::Bytes;
88

99
use super::io::MemRead;
10+
use super::BodyLength;
1011

1112
use self::Kind::{Length, Chunked, Eof};
1213

@@ -84,6 +85,16 @@ impl Decoder {
8485
}
8586
}
8687

88+
pub fn content_length(&self) -> Option<BodyLength> {
89+
match self.kind {
90+
Length(0) |
91+
Chunked(ChunkedState::End, _) |
92+
Eof(true) => None,
93+
Length(len) => Some(BodyLength::Known(len)),
94+
_ => Some(BodyLength::Unknown),
95+
}
96+
}
97+
8798
pub fn decode<R: MemRead>(&mut self, body: &mut R) -> Poll<Bytes, io::Error> {
8899
trace!("decode; state={:?}", self.kind);
89100
match self.kind {

src/proto/h1/dispatch.rs

+9-4
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,14 @@ where
190190
}
191191
// dispatch is ready for a message, try to read one
192192
match self.conn.read_head() {
193-
Ok(Async::Ready(Some((head, has_body)))) => {
194-
let body = if has_body {
195-
let (mut tx, rx) = Body::channel();
193+
Ok(Async::Ready(Some((head, body_len)))) => {
194+
let body = if let Some(body_len) = body_len {
195+
let (mut tx, rx) =
196+
Body::new_channel(if let BodyLength::Known(len) = body_len {
197+
Some(len)
198+
} else {
199+
None
200+
});
196201
let _ = tx.poll_ready(); // register this task if rx is dropped
197202
self.body_tx = Some(tx);
198203
rx
@@ -201,7 +206,7 @@ where
201206
};
202207
self.dispatch.recv_msg(Ok((head, body)))?;
203208
Ok(Async::Ready(()))
204-
},
209+
}
205210
Ok(Async::Ready(None)) => {
206211
// read eof, conn will start to shutdown automatically
207212
Ok(Async::Ready(()))

tests/client.rs

+57
Original file line numberDiff line numberDiff line change
@@ -1424,6 +1424,63 @@ mod conn {
14241424
res.join(rx).map(|r| r.0).wait().unwrap();
14251425
}
14261426

1427+
#[test]
1428+
fn incoming_content_length() {
1429+
use hyper::body::Payload;
1430+
1431+
let server = TcpListener::bind("127.0.0.1:0").unwrap();
1432+
let addr = server.local_addr().unwrap();
1433+
let mut runtime = Runtime::new().unwrap();
1434+
1435+
let (tx1, rx1) = oneshot::channel();
1436+
1437+
thread::spawn(move || {
1438+
let mut sock = server.accept().unwrap().0;
1439+
sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
1440+
sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
1441+
let mut buf = [0; 4096];
1442+
let n = sock.read(&mut buf).expect("read 1");
1443+
1444+
let expected = "GET / HTTP/1.1\r\n\r\n";
1445+
assert_eq!(s(&buf[..n]), expected);
1446+
1447+
sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello").unwrap();
1448+
let _ = tx1.send(());
1449+
});
1450+
1451+
let tcp = tcp_connect(&addr).wait().unwrap();
1452+
1453+
let (mut client, conn) = conn::handshake(tcp).wait().unwrap();
1454+
1455+
runtime.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e)));
1456+
1457+
let req = Request::builder()
1458+
.uri("/")
1459+
.body(Default::default())
1460+
.unwrap();
1461+
let res = client.send_request(req).and_then(move |mut res| {
1462+
assert_eq!(res.status(), hyper::StatusCode::OK);
1463+
assert_eq!(res.body().content_length(), Some(5));
1464+
assert!(!res.body().is_end_stream());
1465+
loop {
1466+
let chunk = res.body_mut().poll_data().unwrap();
1467+
match chunk {
1468+
Async::Ready(Some(chunk)) => {
1469+
assert_eq!(chunk.len(), 5);
1470+
break;
1471+
}
1472+
_ => continue
1473+
}
1474+
}
1475+
res.into_body().concat2()
1476+
});
1477+
let rx = rx1.expect("thread panicked");
1478+
1479+
let timeout = Delay::new(Duration::from_millis(200));
1480+
let rx = rx.and_then(move |_| timeout.expect("timeout"));
1481+
res.join(rx).map(|r| r.0).wait().unwrap();
1482+
}
1483+
14271484
#[test]
14281485
fn aborted_body_isnt_completed() {
14291486
let _ = ::pretty_env_logger::try_init();

0 commit comments

Comments
 (0)