Skip to content

Commit 4d33ad4

Browse files
authored
Merge branch 'master' into gh-2169
2 parents 465d404 + 77c3b5b commit 4d33ad4

23 files changed

+365
-103
lines changed

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
### v0.13.6 (2020-05-29)
2+
3+
4+
#### Features
5+
6+
* **body:** remove Sync bound for Body::wrap_stream ([042c7706](https://github.com/hyperium/hyper/commit/042c770603a212f22387807efe4fc672959df40c))
7+
* **http2:** allow configuring the HTTP/2 frame size ([b6446456](https://github.com/hyperium/hyper/commit/b64464562a02a642a3cf16ea072f39621da21980))
8+
9+
110
### v0.13.5 (2020-04-17)
211

312

Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "hyper"
3-
version = "0.13.5" # don't forget to update html_root_url
3+
version = "0.13.6" # don't forget to update html_root_url
44
description = "A fast and correct HTTP library."
55
readme = "README.md"
66
homepage = "https://hyper.rs"
@@ -30,15 +30,15 @@ httparse = "1.0"
3030
h2 = "0.2.2"
3131
itoa = "0.4.1"
3232
log = "0.4"
33-
pin-project = "0.4"
33+
pin-project = "0.4.20"
3434
time = "0.1"
3535
tower-service = "0.3"
3636
tokio = { version = "0.2.5", features = ["sync"] }
3737
want = "0.3"
3838

3939
# Optional
4040

41-
net2 = { version = "0.2.32", optional = true }
41+
socket2 = { version = "0.3", optional = true }
4242

4343
[dev-dependencies]
4444
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
@@ -68,7 +68,7 @@ runtime = [
6868
"tokio/rt-core",
6969
]
7070
tcp = [
71-
"net2",
71+
"socket2",
7272
"tokio/blocking",
7373
"tokio/tcp",
7474
"tokio/time",

examples/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ pretty_env_logger = "0.4"
4141

4242
* [`send_file`](send_file.rs) - A server that sends back content of files using tokio-util to read the files asynchronously.
4343

44+
* [`service_struct_impl`](service_struct_impl.rs) - A struct that manually implements the `Service` trait and uses a shared counter across requests.
45+
4446
* [`single_threaded`](single_threaded.rs) - A server only running on 1 thread, so it can make use of `!Send` app state (like an `Rc` counter).
4547

4648
* [`state`](state.rs) - A webserver showing basic state sharing among requests. A counter is shared, incremented for every request, and every response is sent the last count.

examples/service_struct_impl.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
use hyper::service::Service;
2+
use hyper::{Body, Request, Response, Server};
3+
4+
use std::future::Future;
5+
use std::pin::Pin;
6+
use std::task::{Context, Poll};
7+
8+
type Counter = i32;
9+
10+
#[tokio::main]
11+
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
12+
let addr = ([127, 0, 0, 1], 3000).into();
13+
14+
let server = Server::bind(&addr).serve(MakeSvc { counter: 81818 });
15+
println!("Listening on http://{}", addr);
16+
17+
server.await?;
18+
Ok(())
19+
}
20+
21+
struct Svc {
22+
counter: Counter,
23+
}
24+
25+
impl Service<Request<Body>> for Svc {
26+
type Response = Response<Body>;
27+
type Error = hyper::Error;
28+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
29+
30+
fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
31+
Poll::Ready(Ok(()))
32+
}
33+
34+
fn call(&mut self, req: Request<Body>) -> Self::Future {
35+
fn mk_response(s: String) -> Result<Response<Body>, hyper::Error> {
36+
Ok(Response::builder().body(Body::from(s)).unwrap())
37+
}
38+
39+
let res = match req.uri().path() {
40+
"/" => mk_response(format!("home! counter = {:?}", self.counter)),
41+
"/posts" => mk_response(format!("posts, of course! counter = {:?}", self.counter)),
42+
"/authors" => mk_response(format!(
43+
"authors extraordinare! counter = {:?}",
44+
self.counter
45+
)),
46+
// Return the 404 Not Found for other routes, and don't increment counter.
47+
_ => return Box::pin(async { mk_response("oh no! not found".into()) }),
48+
};
49+
50+
if req.uri().path() != "/favicon.ico" {
51+
self.counter += 1;
52+
}
53+
54+
Box::pin(async { res })
55+
}
56+
}
57+
58+
struct MakeSvc {
59+
counter: Counter,
60+
}
61+
62+
impl<T> Service<T> for MakeSvc {
63+
type Response = Svc;
64+
type Error = hyper::Error;
65+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
66+
67+
fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
68+
Poll::Ready(Ok(()))
69+
}
70+
71+
fn call(&mut self, _: T) -> Self::Future {
72+
let counter = self.counter.clone();
73+
let fut = async move { Ok(Svc { counter }) };
74+
Box::pin(fut)
75+
}
76+
}

src/body/body.rs

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use futures_util::TryStreamExt;
1111
use http::HeaderMap;
1212
use http_body::{Body as HttpBody, SizeHint};
1313

14+
use crate::common::sync_wrapper::SyncWrapper;
1415
use crate::common::{task, watch, Future, Never, Pin, Poll};
1516
use crate::proto::h2::ping;
1617
use crate::proto::DecodedLength;
@@ -42,13 +43,11 @@ enum Kind {
4243
content_length: DecodedLength,
4344
recv: h2::RecvStream,
4445
},
45-
// NOTE: This requires `Sync` because of how easy it is to use `await`
46-
// while a borrow of a `Request<Body>` exists.
47-
//
48-
// See https://github.com/rust-lang/rust/issues/57017
4946
#[cfg(feature = "stream")]
5047
Wrapped(
51-
Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send + Sync>>,
48+
SyncWrapper<
49+
Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>,
50+
>,
5251
),
5352
}
5453

@@ -156,12 +155,12 @@ impl Body {
156155
#[cfg(feature = "stream")]
157156
pub fn wrap_stream<S, O, E>(stream: S) -> Body
158157
where
159-
S: Stream<Item = Result<O, E>> + Send + Sync + 'static,
158+
S: Stream<Item = Result<O, E>> + Send + 'static,
160159
O: Into<Bytes> + 'static,
161160
E: Into<Box<dyn StdError + Send + Sync>> + 'static,
162161
{
163162
let mapped = stream.map_ok(Into::into).map_err(Into::into);
164-
Body::new(Kind::Wrapped(Box::pin(mapped)))
163+
Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
165164
}
166165

167166
/// Converts this `Body` into a `Future` of a pending HTTP upgrade.
@@ -280,7 +279,7 @@ impl Body {
280279
},
281280

282281
#[cfg(feature = "stream")]
283-
Kind::Wrapped(ref mut s) => match ready!(s.as_mut().poll_next(cx)) {
282+
Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
284283
Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
285284
None => Poll::Ready(None),
286285
},
@@ -297,7 +296,7 @@ impl Body {
297296
}
298297

299298
impl Default for Body {
300-
/// Returns [`Body::empty()`](Body::empty).
299+
/// Returns `Body::empty()`.
301300
#[inline]
302301
fn default() -> Body {
303302
Body::empty()
@@ -402,16 +401,12 @@ impl Stream for Body {
402401
/// This function requires enabling the `stream` feature in your
403402
/// `Cargo.toml`.
404403
#[cfg(feature = "stream")]
405-
impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send + Sync>>
406-
for Body
407-
{
404+
impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body {
408405
#[inline]
409406
fn from(
410-
stream: Box<
411-
dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send + Sync,
412-
>,
407+
stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>,
413408
) -> Body {
414-
Body::new(Kind::Wrapped(stream.into()))
409+
Body::new(Kind::Wrapped(SyncWrapper::new(stream.into())))
415410
}
416411
}
417412

src/client/conn.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use std::time::Duration;
1717

1818
use bytes::Bytes;
1919
use futures_util::future::{self, Either, FutureExt as _};
20-
use pin_project::{pin_project, project};
20+
use pin_project::pin_project;
2121
use tokio::io::{AsyncRead, AsyncWrite};
2222
use tower_service::Service;
2323

@@ -30,7 +30,7 @@ use crate::{Body, Request, Response};
3030

3131
type Http1Dispatcher<T, B, R> = proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, R>;
3232

33-
#[pin_project]
33+
#[pin_project(project = ProtoClientProj)]
3434
enum ProtoClient<T, B>
3535
where
3636
B: HttpBody,
@@ -525,6 +525,18 @@ impl Builder {
525525
self
526526
}
527527

528+
/// Sets the maximum frame size to use for HTTP2.
529+
///
530+
/// Passing `None` will do nothing.
531+
///
532+
/// If not set, hyper will use a default.
533+
pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
534+
if let Some(sz) = sz.into() {
535+
self.h2_builder.max_frame_size = sz;
536+
}
537+
self
538+
}
539+
528540
/// Sets an interval for HTTP2 Ping frames should be sent to keep a
529541
/// connection alive.
530542
///
@@ -665,12 +677,10 @@ where
665677
{
666678
type Output = crate::Result<proto::Dispatched>;
667679

668-
#[project]
669680
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
670-
#[project]
671681
match self.project() {
672-
ProtoClient::H1(c) => c.poll(cx),
673-
ProtoClient::H2(c) => c.poll(cx),
682+
ProtoClientProj::H1(c) => c.poll(cx),
683+
ProtoClientProj::H2(c) => c.poll(cx),
674684
}
675685
}
676686
}

src/client/connect/dns.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,13 @@ impl Future for GaiFuture {
141141
Pin::new(&mut self.inner).poll(cx).map(|res| match res {
142142
Ok(Ok(addrs)) => Ok(GaiAddrs { inner: addrs }),
143143
Ok(Err(err)) => Err(err),
144-
Err(join_err) => panic!("gai background task failed: {:?}", join_err),
144+
Err(join_err) => {
145+
if join_err.is_cancelled() {
146+
Err(io::Error::new(io::ErrorKind::Interrupted, join_err))
147+
} else {
148+
panic!("gai background task failed: {:?}", join_err)
149+
}
150+
}
145151
})
146152
}
147153
}

src/client/connect/http.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use std::time::Duration;
1111

1212
use futures_util::future::Either;
1313
use http::uri::{Scheme, Uri};
14-
use net2::TcpBuilder;
1514
use pin_project::pin_project;
1615
use tokio::net::TcpStream;
1716
use tokio::time::Delay;
@@ -602,20 +601,22 @@ fn connect(
602601
reuse_address: bool,
603602
connect_timeout: Option<Duration>,
604603
) -> io::Result<impl Future<Output = io::Result<TcpStream>>> {
605-
let builder = match *addr {
606-
SocketAddr::V4(_) => TcpBuilder::new_v4()?,
607-
SocketAddr::V6(_) => TcpBuilder::new_v6()?,
604+
use socket2::{Domain, Protocol, Socket, Type};
605+
let domain = match *addr {
606+
SocketAddr::V4(_) => Domain::ipv4(),
607+
SocketAddr::V6(_) => Domain::ipv6(),
608608
};
609+
let socket = Socket::new(domain, Type::stream(), Some(Protocol::tcp()))?;
609610

610611
if reuse_address {
611-
builder.reuse_address(reuse_address)?;
612+
socket.set_reuse_address(true)?;
612613
}
613614

614-
bind_local_address(&builder, addr, local_addr_ipv4, local_addr_ipv6)?;
615+
bind_local_address(&socket, addr, local_addr_ipv4, local_addr_ipv6)?;
615616

616617
let addr = *addr;
617618

618-
let std_tcp = builder.to_tcp_stream()?;
619+
let std_tcp = socket.into_tcp_stream();
619620

620621
Ok(async move {
621622
let connect = TcpStream::connect_std(std_tcp, &addr);

src/client/mod.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,26 @@ where
562562
}
563563
}
564564

565+
impl<C, B> tower_service::Service<Request<B>> for &'_ Client<C, B>
566+
where
567+
C: Connect + Clone + Send + Sync + 'static,
568+
B: HttpBody + Send + 'static,
569+
B::Data: Send,
570+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
571+
{
572+
type Response = Response<Body>;
573+
type Error = crate::Error;
574+
type Future = ResponseFuture;
575+
576+
fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
577+
Poll::Ready(Ok(()))
578+
}
579+
580+
fn call(&mut self, req: Request<B>) -> Self::Future {
581+
self.request(req)
582+
}
583+
}
584+
565585
impl<C: Clone, B> Clone for Client<C, B> {
566586
fn clone(&self) -> Client<C, B> {
567587
Client {
@@ -1040,6 +1060,16 @@ impl Builder {
10401060
self
10411061
}
10421062

1063+
/// Sets the maximum frame size to use for HTTP2.
1064+
///
1065+
/// Passing `None` will do nothing.
1066+
///
1067+
/// If not set, hyper will use a default.
1068+
pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1069+
self.conn_builder.http2_max_frame_size(sz);
1070+
self
1071+
}
1072+
10431073
/// Sets an interval for HTTP2 Ping frames should be sent to keep a
10441074
/// connection alive.
10451075
///

src/common/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub(crate) mod exec;
1313
pub(crate) mod io;
1414
mod lazy;
1515
mod never;
16+
pub(crate) mod sync_wrapper;
1617
pub(crate) mod task;
1718
pub(crate) mod watch;
1819

0 commit comments

Comments
 (0)