Skip to content

Commit 6d54a4d

Browse files
authored
Merge pull request #857 from hyperium/client-timeout-panic
fix(client): handle when DNS resolves after a timeout triggers
2 parents 01843f8 + 006f66f commit 6d54a4d

File tree

8 files changed

+130
-114
lines changed

8 files changed

+130
-114
lines changed

examples/client.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,15 @@ impl hyper::client::Handler<HttpStream> for Dump {
4848
Err(e) => match e.kind() {
4949
io::ErrorKind::WouldBlock => Next::read(),
5050
_ => {
51-
println!("ERROR: {}", e);
51+
println!("ERROR:example: {}", e);
5252
Next::end()
5353
}
5454
}
5555
}
5656
}
5757

5858
fn on_error(&mut self, err: hyper::Error) -> Next {
59-
println!("ERROR: {}", err);
59+
println!("ERROR:example: {}", err);
6060
Next::remove()
6161
}
6262
}

src/client/connect.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ pub trait Connect {
1616
/// Type of Transport to create
1717
type Output: Transport;
1818
/// The key used to determine if an existing socket can be used.
19-
type Key: Eq + Hash + Clone;
19+
type Key: Eq + Hash + Clone + fmt::Debug;
2020
/// Returns the key based off the Url.
2121
fn key(&self, &Url) -> Option<Self::Key>;
2222
/// Connect to a remote address.
@@ -96,10 +96,12 @@ impl Connect for HttpConnector {
9696
}
9797

9898
fn connected(&mut self) -> Option<(Self::Key, io::Result<HttpStream>)> {
99-
let (host, addr) = match self.dns.as_ref().expect("dns workers lost").resolved() {
99+
let (host, addrs) = match self.dns.as_ref().expect("dns workers lost").resolved() {
100100
Ok(res) => res,
101101
Err(_) => return None
102102
};
103+
//TODO: try all addrs
104+
let addr = addrs.and_then(|mut addrs| Ok(addrs.next().unwrap()));
103105
debug!("Http::resolved <- ({:?}, {:?})", host, addr);
104106
if let Entry::Occupied(mut entry) = self.resolving.entry(host) {
105107
let resolved = entry.get_mut().remove(0);

src/client/dns.rs

+18-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::io;
2-
use std::net::{IpAddr, ToSocketAddrs};
2+
use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
33
use std::thread;
4+
use std::vec;
45

56
use ::spmc;
67

@@ -11,7 +12,19 @@ pub struct Dns {
1112
rx: channel::Receiver<Answer>,
1213
}
1314

14-
pub type Answer = (String, io::Result<IpAddr>);
15+
pub type Answer = (String, io::Result<IpAddrs>);
16+
17+
pub struct IpAddrs {
18+
iter: vec::IntoIter<SocketAddr>,
19+
}
20+
21+
impl Iterator for IpAddrs {
22+
type Item = IpAddr;
23+
#[inline]
24+
fn next(&mut self) -> Option<IpAddr> {
25+
self.iter.next().map(|addr| addr.ip())
26+
}
27+
}
1528

1629
impl Dns {
1730
pub fn new(notify: (channel::Sender<Answer>, channel::Receiver<Answer>), threads: usize) -> Dns {
@@ -26,7 +39,7 @@ impl Dns {
2639
}
2740

2841
pub fn resolve<T: Into<String>>(&self, hostname: T) {
29-
self.tx.send(hostname.into()).expect("Workers all died horribly");
42+
self.tx.send(hostname.into()).expect("DNS workers all died unexpectedly");
3043
}
3144

3245
pub fn resolved(&self) -> Result<Answer, channel::TryRecvError> {
@@ -41,9 +54,8 @@ fn work(rx: spmc::Receiver<String>, notify: channel::Sender<Answer>) {
4154
let notify = worker.notify.as_ref().expect("Worker lost notify");
4255
while let Ok(host) = rx.recv() {
4356
debug!("resolve {:?}", host);
44-
let res = match (&*host, 80).to_socket_addrs().map(|mut i| i.next()) {
45-
Ok(Some(addr)) => (host, Ok(addr.ip())),
46-
Ok(None) => (host, Err(io::Error::new(io::ErrorKind::Other, "no addresses found"))),
57+
let res = match (&*host, 80).to_socket_addrs().map(|i| IpAddrs{ iter: i }) {
58+
Ok(addrs) => (host, Ok(addrs)),
4759
Err(e) => (host, Err(e))
4860
};
4961

src/client/mod.rs

+64-49
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ mod response;
3030

3131
/// A Client to make outgoing HTTP requests.
3232
pub struct Client<H> {
33-
//handle: Option<thread::JoinHandle<()>>,
3433
tx: http::channel::Sender<Notify<H>>,
3534
}
3635

@@ -64,16 +63,6 @@ impl<H> Client<H> {
6463
pub fn configure() -> Config<DefaultConnector> {
6564
Config::default()
6665
}
67-
68-
/*TODO
69-
pub fn http() -> Config<HttpConnector> {
70-
71-
}
72-
73-
pub fn https() -> Config<HttpsConnector> {
74-
75-
}
76-
*/
7766
}
7867

7968
impl<H: Handler<<DefaultConnector as Connect>::Output>> Client<H> {
@@ -243,14 +232,6 @@ impl<H> fmt::Display for ClientError<H> {
243232
}
244233
}
245234

246-
/*
247-
impl Drop for Client {
248-
fn drop(&mut self) {
249-
self.handle.take().map(|handle| handle.join());
250-
}
251-
}
252-
*/
253-
254235
/// A trait to react to client events that happen for each message.
255236
///
256237
/// Each event handler returns it's desired `Next` action.
@@ -338,15 +319,16 @@ struct Context<K, H> {
338319
}
339320

340321
impl<K: http::Key, H> Context<K, H> {
341-
fn pop_queue(&mut self, key: &K) -> Queued<H> {
322+
fn pop_queue(&mut self, key: &K) -> Option<Queued<H>> {
342323
let mut should_remove = false;
343324
let queued = {
344-
let mut vec = self.queue.get_mut(key).expect("handler not in queue for key");
345-
let queued = vec.remove(0);
346-
if vec.is_empty() {
347-
should_remove = true;
348-
}
349-
queued
325+
self.queue.get_mut(key).map(|vec| {
326+
let queued = vec.remove(0);
327+
if vec.is_empty() {
328+
should_remove = true;
329+
}
330+
queued
331+
})
350332
};
351333
if should_remove {
352334
self.queue.remove(key);
@@ -379,16 +361,22 @@ impl<K: http::Key, H> Context<K, H> {
379361
impl<K: http::Key, H: Handler<T>, T: Transport> http::MessageHandlerFactory<K, T> for Context<K, H> {
380362
type Output = Message<H, T>;
381363

382-
fn create(&mut self, seed: http::Seed<K>) -> Self::Output {
364+
fn create(&mut self, seed: http::Seed<K>) -> Option<Self::Output> {
383365
let key = seed.key();
384-
let queued = self.pop_queue(key);
385-
let (url, mut handler) = (queued.url, queued.handler);
386-
handler.on_control(seed.control());
387-
Message {
388-
handler: handler,
389-
url: Some(url),
390-
_marker: PhantomData,
391-
}
366+
self.pop_queue(key).map(|queued| {
367+
let (url, mut handler) = (queued.url, queued.handler);
368+
handler.on_control(seed.control());
369+
370+
Message {
371+
handler: handler,
372+
url: Some(url),
373+
_marker: PhantomData,
374+
}
375+
})
376+
}
377+
378+
fn keep_alive_interest(&self) -> Next {
379+
Next::wait()
392380
}
393381
}
394382

@@ -402,6 +390,7 @@ where C: Connect,
402390
C::Output: Transport,
403391
H: Handler<C::Output> {
404392
Connector(C, http::channel::Receiver<Notify<H>>),
393+
Connecting((C::Key, C::Output)),
405394
Socket(http::Conn<C::Key, C::Output, Message<H, C::Output>>)
406395
}
407396

@@ -415,31 +404,55 @@ where
415404

416405
impl<C, H> rotor::Machine for ClientFsm<C, H>
417406
where C: Connect,
407+
C::Key: fmt::Debug,
418408
C::Output: Transport,
419409
H: Handler<C::Output> {
420410
type Context = Context<C::Key, H>;
421411
type Seed = (C::Key, C::Output);
422412

423413
fn create(seed: Self::Seed, scope: &mut Scope<Self::Context>) -> rotor::Response<Self, rotor::Void> {
424414
rotor_try!(scope.register(&seed.1, EventSet::writable(), PollOpt::level()));
425-
rotor::Response::ok(
426-
ClientFsm::Socket(
427-
http::Conn::new(seed.0, seed.1, scope.notifier())
428-
.keep_alive(scope.keep_alive)
429-
)
430-
)
415+
rotor::Response::ok(ClientFsm::Connecting(seed))
431416
}
432417

433418
fn ready(self, events: EventSet, scope: &mut Scope<Self::Context>) -> rotor::Response<Self, Self::Seed> {
434419
match self {
435-
ClientFsm::Connector(..) => {
436-
unreachable!("Connector can never be ready")
437-
},
438420
ClientFsm::Socket(conn) => {
439421
let res = conn.ready(events, scope);
440422
let now = scope.now();
441423
scope.conn_response(res, now)
424+
},
425+
ClientFsm::Connecting(mut seed) => {
426+
if events.is_error() || events.is_hup() {
427+
if let Some(err) = seed.1.take_socket_error().err() {
428+
debug!("error while connecting: {:?}", err);
429+
scope.pop_queue(&seed.0).map(move |mut queued| queued.handler.on_error(::Error::Io(err)));
430+
rotor::Response::done()
431+
} else {
432+
trace!("connecting is_error, but no socket error");
433+
rotor::Response::ok(ClientFsm::Connecting(seed))
434+
}
435+
} else if events.is_writable() {
436+
if scope.queue.contains_key(&seed.0) {
437+
trace!("connected and writable {:?}", seed.0);
438+
rotor::Response::ok(
439+
ClientFsm::Socket(
440+
http::Conn::new(seed.0, seed.1, Next::write().timeout(scope.connect_timeout), scope.notifier())
441+
.keep_alive(scope.keep_alive)
442+
)
443+
)
444+
} else {
445+
trace!("connected, but queued handler is gone: {:?}", seed.0); // probably took too long connecting
446+
rotor::Response::done()
447+
}
448+
} else {
449+
// spurious?
450+
rotor::Response::ok(ClientFsm::Connecting(seed))
451+
}
442452
}
453+
ClientFsm::Connector(..) => {
454+
unreachable!("Connector can never be ready")
455+
},
443456
}
444457
}
445458

@@ -477,6 +490,7 @@ where C: Connect,
477490
None => rotor::Response::ok(self)
478491
}
479492
}
493+
ClientFsm::Connecting(..) => unreachable!(),
480494
ClientFsm::Socket(conn) => {
481495
let res = conn.timeout(scope);
482496
let now = scope.now();
@@ -494,13 +508,15 @@ where C: Connect,
494508
let res = conn.wakeup(scope);
495509
let now = scope.now();
496510
scope.conn_response(res, now)
497-
}
511+
},
512+
ClientFsm::Connecting(..) => unreachable!("connecting sockets should not be woken up")
498513
}
499514
}
500515
}
501516

502517
impl<C, H> ClientFsm<C, H>
503518
where C: Connect,
519+
C::Key: fmt::Debug,
504520
C::Output: Transport,
505521
H: Handler<C::Output> {
506522
fn connect(self, scope: &mut rotor::Scope<<Self as rotor::Machine>::Context>) -> rotor::Response<Self, <Self as rotor::Machine>::Seed> {
@@ -509,13 +525,12 @@ where C: Connect,
509525
if let Some((key, res)) = connector.connected() {
510526
match res {
511527
Ok(socket) => {
512-
trace!("connected");
528+
trace!("connecting {:?}", key);
513529
return rotor::Response::spawn(ClientFsm::Connector(connector, rx), (key, socket));
514530
},
515531
Err(e) => {
516-
trace!("connected error = {:?}", e);
517-
let mut queued = scope.pop_queue(&key);
518-
let _ = queued.handler.on_error(::Error::Io(e));
532+
trace!("connect error = {:?}", e);
533+
scope.pop_queue(&key).map(|mut queued| queued.handler.on_error(::Error::Io(e)));
519534
}
520535
}
521536
}

0 commit comments

Comments
 (0)