Skip to content

Commit 327d88a

Browse files
committed
Merge branch 'master' into gh-2169
2 parents 465d404 + 77c3b5b commit 327d88a

23 files changed

+372
-112
lines changed

CHANGELOG.md

+9
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
### v0.13.6 (2020-05-29)
2+
3+
4+
#### Features
5+
6+
* **body:** remove Sync bound for Body::wrap_stream ([042c7706](https://github.com/hyperium/hyper/commit/042c770603a212f22387807efe4fc672959df40c))
7+
* **http2:** allow configuring the HTTP/2 frame size ([b6446456](https://github.com/hyperium/hyper/commit/b64464562a02a642a3cf16ea072f39621da21980))
8+
9+
110
### v0.13.5 (2020-04-17)
211

312

Cargo.toml

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "hyper"
3-
version = "0.13.5" # don't forget to update html_root_url
3+
version = "0.13.6" # don't forget to update html_root_url
44
description = "A fast and correct HTTP library."
55
readme = "README.md"
66
homepage = "https://hyper.rs"
@@ -30,15 +30,15 @@ httparse = "1.0"
3030
h2 = "0.2.2"
3131
itoa = "0.4.1"
3232
log = "0.4"
33-
pin-project = "0.4"
33+
pin-project = "0.4.20"
3434
time = "0.1"
3535
tower-service = "0.3"
3636
tokio = { version = "0.2.5", features = ["sync"] }
3737
want = "0.3"
3838

3939
# Optional
4040

41-
net2 = { version = "0.2.32", optional = true }
41+
socket2 = { version = "0.3", optional = true }
4242

4343
[dev-dependencies]
4444
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
@@ -68,7 +68,7 @@ runtime = [
6868
"tokio/rt-core",
6969
]
7070
tcp = [
71-
"net2",
71+
"socket2",
7272
"tokio/blocking",
7373
"tokio/tcp",
7474
"tokio/time",

examples/README.md

+2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ pretty_env_logger = "0.4"
4141

4242
* [`send_file`](send_file.rs) - A server that sends back content of files using tokio-util to read the files asynchronously.
4343

44+
* [`service_struct_impl`](service_struct_impl.rs) - A struct that manually implements the `Service` trait and uses a shared counter across requests.
45+
4446
* [`single_threaded`](single_threaded.rs) - A server only running on 1 thread, so it can make use of `!Send` app state (like an `Rc` counter).
4547

4648
* [`state`](state.rs) - A webserver showing basic state sharing among requests. A counter is shared, incremented for every request, and every response is sent the last count.

examples/service_struct_impl.rs

+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
use hyper::service::Service;
2+
use hyper::{Body, Request, Response, Server};
3+
4+
use std::future::Future;
5+
use std::pin::Pin;
6+
use std::task::{Context, Poll};
7+
8+
type Counter = i32;
9+
10+
#[tokio::main]
11+
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
12+
let addr = ([127, 0, 0, 1], 3000).into();
13+
14+
let server = Server::bind(&addr).serve(MakeSvc { counter: 81818 });
15+
println!("Listening on http://{}", addr);
16+
17+
server.await?;
18+
Ok(())
19+
}
20+
21+
struct Svc {
22+
counter: Counter,
23+
}
24+
25+
impl Service<Request<Body>> for Svc {
26+
type Response = Response<Body>;
27+
type Error = hyper::Error;
28+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
29+
30+
fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
31+
Poll::Ready(Ok(()))
32+
}
33+
34+
fn call(&mut self, req: Request<Body>) -> Self::Future {
35+
fn mk_response(s: String) -> Result<Response<Body>, hyper::Error> {
36+
Ok(Response::builder().body(Body::from(s)).unwrap())
37+
}
38+
39+
let res = match req.uri().path() {
40+
"/" => mk_response(format!("home! counter = {:?}", self.counter)),
41+
"/posts" => mk_response(format!("posts, of course! counter = {:?}", self.counter)),
42+
"/authors" => mk_response(format!(
43+
"authors extraordinare! counter = {:?}",
44+
self.counter
45+
)),
46+
// Return the 404 Not Found for other routes, and don't increment counter.
47+
_ => return Box::pin(async { mk_response("oh no! not found".into()) }),
48+
};
49+
50+
if req.uri().path() != "/favicon.ico" {
51+
self.counter += 1;
52+
}
53+
54+
Box::pin(async { res })
55+
}
56+
}
57+
58+
struct MakeSvc {
59+
counter: Counter,
60+
}
61+
62+
impl<T> Service<T> for MakeSvc {
63+
type Response = Svc;
64+
type Error = hyper::Error;
65+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
66+
67+
fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
68+
Poll::Ready(Ok(()))
69+
}
70+
71+
fn call(&mut self, _: T) -> Self::Future {
72+
let counter = self.counter.clone();
73+
let fut = async move { Ok(Svc { counter }) };
74+
Box::pin(fut)
75+
}
76+
}

src/body/body.rs

+11-16
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use futures_util::TryStreamExt;
1111
use http::HeaderMap;
1212
use http_body::{Body as HttpBody, SizeHint};
1313

14+
use crate::common::sync_wrapper::SyncWrapper;
1415
use crate::common::{task, watch, Future, Never, Pin, Poll};
1516
use crate::proto::h2::ping;
1617
use crate::proto::DecodedLength;
@@ -42,13 +43,11 @@ enum Kind {
4243
content_length: DecodedLength,
4344
recv: h2::RecvStream,
4445
},
45-
// NOTE: This requires `Sync` because of how easy it is to use `await`
46-
// while a borrow of a `Request<Body>` exists.
47-
//
48-
// See https://github.com/rust-lang/rust/issues/57017
4946
#[cfg(feature = "stream")]
5047
Wrapped(
51-
Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send + Sync>>,
48+
SyncWrapper<
49+
Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>,
50+
>,
5251
),
5352
}
5453

@@ -156,12 +155,12 @@ impl Body {
156155
#[cfg(feature = "stream")]
157156
pub fn wrap_stream<S, O, E>(stream: S) -> Body
158157
where
159-
S: Stream<Item = Result<O, E>> + Send + Sync + 'static,
158+
S: Stream<Item = Result<O, E>> + Send + 'static,
160159
O: Into<Bytes> + 'static,
161160
E: Into<Box<dyn StdError + Send + Sync>> + 'static,
162161
{
163162
let mapped = stream.map_ok(Into::into).map_err(Into::into);
164-
Body::new(Kind::Wrapped(Box::pin(mapped)))
163+
Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
165164
}
166165

167166
/// Converts this `Body` into a `Future` of a pending HTTP upgrade.
@@ -280,7 +279,7 @@ impl Body {
280279
},
281280

282281
#[cfg(feature = "stream")]
283-
Kind::Wrapped(ref mut s) => match ready!(s.as_mut().poll_next(cx)) {
282+
Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
284283
Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
285284
None => Poll::Ready(None),
286285
},
@@ -297,7 +296,7 @@ impl Body {
297296
}
298297

299298
impl Default for Body {
300-
/// Returns [`Body::empty()`](Body::empty).
299+
/// Returns `Body::empty()`.
301300
#[inline]
302301
fn default() -> Body {
303302
Body::empty()
@@ -402,16 +401,12 @@ impl Stream for Body {
402401
/// This function requires enabling the `stream` feature in your
403402
/// `Cargo.toml`.
404403
#[cfg(feature = "stream")]
405-
impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send + Sync>>
406-
for Body
407-
{
404+
impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body {
408405
#[inline]
409406
fn from(
410-
stream: Box<
411-
dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send + Sync,
412-
>,
407+
stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>,
413408
) -> Body {
414-
Body::new(Kind::Wrapped(stream.into()))
409+
Body::new(Kind::Wrapped(SyncWrapper::new(stream.into())))
415410
}
416411
}
417412

src/client/conn.rs

+16-6
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use std::time::Duration;
1717

1818
use bytes::Bytes;
1919
use futures_util::future::{self, Either, FutureExt as _};
20-
use pin_project::{pin_project, project};
20+
use pin_project::pin_project;
2121
use tokio::io::{AsyncRead, AsyncWrite};
2222
use tower_service::Service;
2323

@@ -30,7 +30,7 @@ use crate::{Body, Request, Response};
3030

3131
type Http1Dispatcher<T, B, R> = proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, R>;
3232

33-
#[pin_project]
33+
#[pin_project(project = ProtoClientProj)]
3434
enum ProtoClient<T, B>
3535
where
3636
B: HttpBody,
@@ -525,6 +525,18 @@ impl Builder {
525525
self
526526
}
527527

528+
/// Sets the maximum frame size to use for HTTP2.
529+
///
530+
/// Passing `None` will do nothing.
531+
///
532+
/// If not set, hyper will use a default.
533+
pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
534+
if let Some(sz) = sz.into() {
535+
self.h2_builder.max_frame_size = sz;
536+
}
537+
self
538+
}
539+
528540
/// Sets an interval for HTTP2 Ping frames should be sent to keep a
529541
/// connection alive.
530542
///
@@ -665,12 +677,10 @@ where
665677
{
666678
type Output = crate::Result<proto::Dispatched>;
667679

668-
#[project]
669680
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
670-
#[project]
671681
match self.project() {
672-
ProtoClient::H1(c) => c.poll(cx),
673-
ProtoClient::H2(c) => c.poll(cx),
682+
ProtoClientProj::H1(c) => c.poll(cx),
683+
ProtoClientProj::H2(c) => c.poll(cx),
674684
}
675685
}
676686
}

src/client/connect/dns.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,13 @@ impl Future for GaiFuture {
141141
Pin::new(&mut self.inner).poll(cx).map(|res| match res {
142142
Ok(Ok(addrs)) => Ok(GaiAddrs { inner: addrs }),
143143
Ok(Err(err)) => Err(err),
144-
Err(join_err) => panic!("gai background task failed: {:?}", join_err),
144+
Err(join_err) => {
145+
if join_err.is_cancelled() {
146+
Err(io::Error::new(io::ErrorKind::Interrupted, join_err))
147+
} else {
148+
panic!("gai background task failed: {:?}", join_err)
149+
}
150+
}
145151
})
146152
}
147153
}

src/client/connect/http.rs

+15-16
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use std::time::Duration;
1111

1212
use futures_util::future::Either;
1313
use http::uri::{Scheme, Uri};
14-
use net2::TcpBuilder;
1514
use pin_project::pin_project;
1615
use tokio::net::TcpStream;
1716
use tokio::time::Delay;
@@ -568,17 +567,17 @@ impl ConnectingTcpRemote {
568567
}
569568

570569
fn bind_local_address(
571-
builder: &TcpBuilder,
570+
socket: &socket2::Socket,
572571
dst_addr: &SocketAddr,
573572
local_addr_ipv4: &Option<Ipv4Addr>,
574573
local_addr_ipv6: &Option<Ipv6Addr>,
575574
) -> io::Result<()> {
576575
match (*dst_addr, local_addr_ipv4, local_addr_ipv6) {
577576
(SocketAddr::V4(_), Some(addr), _) => {
578-
builder.bind(SocketAddr::new(addr.clone().into(), 0))?;
577+
socket.bind(&SocketAddr::new(addr.clone().into(), 0).into())?;
579578
}
580579
(SocketAddr::V6(_), _, Some(addr)) => {
581-
builder.bind(SocketAddr::new(addr.clone().into(), 0))?;
580+
socket.bind(&SocketAddr::new(addr.clone().into(), 0).into())?;
582581
}
583582
_ => {
584583
if cfg!(windows) {
@@ -587,7 +586,7 @@ fn bind_local_address(
587586
SocketAddr::V4(_) => ([0, 0, 0, 0], 0).into(),
588587
SocketAddr::V6(_) => ([0, 0, 0, 0, 0, 0, 0, 0], 0).into(),
589588
};
590-
builder.bind(any)?;
589+
socket.bind(&any.into())?;
591590
}
592591
}
593592
}
@@ -602,20 +601,22 @@ fn connect(
602601
reuse_address: bool,
603602
connect_timeout: Option<Duration>,
604603
) -> io::Result<impl Future<Output = io::Result<TcpStream>>> {
605-
let builder = match *addr {
606-
SocketAddr::V4(_) => TcpBuilder::new_v4()?,
607-
SocketAddr::V6(_) => TcpBuilder::new_v6()?,
604+
use socket2::{Domain, Protocol, Socket, Type};
605+
let domain = match *addr {
606+
SocketAddr::V4(_) => Domain::ipv4(),
607+
SocketAddr::V6(_) => Domain::ipv6(),
608608
};
609+
let socket = Socket::new(domain, Type::stream(), Some(Protocol::tcp()))?;
609610

610611
if reuse_address {
611-
builder.reuse_address(reuse_address)?;
612+
socket.set_reuse_address(true)?;
612613
}
613614

614-
bind_local_address(&builder, addr, local_addr_ipv4, local_addr_ipv6)?;
615+
bind_local_address(&socket, addr, local_addr_ipv4, local_addr_ipv6)?;
615616

616617
let addr = *addr;
617618

618-
let std_tcp = builder.to_tcp_stream()?;
619+
let std_tcp = socket.into_tcp_stream();
619620

620621
Ok(async move {
621622
let connect = TcpStream::connect_std(std_tcp, &addr);
@@ -711,21 +712,19 @@ mod tests {
711712

712713
#[cfg(any(target_os = "linux", target_os = "macos"))]
713714
fn get_local_ips() -> (Option<std::net::Ipv4Addr>, Option<std::net::Ipv6Addr>) {
714-
use net2::TcpBuilder;
715-
use std::net::IpAddr;
715+
use std::net::{IpAddr, TcpListener};
716716

717717
let mut ip_v4 = None;
718718
let mut ip_v6 = None;
719-
let builder = TcpBuilder::new_v6().unwrap();
720719

721720
let ips = pnet::datalink::interfaces()
722721
.into_iter()
723722
.flat_map(|i| i.ips.into_iter().map(|n| n.ip()));
724723

725724
for ip in ips {
726725
match ip {
727-
IpAddr::V4(ip) if builder.bind((ip, 0)).is_ok() => ip_v4 = Some(ip),
728-
IpAddr::V6(ip) if builder.bind((ip, 0)).is_ok() => ip_v6 = Some(ip),
726+
IpAddr::V4(ip) if TcpListener::bind((ip, 0)).is_ok() => ip_v4 = Some(ip),
727+
IpAddr::V6(ip) if TcpListener::bind((ip, 0)).is_ok() => ip_v6 = Some(ip),
729728
_ => (),
730729
}
731730

0 commit comments

Comments
 (0)