Skip to content

Commit ac1a31f

Browse files
committed
feat(client): redesign the Connect trait
The original `Connect` trait had some limitations: - There was no way to provide more details to the connector about how to connect, other than the `Uri`. - There was no way for the connector to return any extra information about the connected transport. - The `Error` was forced to be an `std::io::Error`. - The transport and future had `'static` requirements. As hyper gains HTTP/2 support, some of these things needed to be changed. We want to allow the user to configure whether they hope to us ALPN to start an HTTP/2 connection, and the connector needs to be able to return back to hyper if it did so. The new `Connect2` trait is meant to solve this. - The `connect` method now receives a `Destination` type, instead of a `Uri`. This allows us to include additional data about how to connect. - The `Future` returned from `connect` now must be a `Connected`, which wraps the transport, and includes possibly extra data about what happened when connecting. The `Connect` trait is deprecated, with the hopes of `Connect2` taking it's place in the next breaking release. For backwards compatibility, any type that implements `Connect` now will automaticall implement `Connect2`, ignoring any of the extra data from `Destination` and `Connected`.
1 parent c33b9d4 commit ac1a31f

File tree

3 files changed

+196
-53
lines changed

3 files changed

+196
-53
lines changed

src/client/compat.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
//! Wrappers to build compatibility with the `http` crate.
22
3+
use std::io;
4+
35
use futures::{Future, Poll, Stream};
46
use http;
57
use tokio_service::Service;
68

7-
use client::{Connect, Client, FutureResponse};
9+
use client::{Connect2, Client, FutureResponse};
810
use error::Error;
911
use proto::Body;
1012

@@ -19,7 +21,9 @@ pub(super) fn client<C, B>(client: Client<C, B>) -> CompatClient<C, B> {
1921
}
2022

