Skip to content

Commit 309046c

Browse files
committed
feat(client): allow !Send IO with HTTP/1 client
This removes the requirement of the IO type from being `Send` for the HTTP/1 client connection. To do so, the ability to perform `hyper::upgrade`s had to be moved to a separate type which does require the `Send` bound. This mirrors how the server types do it. The `Connection` type now has a `with_upgrades()` method to convert. Closes #3363 BREAKING CHANGE: If you use client HTTP/1 upgrades, you must call `Connection::with_upgrades()` to still work the same.
1 parent 54c8670 commit 309046c

File tree

3 files changed

+83
-32
lines changed

3 files changed

+83
-32
lines changed

examples/single_threaded.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ async fn http1_server() -> Result<(), Box<dyn std::error::Error>> {
138138
loop {
139139
let (stream, _) = listener.accept().await?;
140140

141-
let io = TokioIo::new(stream);
141+
let io = IOTypeNotSend::new(TokioIo::new(stream));
142142

143143
let cnt = counter.clone();
144144

@@ -166,7 +166,7 @@ async fn http1_client(url: hyper::Uri) -> Result<(), Box<dyn std::error::Error>>
166166
let addr = format!("{}:{}", host, port);
167167
let stream = TcpStream::connect(addr).await?;
168168

169-
let io = TokioIo::new(stream);
169+
let io = IOTypeNotSend::new(TokioIo::new(stream));
170170

171171
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
172172

examples/upgrades.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,8 @@ async fn client_upgrade_request(addr: SocketAddr) -> Result<()> {
107107
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
108108

109109
tokio::task::spawn(async move {
110-
if let Err(err) = conn.await {
110+
// Don't forget to enable upgrades on the connection.
111+
if let Err(err) = conn.with_upgrades().await {
111112
println!("Connection failed: {:?}", err);
112113
}
113114
});

src/client/conn/http1.rs

Lines changed: 79 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use httparse::ParserConfig;
1414
use super::super::dispatch;
1515
use crate::body::{Body, Incoming as IncomingBody};
1616
use crate::proto;
17-
use crate::upgrade::Upgraded;
1817

1918
type Dispatcher<T, B> =
2019
proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
@@ -51,23 +50,23 @@ pub struct Parts<T> {
5150
#[must_use = "futures do nothing unless polled"]
5251
pub struct Connection<T, B>
5352
where
54-
T: Read + Write + Send + 'static,
53+
T: Read + Write + 'static,
5554
B: Body + 'static,
5655
{
57-
inner: Option<Dispatcher<T, B>>,
56+
inner: Dispatcher<T, B>,
5857
}
5958

6059
impl<T, B> Connection<T, B>
6160
where
62-
T: Read + Write + Send + Unpin + 'static,
61+
T: Read + Write + Unpin + 'static,
6362
B: Body + 'static,
6463
B::Error: Into<Box<dyn StdError + Send + Sync>>,
6564
{
6665
/// Return the inner IO object, and additional information.
6766
///
6867
/// Only works for HTTP/1 connections. HTTP/2 connections will panic.
6968
pub fn into_parts(self) -> Parts<T> {
70-
let (io, read_buf, _) = self.inner.expect("already upgraded").into_inner();
69+
let (io, read_buf, _) = self.inner.into_inner();
7170
Parts {
7271
io,
7372
read_buf,
@@ -87,10 +86,7 @@ where
8786
/// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
8887
/// to work with this function; or use the `without_shutdown` wrapper.
8988
pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
90-
self.inner
91-
.as_mut()
92-
.expect("already upgraded")
93-
.poll_without_shutdown(cx)
89+
self.inner.poll_without_shutdown(cx)
9490
}
9591
}
9692

@@ -119,7 +115,7 @@ pub struct Builder {
119115
/// See [`client::conn`](crate::client::conn) for more.
120116
pub async fn handshake<T, B>(io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
121117
where
122-
T: Read + Write + Unpin + Send + 'static,
118+
T: Read + Write + Unpin + 'static,
123119
B: Body + 'static,
124120
B::Data: Send,
125121
B::Error: Into<Box<dyn StdError + Send + Sync>>,
@@ -240,9 +236,23 @@ impl<B> fmt::Debug for SendRequest<B> {
240236

241237
// ===== impl Connection
242238

239+
impl<T, B> Connection<T, B>
240+
where
241+
T: Read + Write + Unpin + Send + 'static,
242+
B: Body + 'static,
243+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
244+
{
245+
/// Enable this connection to support higher-level HTTP upgrades.
246+
///
247+
/// See [the `upgrade` module](crate::upgrade) for more.
248+
pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<T, B> {
249+
upgrades::UpgradeableConnection { inner: Some(self) }
250+
}
251+
}
252+
243253
impl<T, B> fmt::Debug for Connection<T, B>
244254
where
245-
T: Read + Write + fmt::Debug + Send + 'static,
255+
T: Read + Write + fmt::Debug + 'static,
246256
B: Body + 'static,
247257
{
248258
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -252,27 +262,24 @@ where
252262

253263
impl<T, B> Future for Connection<T, B>
254264
where
255-
T: Read + Write + Unpin + Send + 'static,
265+
T: Read + Write + Unpin + 'static,
256266
B: Body + 'static,
257267
B::Data: Send,
258268
B::Error: Into<Box<dyn StdError + Send + Sync>>,
259269
{
260270
type Output = crate::Result<()>;
261271

262272
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
263-
match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? {
273+
match ready!(Pin::new(&mut self.inner).poll(cx))? {
264274
proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
265-
proto::Dispatched::Upgrade(pending) => match self.inner.take() {
266-
Some(h1) => {
267-
let (io, buf, _) = h1.into_inner();
268-
pending.fulfill(Upgraded::new(io, buf));
269-
Poll::Ready(Ok(()))
270-
}
271-
_ => {
272-
drop(pending);
273-
unreachable!("Upgraded twice");
274-
}
275-
},
275+
proto::Dispatched::Upgrade(pending) => {
276+
// With no `Send` bound on `I`, we can't try to do
277+
// upgrades here. In case a user was trying to use
278+
// `upgrade` with this API, send a special
279+
// error letting them know about that.
280+
pending.manual();
281+
Poll::Ready(Ok(()))
282+
}
276283
}
277284
}
278285
}
@@ -474,7 +481,7 @@ impl Builder {
474481
io: T,
475482
) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
476483
where
477-
T: Read + Write + Unpin + Send + 'static,
484+
T: Read + Write + Unpin + 'static,
478485
B: Body + 'static,
479486
B::Data: Send,
480487
B::Error: Into<Box<dyn StdError + Send + Sync>>,
@@ -518,10 +525,53 @@ impl Builder {
518525
let cd = proto::h1::dispatch::Client::new(rx);
519526
let proto = proto::h1::Dispatcher::new(cd, conn);
520527

521-
Ok((
522-
SendRequest { dispatch: tx },
523-
Connection { inner: Some(proto) },
524-
))
528+
Ok((SendRequest { dispatch: tx }, Connection { inner: proto }))
529+
}
530+
}
531+
}
532+
533+
mod upgrades {
534+
use crate::upgrade::Upgraded;
535+
536+
use super::*;
537+
538+
// A future binding a connection with a Service with Upgrade support.
539+
//
540+
// This type is unnameable outside the crate.
541+
#[must_use = "futures do nothing unless polled"]
542+
#[allow(missing_debug_implementations)]
543+
pub struct UpgradeableConnection<T, B>
544+
where
545+
T: Read + Write + Unpin + Send + 'static,
546+
B: Body + 'static,
547+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
548+
{
549+
pub(super) inner: Option<Connection<T, B>>,
550+
}
551+
552+
impl<I, B> Future for UpgradeableConnection<I, B>
553+
where
554+
I: Read + Write + Unpin + Send + 'static,
555+
B: Body + 'static,
556+
B::Data: Send,
557+
B::Error: Into<Box<dyn StdError + Send + Sync>>,
558+
{
559+
type Output = crate::Result<()>;
560+
561+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
562+
match ready!(Pin::new(&mut self.inner.as_mut().unwrap().inner).poll(cx)) {
563+
Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
564+
Ok(proto::Dispatched::Upgrade(pending)) => {
565+
let Parts {
566+
io,
567+
read_buf,
568+
_inner,
569+
} = self.inner.take().unwrap().into_parts();
570+
pending.fulfill(Upgraded::new(io, read_buf));
571+
Poll::Ready(Ok(()))
572+
}
573+
Err(e) => Poll::Ready(Err(e)),
574+
}
525575
}
526576
}
527577
}

0 commit comments

Comments
 (0)