Skip to content

Commit 68458cd

Browse files
committed
fix(server): Sleep on socket IO errors
1 parent b79f8d3 commit 68458cd

File tree

1 file changed

+61
-4
lines changed

1 file changed

+61
-4
lines changed

src/server/mod.rs

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ pub struct Http<B = ::Chunk> {
5757
max_buf_size: Option<usize>,
5858
keep_alive: bool,
5959
pipeline: bool,
60+
sleep_on_errors: bool,
6061
_marker: PhantomData<B>,
6162
}
6263

@@ -102,6 +103,9 @@ pub struct AddrIncoming {
102103
addr: SocketAddr,
103104
keep_alive_timeout: Option<Duration>,
104105
listener: TcpListener,
106+
handle: Handle,
107+
sleep_on_errors: bool,
108+
timeout: Option<Timeout>,
105109
}
106110

107111
/// A future binding a connection with a Service.
@@ -144,6 +148,7 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
144148
keep_alive: true,
145149
max_buf_size: None,
146150
pipeline: false,
151+
sleep_on_errors: false,
147152
_marker: PhantomData,
148153
}
149154
}
@@ -172,6 +177,18 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
172177
self
173178
}
174179

180+
/// Swallow connection accept errors. Instead of passing up IO errors when
181+
/// the server is under heavy load the errors will be ignored. Some
182+
/// connection accept errors (like "connection reset") can be ignored, some
183+
/// (like "too many files open") may consume 100% CPU and a timout of 10ms
184+
/// is used in that case.
185+
///
186+
/// Default is false.
187+
pub fn sleep_on_errors(&mut self, enabled: bool) -> &mut Self {
188+
self.sleep_on_errors = enabled;
189+
self
190+
}
191+
175192
/// Bind the provided `addr` and return a server ready to handle
176193
/// connections.
177194
///
@@ -225,7 +242,7 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
225242
Bd: Stream<Item=B, Error=::Error>,
226243
{
227244
let listener = TcpListener::bind(addr, &handle)?;
228-
let mut incoming = AddrIncoming::new(listener)?;
245+
let mut incoming = AddrIncoming::new(listener, handle.clone(), self.sleep_on_errors)?;
229246
if self.keep_alive {
230247
incoming.set_keepalive(Some(Duration::from_secs(90)));
231248
}
@@ -248,6 +265,7 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
248265
keep_alive: self.keep_alive,
249266
max_buf_size: self.max_buf_size,
250267
pipeline: self.pipeline,
268+
sleep_on_errors: self.sleep_on_errors,
251269
_marker: PhantomData,
252270
},
253271
}
@@ -394,7 +412,7 @@ impl<S, B> Server<S, B>
394412

395413
let handle = reactor.handle();
396414

397-
let mut incoming = AddrIncoming::new(listener)?;
415+
let mut incoming = AddrIncoming::new(listener, handle.clone(), protocol.sleep_on_errors)?;
398416

399417
if protocol.keep_alive {
400418
incoming.set_keepalive(Some(Duration::from_secs(90)));
@@ -619,11 +637,14 @@ mod unnameable {
619637
// ===== impl AddrIncoming =====
620638

621639
impl AddrIncoming {
622-
fn new(listener: TcpListener) -> io::Result<AddrIncoming> {
640+
fn new(listener: TcpListener, handle: Handle, sleep_on_errors: bool) -> io::Result<AddrIncoming> {
623641
Ok(AddrIncoming {
624642
addr: listener.local_addr()?,
625643
keep_alive_timeout: None,
626644
listener: listener,
645+
handle: handle,
646+
sleep_on_errors: sleep_on_errors,
647+
timeout: None,
627648
})
628649
}
629650

@@ -643,6 +664,13 @@ impl Stream for AddrIncoming {
643664
type Error = ::std::io::Error;
644665

645666
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
667+
if let Some(ref mut to) = self.timeout {
668+
match to.poll().expect("timeout never fails") {
669+
Async::Ready(_) => {}
670+
Async::NotReady => return Ok(Async::NotReady),
671+
}
672+
}
673+
self.timeout = None;
646674
loop {
647675
match self.listener.accept() {
648676
Ok((socket, addr)) => {
@@ -654,12 +682,41 @@ impl Stream for AddrIncoming {
654682
return Ok(Async::Ready(Some(AddrStream::new(socket, addr))));
655683
},
656684
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
657-
Err(e) => return Err(e),
685+
Err(ref e) if connection_error(e) => continue,
686+
Err(e) => {
687+
let delay = ::std::time::Duration::from_millis(10);
688+
debug!("Accept error: {}. Sleeping {:?}...",
689+
e, delay);
690+
let mut timeout = Timeout::new(delay, &self.handle)
691+
.expect("can always set a timeout");
692+
let result = timeout.poll()
693+
.expect("timeout never fails");
694+
match result {
695+
Async::Ready(()) => continue,
696+
Async::NotReady => {
697+
self.timeout = Some(timeout);
698+
return Ok(Async::NotReady);
699+
}
700+
}
701+
}
658702
}
659703
}
660704
}
661705
}
662706

707+
/// This function defines errors that are per-connection. Which basically
708+
/// means that if we get this error from `accept()` system call it means
709+
/// next connection might be ready to be accepted.
710+
///
711+
/// All other errors will incur a timeout before next `accept()` is performed.
712+
/// The timeout is useful to handle resource exhaustion errors like ENFILE
713+
/// and EMFILE. Otherwise, could enter into tight loop.
714+
fn connection_error(e: &io::Error) -> bool {
715+
e.kind() == io::ErrorKind::ConnectionRefused ||
716+
e.kind() == io::ErrorKind::ConnectionAborted ||
717+
e.kind() == io::ErrorKind::ConnectionReset
718+
}
719+
663720
mod addr_stream {
664721
use std::io::{self, Read, Write};
665722
use std::net::SocketAddr;

0 commit comments

Comments
 (0)