Skip to content

Commit ed497bf

Browse files
committed
feat(client): allow custom executors for HttpConnector
1 parent e507510 commit ed497bf

File tree

3 files changed

+71
-40
lines changed

3 files changed

+71
-40
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ include = [
2222
base64 = "0.6"
2323
bytes = "0.4.4"
2424
futures = "0.1.14"
25-
futures-cpupool = "0.1"
25+
futures-cpupool = "0.1.6"
2626
http = { version = "0.1", optional = true }
2727
httparse = "1.0"
2828
language-tags = "0.2"

src/client/connect.rs

+59-11
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@ use std::error::Error as StdError;
22
use std::fmt;
33
use std::io;
44
use std::mem;
5+
use std::sync::Arc;
56
//use std::net::SocketAddr;
67

78
use futures::{Future, Poll, Async};
9+
use futures::future::{Executor, ExecuteError};
10+
use futures::sync::oneshot;
11+
use futures_cpupool::{Builder as CpuPoolBuilder};
812
use tokio_io::{AsyncRead, AsyncWrite};
913
use tokio::reactor::Handle;
1014
use tokio::net::{TcpStream, TcpStreamNew};
@@ -43,22 +47,35 @@ where T: Service<Request=Uri, Error=io::Error> + 'static,
4347
/// A connector for the `http` scheme.
4448
#[derive(Clone)]
4549
pub struct HttpConnector {
46-
dns: dns::Dns,
50+
executor: HttpConnectExecutor,
4751
enforce_http: bool,
4852
handle: Handle,
4953
}
5054

5155
impl HttpConnector {
52-
5356
/// Construct a new HttpConnector.
5457
///
5558
/// Takes number of DNS worker threads.
5659
#[inline]
5760
pub fn new(threads: usize, handle: &Handle) -> HttpConnector {
61+
let pool = CpuPoolBuilder::new()
62+
.name_prefix("hyper-dns")
63+
.pool_size(threads)
64+
.create();
65+
HttpConnector::new_with_executor(pool, handle)
66+
}
67+
68+
/// Construct a new HttpConnector.
69+
///
70+
/// Takes an executor to run blocking tasks on.
71+
#[inline]
72+
pub fn new_with_executor<E: 'static>(executor: E, handle: &Handle) -> HttpConnector
73+
where E: Executor<HttpConnectorBlockingTask>
74+
{
5875
HttpConnector {
59-
dns: dns::Dns::new(threads),
76+
executor: HttpConnectExecutor(Arc::new(executor)),
6077
enforce_http: true,
61-
handle: handle.clone(),
78+
handle: handle.clone()
6279
}
6380
}
6481

@@ -109,7 +126,7 @@ impl Service for HttpConnector {
109126
};
110127

111128
HttpConnecting {
112-
state: State::Lazy(self.dns.clone(), host.into(), port),
129+
state: State::Lazy(self.executor.clone(), host.into(), port),
113130
handle: self.handle.clone(),
114131
}
115132
}
@@ -154,8 +171,8 @@ pub struct HttpConnecting {
154171
}
155172

156173
enum State {
157-
Lazy(dns::Dns, String, u16),
158-
Resolving(dns::Query),
174+
Lazy(HttpConnectExecutor, String, u16),
175+
Resolving(oneshot::SpawnHandle<dns::IpAddrs, io::Error>),
159176
Connecting(ConnectingTcp),
160177
Error(Option<io::Error>),
161178
}
@@ -168,12 +185,13 @@ impl Future for HttpConnecting {
168185
loop {
169186
let state;
170187
match self.state {
171-
State::Lazy(ref dns, ref mut host, port) => {
188+
State::Lazy(ref executor, ref mut host, port) => {
172189
let host = mem::replace(host, String::new());
173-
state = State::Resolving(dns.resolve(host, port));
190+
let work = dns::Work::new(host, port);
191+
state = State::Resolving(oneshot::spawn(work, executor));
174192
},
175-
State::Resolving(ref mut query) => {
176-
match try!(query.poll()) {
193+
State::Resolving(ref mut future) => {
194+
match try!(future.poll()) {
177195
Async::NotReady => return Ok(Async::NotReady),
178196
Async::Ready(addrs) => {
179197
state = State::Connecting(ConnectingTcp {
@@ -231,6 +249,36 @@ impl ConnectingTcp {
231249
}
232250
}
233251

252+
/// Blocking task to be executed on a thread pool.
253+
pub struct HttpConnectorBlockingTask {
254+
work: oneshot::Execute<dns::Work>
255+
}
256+
257+
impl fmt::Debug for HttpConnectorBlockingTask {
258+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
259+
f.pad("HttpConnectorBlockingTask")
260+
}
261+
}
262+
263+
impl Future for HttpConnectorBlockingTask {
264+
type Item = ();
265+
type Error = ();
266+
267+
fn poll(&mut self) -> Poll<(), ()> {
268+
self.work.poll()
269+
}
270+
}
271+
272+
#[derive(Clone)]
273+
struct HttpConnectExecutor(Arc<Executor<HttpConnectorBlockingTask>>);
274+
275+
impl Executor<oneshot::Execute<dns::Work>> for HttpConnectExecutor {
276+
fn execute(&self, future: oneshot::Execute<dns::Work>) -> Result<(), ExecuteError<oneshot::Execute<dns::Work>>> {
277+
self.0.execute(HttpConnectorBlockingTask { work: future })
278+
.map_err(|err| ExecuteError::new(err.kind(), err.into_future().work))
279+
}
280+
}
281+
234282
/*
235283
impl<S: SslClient> HttpsConnector<S> {
236284
/// Create a new connector using the provided SSL implementation.

src/client/dns.rs

+11-28
Original file line numberDiff line numberDiff line change
@@ -2,37 +2,27 @@ use std::io;
22
use std::net::{SocketAddr, ToSocketAddrs};
33
use std::vec;
44

5-
use ::futures::{Future, Poll};
6-
use ::futures_cpupool::{CpuPool, CpuFuture, Builder};
5+
use ::futures::{Async, Future, Poll};
76

8-
#[derive(Clone)]
9-
pub struct Dns {
10-
pool: CpuPool,
7+
pub struct Work {
8+
host: String,
9+
port: u16
1110
}
1211

13-
impl Dns {
14-
pub fn new(threads: usize) -> Dns {
15-
Dns {
16-
pool: Builder::new()
17-
.name_prefix("hyper-dns")
18-
.pool_size(threads)
19-
.create()
20-
}
21-
}
22-
23-
pub fn resolve(&self, host: String, port: u16) -> Query {
24-
Query(self.pool.spawn_fn(move || work(host, port)))
12+
impl Work {
13+
pub fn new(host: String, port: u16) -> Work {
14+
Work { host: host, port: port }
2515
}
2616
}
2717

28-
pub struct Query(CpuFuture<IpAddrs, io::Error>);
29-
30-
impl Future for Query {
18+
impl Future for Work {
3119
type Item = IpAddrs;
3220
type Error = io::Error;
3321

3422
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
35-
self.0.poll()
23+
debug!("resolve host={:?}, port={:?}", self.host, self.port);
24+
(&*self.host, self.port).to_socket_addrs()
25+
.map(|i| Async::Ready(IpAddrs { iter: i }))
3626
}
3727
}
3828

@@ -47,10 +37,3 @@ impl Iterator for IpAddrs {
4737
self.iter.next()
4838
}
4939
}
50-
51-
pub type Answer = io::Result<IpAddrs>;
52-
53-
fn work(hostname: String, port: u16) -> Answer {
54-
debug!("resolve host={:?}, port={:?}", hostname, port);
55-
(&*hostname, port).to_socket_addrs().map(|i| IpAddrs { iter: i })
56-
}

0 commit comments

Comments
 (0)