Skip to content

Commit 5a59875

Browse files
committed
feat(body): replace Chunk type with Bytes
Closes #1931 BREAKING CHANGE: All usage of `hyper::Chunk` should be replaced with `bytes::Bytes` (or `hyper::body::Bytes`).
1 parent c56ccfb commit 5a59875

File tree

10 files changed

+59
-245
lines changed

10 files changed

+59
-245
lines changed

examples/web_api.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use futures_util::{StreamExt, TryStreamExt};
44
use hyper::client::HttpConnector;
55
use hyper::service::{make_service_fn, service_fn};
6-
use hyper::{header, Body, Chunk, Client, Method, Request, Response, Server, StatusCode};
6+
use hyper::{header, Body, Client, Method, Request, Response, Server, StatusCode};
77

88
type GenericError = Box<dyn std::error::Error + Send + Sync>;
99
type Result<T> = std::result::Result<T, GenericError>;
@@ -25,11 +25,11 @@ async fn client_request_response(client: &Client<HttpConnector>) -> Result<Respo
2525
let web_res = client.request(req).await?;
2626
// Compare the JSON we sent (before) with what we received (after):
2727
let body = Body::wrap_stream(web_res.into_body().map_ok(|b| {
28-
Chunk::from(format!(
28+
format!(
2929
"<b>POST request body</b>: {}<br><b>Response</b>: {}",
3030
POST_DATA,
3131
std::str::from_utf8(&b).unwrap()
32-
))
32+
)
3333
}));
3434

3535
Ok(Response::new(body))

src/body/body.rs

+25-33
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,12 @@ use futures_util::TryStreamExt;
1111
use http::HeaderMap;
1212
use http_body::{Body as HttpBody, SizeHint};
1313

14-
use super::Chunk;
1514
use crate::common::{task, Future, Never, Pin, Poll};
1615
use crate::upgrade::OnUpgrade;
1716

18-
type BodySender = mpsc::Sender<Result<Chunk, crate::Error>>;
17+
type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
1918

20-
/// A stream of `Chunk`s, used when receiving bodies.
19+
/// A stream of `Bytes`s, used when receiving bodies.
2120
///
2221
/// A good default `Payload` to use in many applications.
2322
#[must_use = "streams do nothing unless polled"]
@@ -29,11 +28,11 @@ pub struct Body {
2928
}
3029

