Skip to content

Commit 4d7a226

Browse files
committed
feat(client): change connectors to return an impl Connection
Instead of returning a tuple `(impl AsyncRead + AsyncWrite, Connected)`, this adds a new trait, `hyper::client::connect::Connection`, which allows querying the connection type for a `Connected`. BREAKING CHANGE: Connectors no longer return a tuple of `(T, Connected)`, but a single `T: Connection`.
1 parent 319e8ae commit 4d7a226

File tree

6 files changed

+68
-37
lines changed

6 files changed

+68
-37
lines changed

examples/client.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@
33
use std::env;
44
use std::io::{self, Write};
55

6-
use hyper::Client;
7-
use futures_util::StreamExt;
6+
use hyper::{Client, body::HttpBody as _};
87

98
// A simple type alias so as to DRY.
109
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

src/client/connect/http.rs

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use tokio::net::TcpStream;
1717
use tokio::time::Delay;
1818

1919
use super::dns::{self, resolve, GaiResolver, Resolve};
20-
use super::{Connected};
20+
use super::{Connected, Connection};
2121
//#[cfg(feature = "runtime")] use super::dns::TokioThreadpoolGaiResolver;
2222

2323

@@ -234,7 +234,7 @@ where
234234
R: Resolve + Clone + Send + Sync + 'static,
235235
R::Future: Send,
236236
{
237-
type Response = (TcpStream, Connected);
237+
type Response = TcpStream;
238238
type Error = ConnectError;
239239
type Future = HttpConnecting<R>;
240240

@@ -259,7 +259,7 @@ where
259259
async fn call_async(
260260
&mut self,
261261
dst: Uri,
262-
) -> Result<(TcpStream, Connected), ConnectError> {
262+
) -> Result<TcpStream, ConnectError> {
263263
trace!(
264264
"Http::connect; scheme={:?}, host={:?}, port={:?}",
265265
dst.scheme(),
@@ -340,14 +340,20 @@ where
340340
sock.set_nodelay(config.nodelay)
341341
.map_err(ConnectError::m("tcp set_nodelay error"))?;
342342

343-
let extra = HttpInfo {
344-
remote_addr: sock
345-
.peer_addr()
346-
.map_err(ConnectError::m("tcp peer_addr error"))?,
347-
};
348-
let connected = Connected::new().extra(extra);
343+
Ok(sock)
344+
}
345+
}
349346

350-
Ok((sock, connected))
347+
impl Connection for TcpStream {
348+
fn connected(&self) -> Connected {
349+
let connected = Connected::new();
350+
if let Ok(remote_addr) = self.peer_addr() {
351+
connected.extra(HttpInfo {
352+
remote_addr,
353+
})
354+
} else {
355+
connected
356+
}
351357
}
352358
}
353359

@@ -372,7 +378,7 @@ pub struct HttpConnecting<R> {
372378
_marker: PhantomData<R>,
373379
}
374380

375-
type ConnectResult = Result<(TcpStream, Connected), ConnectError>;
381+
type ConnectResult = Result<TcpStream, ConnectError>;
376382
type BoxConnecting = Pin<Box<dyn Future<Output = ConnectResult> + Send>>;
377383

378384
impl<R: Resolve> Future for HttpConnecting<R> {
@@ -644,12 +650,12 @@ mod tests {
644650
use ::http::Uri;
645651

646652
use super::super::sealed::Connect;
647-
use super::{Connected, HttpConnector};
653+
use super::HttpConnector;
648654

649655
async fn connect<C>(
650656
connector: C,
651657
dst: Uri,
652-
) -> Result<(C::Transport, Connected), C::Error>
658+
) -> Result<C::Transport, C::Error>
653659
where
654660
C: Connect,
655661
{

src/client/connect/mod.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ use ::http::{Response};
1313
#[cfg(feature = "tcp")] mod http;
1414
#[cfg(feature = "tcp")] pub use self::http::{HttpConnector, HttpInfo};
1515

16+
/// Describes a type returned by a connector.
17+
pub trait Connection {
18+
/// Return metadata describing the connection.
19+
fn connected(&self) -> Connected;
20+
}
21+
1622
/// Extra information about the connected transport.
1723
///
1824
/// This can be used to inform recipients about things like if ALPN
@@ -167,7 +173,7 @@ pub(super) mod sealed {
167173
use tokio::io::{AsyncRead, AsyncWrite};
168174

169175
use crate::common::{Future, Unpin};
170-
use super::{Connected};
176+
use super::{Connection};
171177

172178
/// Connect to a destination, returning an IO transport.
173179
///
@@ -183,21 +189,21 @@ pub(super) mod sealed {
183189
// fit the `Connect` bounds because of the blanket impl for `Service`.
184190
pub trait Connect: Sealed + Sized {
185191
/// The connected IO Stream.
186-
type Transport: AsyncRead + AsyncWrite;
192+
type Transport: AsyncRead + AsyncWrite + Connection;
187193
/// An error occured when trying to connect.
188194
type Error: Into<Box<dyn StdError + Send + Sync>>;
189195
/// A Future that will resolve to the connected Transport.
190-
type Future: Future<Output=Result<(Self::Transport, Connected), Self::Error>>;
196+
type Future: Future<Output=Result<Self::Transport, Self::Error>>;
191197
#[doc(hidden)]
192198
fn connect(self, internal_only: Internal, dst: Uri) -> Self::Future;
193199
}
194200

195201
impl<S, T> Connect for S
196202
where
197-
S: tower_service::Service<Uri, Response=(T, Connected)> + Send,
203+
S: tower_service::Service<Uri, Response=T> + Send,
198204
S::Error: Into<Box<dyn StdError + Send + Sync>>,
199205
S::Future: Unpin + Send,
200-
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
206+
T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
201207
{
202208
type Transport = T;
203209
type Error = S::Error;
@@ -209,10 +215,10 @@ pub(super) mod sealed {
209215

210216
impl<S, T> Sealed for S
211217
where
212-
S: tower_service::Service<Uri, Response=(T, Connected)> + Send,
218+
S: tower_service::Service<Uri, Response=T> + Send,
213219
S::Error: Into<Box<dyn StdError + Send + Sync>>,
214220
S::Future: Unpin + Send,
215-
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
221+
T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
216222
{}
217223

218224
pub trait Sealed {}

src/client/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ use http::uri::Scheme;
7171

7272
use crate::body::{Body, Payload};
7373
use crate::common::{lazy as hyper_lazy, BoxSendFuture, Executor, Lazy, Future, Pin, Poll, task};
74-
use self::connect::{Alpn, sealed::Connect, Connected};
74+
use self::connect::{Alpn, sealed::Connect, Connected, Connection};
7575
use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation};
7676

7777
#[cfg(feature = "tcp")] pub use self::connect::HttpConnector;
@@ -478,7 +478,8 @@ where C: Connect + Clone + Send + Sync + 'static,
478478
};
479479
Either::Left(connector.connect(connect::sealed::Internal, dst)
480480
.map_err(crate::Error::new_connect)
481-
.and_then(move |(io, connected)| {
481+
.and_then(move |io| {
482+
let connected = io.connected();
482483
// If ALPN is h2 and we aren't http2_only already,
483484
// then we need to convert our pool checkout into
484485
// a single HTTP2 one.

tests/client.rs

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -945,7 +945,7 @@ mod dispatch_impl {
945945
use tokio::io::{AsyncRead, AsyncWrite};
946946
use tokio::net::TcpStream;
947947

948-
use hyper::client::connect::{Connected, HttpConnector};
948+
use hyper::client::connect::{Connected, Connection, HttpConnector};
949949
use hyper::Client;
950950

951951
#[test]
@@ -1740,7 +1740,7 @@ mod dispatch_impl {
17401740
}
17411741

17421742
impl hyper::service::Service<Uri> for DebugConnector {
1743-
type Response = (DebugStream, Connected);
1743+
type Response = DebugStream;
17441744
type Error = <HttpConnector as hyper::service::Service<Uri>>::Error;
17451745
type Future = Pin<Box<dyn Future<
17461746
Output = Result<Self::Response, Self::Error>
@@ -1756,38 +1756,45 @@ mod dispatch_impl {
17561756
let closes = self.closes.clone();
17571757
let is_proxy = self.is_proxy;
17581758
let is_alpn_h2 = self.alpn_h2;
1759-
Box::pin(self.http.call(dst).map_ok(move |(s, mut c)| {
1760-
if is_alpn_h2 {
1761-
c = c.negotiated_h2();
1759+
Box::pin(self.http.call(dst).map_ok(move |tcp| {
1760+
DebugStream {
1761+
tcp,
1762+
on_drop: closes,
1763+
is_alpn_h2,
1764+
is_proxy,
17621765
}
1763-
(DebugStream(s, closes), c.proxy(is_proxy))
17641766
}))
17651767
}
17661768
}
17671769

1768-
struct DebugStream(TcpStream, mpsc::Sender<()>);
1770+
struct DebugStream {
1771+
tcp: TcpStream,
1772+
on_drop: mpsc::Sender<()>,
1773+
is_alpn_h2: bool,
1774+
is_proxy: bool,
1775+
}
17691776

17701777
impl Drop for DebugStream {
17711778
fn drop(&mut self) {
1772-
let _ = self.1.try_send(());
1779+
let _ = self.on_drop.try_send(());
17731780
}
17741781
}
17751782

17761783
impl AsyncWrite for DebugStream {
17771784
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
1778-
Pin::new(&mut self.0).poll_shutdown(cx)
1785+
Pin::new(&mut self.tcp).poll_shutdown(cx)
17791786
}
17801787

17811788
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
1782-
Pin::new(&mut self.0).poll_flush(cx)
1789+
Pin::new(&mut self.tcp).poll_flush(cx)
17831790
}
17841791

17851792
fn poll_write(
17861793
mut self: Pin<&mut Self>,
17871794
cx: &mut Context<'_>,
17881795
buf: &[u8],
17891796
) -> Poll<Result<usize, io::Error>> {
1790-
Pin::new(&mut self.0).poll_write(cx, buf)
1797+
Pin::new(&mut self.tcp).poll_write(cx, buf)
17911798
}
17921799
}
17931800

@@ -1797,7 +1804,19 @@ mod dispatch_impl {
17971804
cx: &mut Context<'_>,
17981805
buf: &mut [u8],
17991806
) -> Poll<Result<usize, io::Error>> {
1800-
Pin::new(&mut self.0).poll_read(cx, buf)
1807+
Pin::new(&mut self.tcp).poll_read(cx, buf)
1808+
}
1809+
}
1810+
1811+
impl Connection for DebugStream {
1812+
fn connected(&self) -> Connected {
1813+
let connected = self.tcp.connected().proxy(self.is_proxy);
1814+
1815+
if self.is_alpn_h2 {
1816+
connected.negotiated_h2()
1817+
} else {
1818+
connected
1819+
}
18011820
}
18021821
}
18031822
}

tests/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1779,7 +1779,7 @@ impl tower_service::Service<Request<Body>> for TestService {
17791779
let replies = self.reply.clone();
17801780

17811781
Box::pin(async move {
1782-
while let Some(chunk) = req.body_mut().next().await {
1782+
while let Some(chunk) = hyper::body::HttpBody::next(req.body_mut()).await {
17831783
match chunk {
17841784
Ok(chunk) => {
17851785
tx.send(Msg::Chunk(chunk.to_vec())).unwrap();

0 commit comments

Comments
 (0)