2123
impl<C, B> Service for CompatClient<C, B>
22-
where C: Connect,
24+
where C: Connect2<Error=io::Error>,
25+
C::Transport: 'static,
26+
C::Future: 'static,
2327
B: Stream<Error=Error> + 'static,
2428
B::Item: AsRef<[u8]>,
2529
{

src/client/connect.rs

Lines changed: 169 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1+
//! Contains the `Connect2` trait, and supporting types.
12
use std::error::Error as StdError;
23
use std::fmt;
34
use std::io;
45
use std::mem;
56
use std::sync::Arc;
6-
//use std::net::SocketAddr;
77

88
use futures::{Future, Poll, Async};
99
use futures::future::{Executor, ExecuteError};
@@ -16,31 +16,74 @@ use tokio_service::Service;
1616
use Uri;
1717

1818
use super::dns;
19+
use self::http_connector::HttpConnectorBlockingTask;
20+
21+
/// Connect to a destination, returning an IO transport.
22+
pub trait Connect2 {
23+
/// The connected IO Stream.
24+
type Transport: AsyncRead + AsyncWrite;
25+
/// An error occured when trying to connect.
26+
type Error;
27+
/// A Future that will resolve to the connected Transport.
28+
type Future: Future<Item=Connected<Self::Transport>, Error=Self::Error>;
29+
/// Connect to a destination.
30+
fn connect(&self, dst: Destination) -> Self::Future;
31+
}
1932

20-
/// A connector creates an Io to a remote address..
21-
///
22-
/// This trait is not implemented directly, and only exists to make
23-
/// the intent clearer. A connector should implement `Service` with
24-
/// `Request=Uri` and `Response: Io` instead.
25-
pub trait Connect: Service<Request=Uri, Error=io::Error> + 'static {
26-
/// The connected Io Stream.
27-
type Output: AsyncRead + AsyncWrite + 'static;
28-
/// A Future that will resolve to the connected Stream.
29-
type Future: Future<Item=Self::Output, Error=io::Error> + 'static;
30-
/// Connect to a remote address.
31-
fn connect(&self, Uri) -> <Self as Connect>::Future;
33+
/// A set of properties to describe where and how to try to connect.
34+
#[derive(Debug)]
35+
pub struct Destination {
36+
pub(super) alpn: Alpn,
37+
pub(super) uri: Uri,
3238
}
3339

34-
impl<T> Connect for T
35-
where T: Service<Request=Uri, Error=io::Error> + 'static,
36-
T::Response: AsyncRead + AsyncWrite,
37-
T::Future: Future<Error=io::Error>,
38-
{
39-
type Output = T::Response;
40-
type Future = T::Future;
40+
/// Extra information about the connected transport.
41+
#[derive(Debug)]
42+
pub struct Connected<T> {
43+
alpn: Alpn,
44+
pub(super) transport: T,
45+
}
4146

42-
fn connect(&self, url: Uri) -> <Self as Connect>::Future {
43-
self.call(url)
47+
#[derive(Debug)]
48+
pub(super) enum Alpn {
49+
Http1,
50+
H2,
51+
}
52+
53+
impl Destination {
54+
/// Get a reference to the requested `Uri`.
55+
pub fn uri(&self) -> &Uri {
56+
&self.uri
57+
}
58+
59+
/// Returns whether this connection must negotiate HTTP/2 via ALPN.
60+
pub fn h2(&self) -> bool {
61+
match self.alpn {
62+
Alpn::Http1 => false,
63+
Alpn::H2 => true,
64+
}
65+
}
66+
}
67+
68+
impl<T> Connected<T> {
69+
/// Create new `Connected` type with empty metadata.
70+
pub fn new(transport: T) -> Connected<T> {
71+
Connected {
72+
alpn: Alpn::Http1,
73+
transport: transport,
74+
}
75+
}
76+
77+
/// Convert into the underlying Transport.
78+
pub fn into_transport(self) -> T {
79+
self.transport
80+
}
81+
82+
/// Set that the connected transport negotiated HTTP/2 as it's
83+
/// next protocol.
84+
pub fn h2(&mut self) -> &mut Connected<T> {
85+
self.alpn = Alpn::H2;
86+
self
4487
}
4588
}
4689

@@ -96,6 +139,8 @@ impl fmt::Debug for HttpConnector {
96139
}
97140
}
98141

142+
// deprecated, will be gone in 0.12
143+
#[doc(hidden)]
99144
impl Service for HttpConnector {
100145
type Request = Uri;
101146
type Response = TcpStream;
@@ -258,23 +303,27 @@ impl ConnectingTcp {
258303
}
259304
}
260305

261-
/// Blocking task to be executed on a thread pool.
262-
pub struct HttpConnectorBlockingTask {
263-
work: oneshot::Execute<dns::Work>
264-
}
306+
// Make this Future unnameable outside of this crate.
307+
mod http_connector {
308+
use super::*;
309+
// Blocking task to be executed on a thread pool.
310+
pub struct HttpConnectorBlockingTask {
311+
pub(super) work: oneshot::Execute<dns::Work>
312+
}
265313

266-
impl fmt::Debug for HttpConnectorBlockingTask {
267-
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
268-
f.pad("HttpConnectorBlockingTask")
314+
impl fmt::Debug for HttpConnectorBlockingTask {
315+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
316+
f.pad("HttpConnectorBlockingTask")
317+
}
269318
}
270-
}
271319

272-
impl Future for HttpConnectorBlockingTask {
273-
type Item = ();
274-
type Error = ();
320+
impl Future for HttpConnectorBlockingTask {
321+
type Item = ();
322+
type Error = ();
275323

276-
fn poll(&mut self) -> Poll<(), ()> {
277-
self.work.poll()
324+
fn poll(&mut self) -> Poll<(), ()> {
325+
self.work.poll()
326+
}
278327
}
279328
}
280329

@@ -288,20 +337,97 @@ impl Executor<oneshot::Execute<dns::Work>> for HttpConnectExecutor {
288337
}
289338
}
290339

291-
/*
292-
impl<S: SslClient> HttpsConnector<S> {
293-
/// Create a new connector using the provided SSL implementation.
294-
pub fn new(s: S) -> HttpsConnector<S> {
295-
HttpsConnector {
296-
http: HttpConnector::default(),
297-
ssl: s,
340+
#[doc(hidden)]
341+
#[deprecated(since="0.11.16", note="Use the Connect2 trait, which will become Connect in 0.12")]
342+
pub trait Connect: Service<Request=Uri, Error=io::Error> + 'static {
343+
/// The connected Io Stream.
344+
type Output: AsyncRead + AsyncWrite + 'static;
345+
/// A Future that will resolve to the connected Stream.
346+
type Future: Future<Item=Self::Output, Error=io::Error> + 'static;
347+
/// Connect to a remote address.
348+
fn connect(&self, Uri) -> <Self as Connect>::Future;
349+
}
350+
351+
#[doc(hidden)]
352+
#[allow(deprecated)]
353+
impl<T> Connect for T
354+
where T: Service<Request=Uri, Error=io::Error> + 'static,
355+
T::Response: AsyncRead + AsyncWrite,
356+
T::Future: Future<Error=io::Error>,
357+
{
358+
type Output = T::Response;
359+
type Future = T::Future;
360+
361+
fn connect(&self, url: Uri) -> <Self as Connect>::Future {
362+
self.call(url)
363+
}
364+
}
365+
366+
#[doc(hidden)]
367+
#[allow(deprecated)]
368+
impl<T> Connect2 for T
369+
where
370+
T: Connect,
371+
{
372+
type Transport = <T as Connect>::Output;
373+
type Error = io::Error;
374+
type Future = ConnectToConnect2Future<<T as Connect>::Future>;
375+
376+
fn connect(&self, dst: Destination) -> <Self as Connect2>::Future {
377+
ConnectToConnect2Future {
378+
inner: <Self as Connect>::connect(self, dst.uri),
298379
}
299380
}
300381
}
301-
*/
382+
383+
#[doc(hidden)]
384+
#[deprecated(since="0.11.16")]
385+
#[allow(missing_debug_implementations)]
386+
pub struct ConnectToConnect2Future<F> {
387+
inner: F,
388+
}
389+
390+
#[allow(deprecated)]
391+
impl<F> Future for ConnectToConnect2Future<F>
392+
where
393+
F: Future,
394+
{
395+
type Item = Connected<F::Item>;
396+
type Error = F::Error;
397+
398+
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
399+
self.inner.poll()
400+
.map(|async| async.map(Connected::new))
401+
}
402+
}
403+
404+
// even though deprecated, we need to make sure the HttpConnector still
405+
// implements Connect (and Service apparently...)
406+
407+
#[allow(deprecated)]
408+
fn _assert_http_connector() {
409+
fn assert_connect<T>()
410+
where
411+
T: Connect2<
412+
Transport=TcpStream,
413+
Error=io::Error,
414+
Future=ConnectToConnect2Future<HttpConnecting>
415+
>,
416+
T: Connect<Output=TcpStream, Future=HttpConnecting>,
417+
T: Service<
418+
Request=Uri,
419+
Response=TcpStream,
420+
Future=HttpConnecting,
421+
Error=io::Error
422+
>,
423+
{}
424+
425+
assert_connect::<HttpConnector>();
426+
}
302427

303428
#[cfg(test)]
304429
mod tests {
430+
#![allow(deprecated)]
305431
use std::io;
306432
use tokio::reactor::Core;
307433
use super::{Connect, HttpConnector};

src/client/mod.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,14 @@ use version::HttpVersion;
2424

2525
pub use proto::response::Response;
2626
pub use proto::request::Request;
27-
pub use self::connect::{HttpConnector, Connect};
27+
pub use self::connect::{Connect2, HttpConnector};
28+
#[allow(deprecated)]
29+
pub use self::connect::Connect;
2830

2931
use self::background::{bg, Background};
32+
use self::connect::Destination;
3033

31-
mod connect;
34+
pub mod connect;
3235
mod dns;
3336
mod pool;
3437
#[cfg(feature = "compat")]
@@ -99,7 +102,9 @@ impl<C, B> Client<C, B> {
99102
}
100103

101104
impl<C, B> Client<C, B>
102-
where C: Connect,
105+
where C: Connect2<Error=io::Error>,
106+
C::Transport: 'static,
107+
C::Future: 'static,
103108
B: Stream<Error=::Error> + 'static,
104109
B::Item: AsRef<[u8]>,
105110
{
@@ -149,7 +154,9 @@ impl Future for FutureResponse {
149154
}
150155

151156
impl<C, B> Service for Client<C, B>
152-
where C: Connect,
157+
where C: Connect2<Error=io::Error>,
158+
C::Transport: 'static,
159+
C::Future: 'static,
153160
B: Stream<Error=::Error> + 'static,
154161
B::Item: AsRef<[u8]>,
155162
{
@@ -195,15 +202,19 @@ where C: Connect,
195202
let executor = self.executor.clone();
196203
let pool = self.pool.clone();
197204
let pool_key = Rc::new(domain.to_string());
198-
self.connector.connect(url)
199-
.and_then(move |io| {
205+
let dst = Destination {
206+
uri: url,
207+
alpn: self::connect::Alpn::Http1,
208+
};
209+
self.connector.connect(dst)
210+
.and_then(move |connected| {
200211
let (tx, rx) = mpsc::channel(0);
201212
let tx = HyperClient {
202213
tx: RefCell::new(tx),
203214
should_close: true,
204215
};
205216
let pooled = pool.pooled(pool_key, tx);
206-
let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone());
217+
let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(connected.transport, pooled.clone());
207218
let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn);
208219
executor.execute(dispatch.map_err(|e| debug!("client connection error: {}", e)))?;
209220
Ok(pooled)
@@ -384,7 +395,9 @@ impl<C, B> Config<C, B> {
384395
}
385396

386397
impl<C, B> Config<C, B>
387-
where C: Connect,
398+
where C: Connect2<Error=io::Error>,
399+
C::Transport: 'static,
400+
C::Future: 'static,
388401
B: Stream<Error=::Error>,
389402
B::Item: AsRef<[u8]>,
390403
{

0 commit comments

Comments
 (0)