3130
enum Kind {
32-
Once(Option<Chunk>),
31+
Once(Option<Bytes>),
3332
Chan {
3433
content_length: Option<u64>,
3534
abort_rx: oneshot::Receiver<()>,
36-
rx: mpsc::Receiver<Result<Chunk, crate::Error>>,
35+
rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
3736
},
3837
H2 {
3938
content_length: Option<u64>,
@@ -45,7 +44,7 @@ enum Kind {
4544
// See https://github.com/rust-lang/rust/issues/57017
4645
#[cfg(feature = "stream")]
4746
Wrapped(
48-
Pin<Box<dyn Stream<Item = Result<Chunk, Box<dyn StdError + Send + Sync>>> + Send + Sync>>,
47+
Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send + Sync>>,
4948
),
5049
}
5150

@@ -152,7 +151,7 @@ impl Body {
152151
pub fn wrap_stream<S, O, E>(stream: S) -> Body
153152
where
154153
S: Stream<Item = Result<O, E>> + Send + Sync + 'static,
155-
O: Into<Chunk> + 'static,
154+
O: Into<Bytes> + 'static,
156155
E: Into<Box<dyn StdError + Send + Sync>> + 'static,
157156
{
158157
let mapped = stream.map_ok(Into::into).map_err(Into::into);
@@ -208,7 +207,7 @@ impl Body {
208207
})
209208
}
210209

211-
fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Chunk>>> {
210+
fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
212211
match self.take_delayed_eof() {
213212
Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) {
214213
ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => {
@@ -237,7 +236,7 @@ impl Body {
237236
}
238237
}
239238

240-
fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Chunk>>> {
239+
fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
241240
match self.kind {
242241
Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)),
243242
Kind::Chan {
@@ -265,7 +264,7 @@ impl Body {
265264
} => match ready!(h2.poll_data(cx)) {
266265
Some(Ok(bytes)) => {
267266
let _ = h2.flow_control().release_capacity(bytes.len());
268-
Poll::Ready(Some(Ok(Chunk::from(bytes))))
267+
Poll::Ready(Some(Ok(bytes)))
269268
}
270269
Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
271270
None => Poll::Ready(None),
@@ -279,7 +278,7 @@ impl Body {
279278
}
280279
}
281280

282-
pub(super) fn take_full_data(&mut self) -> Option<Chunk> {
281+
pub(super) fn take_full_data(&mut self) -> Option<Bytes> {
283282
if let Kind::Once(ref mut chunk) = self.kind {
284283
chunk.take()
285284
} else {
@@ -297,7 +296,7 @@ impl Default for Body {
297296
}
298297

299298
impl HttpBody for Body {
300-
type Data = Chunk;
299+
type Data = Bytes;
301300
type Error = crate::Error;
302301

303302
fn poll_data(
@@ -362,7 +361,7 @@ impl fmt::Debug for Body {
362361
#[derive(Debug)]
363362
struct Empty;
364363
#[derive(Debug)]
365-
struct Full<'a>(&'a Chunk);
364+
struct Full<'a>(&'a Bytes);
366365

367366
let mut builder = f.debug_tuple("Body");
368367
match self.kind {
@@ -381,7 +380,7 @@ impl fmt::Debug for Body {
381380
/// `Cargo.toml`.
382381
#[cfg(feature = "stream")]
383382
impl Stream for Body {
384-
type Item = crate::Result<Chunk>;
383+
type Item = crate::Result<Bytes>;
385384

386385
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
387386
HttpBody::poll_data(self, cx)
@@ -393,22 +392,22 @@ impl Stream for Body {
393392
/// This function requires enabling the `stream` feature in your
394393
/// `Cargo.toml`.
395394
#[cfg(feature = "stream")]
396-
impl From<Box<dyn Stream<Item = Result<Chunk, Box<dyn StdError + Send + Sync>>> + Send + Sync>>
395+
impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send + Sync>>
397396
for Body
398397
{
399398
#[inline]
400399
fn from(
401400
stream: Box<
402-
dyn Stream<Item = Result<Chunk, Box<dyn StdError + Send + Sync>>> + Send + Sync,
401+
dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send + Sync,
403402
>,
404403
) -> Body {
405404
Body::new(Kind::Wrapped(stream.into()))
406405
}
407406
}
408407

409-
impl From<Chunk> for Body {
408+
impl From<Bytes> for Body {
410409
#[inline]
411-
fn from(chunk: Chunk) -> Body {
410+
fn from(chunk: Bytes) -> Body {
412411
if chunk.is_empty() {
413412
Body::empty()
414413
} else {
@@ -417,24 +416,17 @@ impl From<Chunk> for Body {
417416
}
418417
}
419418

420-
impl From<Bytes> for Body {
421-
#[inline]
422-
fn from(bytes: Bytes) -> Body {
423-
Body::from(Chunk::from(bytes))
424-
}
425-
}
426-
427419
impl From<Vec<u8>> for Body {
428420
#[inline]
429421
fn from(vec: Vec<u8>) -> Body {
430-
Body::from(Chunk::from(vec))
422+
Body::from(Bytes::from(vec))
431423
}
432424
}
433425

434426
impl From<&'static [u8]> for Body {
435427
#[inline]
436428
fn from(slice: &'static [u8]) -> Body {
437-
Body::from(Chunk::from(slice))
429+
Body::from(Bytes::from(slice))
438430
}
439431
}
440432

@@ -451,14 +443,14 @@ impl From<Cow<'static, [u8]>> for Body {
451443
impl From<String> for Body {
452444
#[inline]
453445
fn from(s: String) -> Body {
454-
Body::from(Chunk::from(s.into_bytes()))
446+
Body::from(Bytes::from(s.into_bytes()))
455447
}
456448
}
457449

458450
impl From<&'static str> for Body {
459451
#[inline]
460452
fn from(slice: &'static str) -> Body {
461-
Body::from(Chunk::from(slice.as_bytes()))
453+
Body::from(Bytes::from(slice.as_bytes()))
462454
}
463455
}
464456

@@ -486,7 +478,7 @@ impl Sender {
486478
}
487479

488480
/// Send data on this channel when it is ready.
489-
pub async fn send_data(&mut self, chunk: Chunk) -> crate::Result<()> {
481+
pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
490482
futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await?;
491483
self.tx
492484
.try_send(Ok(chunk))
@@ -497,15 +489,15 @@ impl Sender {
497489
///
498490
/// # Errors
499491
///
500-
/// Returns `Err(Chunk)` if the channel could not (currently) accept
501-
/// another `Chunk`.
492+
/// Returns `Err(Bytes)` if the channel could not (currently) accept
493+
/// another `Bytes`.
502494
///
503495
/// # Note
504496
///
505497
/// This is mostly useful for when trying to send from some other thread
506498
/// that doesn't have an async context. If in an async context, prefer
507499
/// [`send_data`][] instead.
508-
pub fn try_send_data(&mut self, chunk: Chunk) -> Result<(), Chunk> {
500+
pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
509501
self.tx
510502
.try_send(Ok(chunk))
511503
.map_err(|err| err.into_inner().expect("just sent Ok"))

0 commit comments

Comments
 (0)