Skip to content

Commit 6328f7c

Browse files
committed
std: Add timeouts to unix connect/accept
This adds support for connecting to a unix socket with a timeout (a named pipe on windows), and accepting a connection with a timeout. The goal is to bring unix pipes/named sockets back in line with TCP support for timeouts. Similarly to the TCP sockets, all methods are marked #[experimental] due to uncertainty about the type of the timeout argument. This internally involved a good bit of refactoring to share as much code as possible between TCP servers and pipe servers, but the core implementation did not change drastically as part of this commit. cc #13523
1 parent 67ee480 commit 6328f7c

File tree

12 files changed

+531
-349
lines changed

12 files changed

+531
-349
lines changed

src/liblibc/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ pub use funcs::bsd43::{shutdown};
225225
#[cfg(windows)] pub use consts::os::extra::{PIPE_UNLIMITED_INSTANCES, ERROR_ACCESS_DENIED};
226226
#[cfg(windows)] pub use consts::os::extra::{FILE_WRITE_ATTRIBUTES, FILE_READ_ATTRIBUTES};
227227
#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_BUSY, ERROR_IO_PENDING};
228-
#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED};
228+
#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED, WAIT_OBJECT_0};
229229
#[cfg(windows)] pub use types::os::common::bsd44::{SOCKET};
230230
#[cfg(windows)] pub use types::os::common::posix01::{stat, utimbuf};
231231
#[cfg(windows)] pub use types::os::arch::extra::{HANDLE, BOOL, LPSECURITY_ATTRIBUTES};

src/libnative/io/c_win32.rs

+2
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,6 @@ extern "system" {
5959
optname: libc::c_int,
6060
optval: *mut libc::c_char,
6161
optlen: *mut libc::c_int) -> libc::c_int;
62+
63+
pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL;
6264
}

src/libnative/io/mod.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ pub use self::process::Process;
4444
pub mod addrinfo;
4545
pub mod net;
4646
pub mod process;
47+
mod util;
4748

4849
#[cfg(unix)]
4950
#[path = "file_unix.rs"]
@@ -177,8 +178,9 @@ impl rtio::IoFactory for IoFactory {
177178
fn unix_bind(&mut self, path: &CString) -> IoResult<~RtioUnixListener:Send> {
178179
pipe::UnixListener::bind(path).map(|s| ~s as ~RtioUnixListener:Send)
179180
}
180-
fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe:Send> {
181-
pipe::UnixStream::connect(path).map(|s| ~s as ~RtioPipe:Send)
181+
fn unix_connect(&mut self, path: &CString,
182+
timeout: Option<u64>) -> IoResult<~RtioPipe:Send> {
183+
pipe::UnixStream::connect(path, timeout).map(|s| ~s as ~RtioPipe:Send)
182184
}
183185
fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
184186
hint: Option<ai::Hint>) -> IoResult<~[ai::Info]> {

src/libnative/io/net.rs

+6-122
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,12 @@ use std::cast;
1313
use std::io::net::ip;
1414
use std::io;
1515
use std::mem;
16-
use std::os;
17-
use std::ptr;
1816
use std::rt::rtio;
1917
use std::sync::arc::UnsafeArc;
2018

2119
use super::{IoResult, retry, keep_going};
2220
use super::c;
21+
use super::util;
2322

2423
////////////////////////////////////////////////////////////////////////////////
2524
// sockaddr and misc bindings
@@ -118,8 +117,8 @@ fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
118117
}
119118
}
120119

121-
fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
122-
val: libc::c_int) -> IoResult<T> {
120+
pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
121+
val: libc::c_int) -> IoResult<T> {
123122
unsafe {
124123
let mut slot: T = mem::init();
125124
let mut len = mem::size_of::<T>() as libc::socklen_t;
@@ -145,21 +144,6 @@ fn last_error() -> io::IoError {
145144
super::last_error()
146145
}
147146

148-
fn ms_to_timeval(ms: u64) -> libc::timeval {
149-
libc::timeval {
150-
tv_sec: (ms / 1000) as libc::time_t,
151-
tv_usec: ((ms % 1000) * 1000) as libc::suseconds_t,
152-
}
153-
}
154-
155-
fn timeout(desc: &'static str) -> io::IoError {
156-
io::IoError {
157-
kind: io::TimedOut,
158-
desc: desc,
159-
detail: None,
160-
}
161-
}
162-
163147
#[cfg(windows)] unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
164148
#[cfg(unix)] unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
165149

@@ -270,7 +254,7 @@ impl TcpStream {
270254
let addrp = &addr as *_ as *libc::sockaddr;
271255
match timeout {
272256
Some(timeout) => {
273-
try!(TcpStream::connect_timeout(fd, addrp, len, timeout));
257+
try!(util::connect_timeout(fd, addrp, len, timeout));
274258
Ok(ret)
275259
},
276260
None => {
@@ -282,84 +266,6 @@ impl TcpStream {
282266
}
283267
}
284268

285-
// See http://developerweb.net/viewtopic.php?id=3196 for where this is
286-
// derived from.
287-
fn connect_timeout(fd: sock_t,
288-
addrp: *libc::sockaddr,
289-
len: libc::socklen_t,
290-
timeout_ms: u64) -> IoResult<()> {
291-
#[cfg(unix)] use INPROGRESS = libc::EINPROGRESS;
292-
#[cfg(windows)] use INPROGRESS = libc::WSAEINPROGRESS;
293-
#[cfg(unix)] use WOULDBLOCK = libc::EWOULDBLOCK;
294-
#[cfg(windows)] use WOULDBLOCK = libc::WSAEWOULDBLOCK;
295-
296-
// Make sure the call to connect() doesn't block
297-
try!(set_nonblocking(fd, true));
298-
299-
let ret = match unsafe { libc::connect(fd, addrp, len) } {
300-
// If the connection is in progress, then we need to wait for it to
301-
// finish (with a timeout). The current strategy for doing this is
302-
// to use select() with a timeout.
303-
-1 if os::errno() as int == INPROGRESS as int ||
304-
os::errno() as int == WOULDBLOCK as int => {
305-
let mut set: c::fd_set = unsafe { mem::init() };
306-
c::fd_set(&mut set, fd);
307-
match await(fd, &mut set, timeout_ms) {
308-
0 => Err(timeout("connection timed out")),
309-
-1 => Err(last_error()),
310-
_ => {
311-
let err: libc::c_int = try!(
312-
getsockopt(fd, libc::SOL_SOCKET, libc::SO_ERROR));
313-
if err == 0 {
314-
Ok(())
315-
} else {
316-
Err(io::IoError::from_errno(err as uint, true))
317-
}
318-
}
319-
}
320-
}
321-
322-
-1 => Err(last_error()),
323-
_ => Ok(()),
324-
};
325-
326-
// be sure to turn blocking I/O back on
327-
try!(set_nonblocking(fd, false));
328-
return ret;
329-
330-
#[cfg(unix)]
331-
fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
332-
let set = nb as libc::c_int;
333-
super::mkerr_libc(retry(|| unsafe { c::ioctl(fd, c::FIONBIO, &set) }))
334-
}
335-
#[cfg(windows)]
336-
fn set_nonblocking(fd: sock_t, nb: bool) -> IoResult<()> {
337-
let mut set = nb as libc::c_ulong;
338-
if unsafe { c::ioctlsocket(fd, c::FIONBIO, &mut set) != 0 } {
339-
Err(last_error())
340-
} else {
341-
Ok(())
342-
}
343-
}
344-
345-
#[cfg(unix)]
346-
fn await(fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
347-
let start = ::io::timer::now();
348-
retry(|| unsafe {
349-
// Recalculate the timeout each iteration (it is generally
350-
// undefined what the value of the 'tv' is after select
351-
// returns EINTR).
352-
let tv = ms_to_timeval(timeout - (::io::timer::now() - start));
353-
c::select(fd + 1, ptr::null(), &*set, ptr::null(), &tv)
354-
})
355-
}
356-
#[cfg(windows)]
357-
fn await(_fd: sock_t, set: &mut c::fd_set, timeout: u64) -> libc::c_int {
358-
let tv = ms_to_timeval(timeout);
359-
unsafe { c::select(1, ptr::null(), &*set, ptr::null(), &tv) }
360-
}
361-
}
362-
363269
pub fn fd(&self) -> sock_t {
364270
// This unsafety is fine because it's just a read-only arc
365271
unsafe { (*self.inner.get()).fd }
@@ -533,7 +439,7 @@ impl TcpAcceptor {
533439

534440
pub fn native_accept(&mut self) -> IoResult<TcpStream> {
535441
if self.deadline != 0 {
536-
try!(self.accept_deadline());
442+
try!(util::accept_deadline(self.fd(), self.deadline));
537443
}
538444
unsafe {
539445
let mut storage: libc::sockaddr_storage = mem::init();
@@ -550,25 +456,6 @@ impl TcpAcceptor {
550456
}
551457
}
552458
}
553-
554-
fn accept_deadline(&mut self) -> IoResult<()> {
555-
let mut set: c::fd_set = unsafe { mem::init() };
556-
c::fd_set(&mut set, self.fd());
557-
558-
match retry(|| {
559-
// If we're past the deadline, then pass a 0 timeout to select() so
560-
// we can poll the status of the socket.
561-
let now = ::io::timer::now();
562-
let ms = if self.deadline > now {0} else {self.deadline - now};
563-
let tv = ms_to_timeval(ms);
564-
let n = if cfg!(windows) {1} else {self.fd() as libc::c_int + 1};
565-
unsafe { c::select(n, &set, ptr::null(), ptr::null(), &tv) }
566-
}) {
567-
-1 => Err(last_error()),
568-
0 => Err(timeout("accept timed out")),
569-
_ => return Ok(()),
570-
}
571-
}
572459
}
573460

574461
impl rtio::RtioSocket for TcpAcceptor {
@@ -585,10 +472,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
585472
fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
586473
fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
587474
fn set_timeout(&mut self, timeout: Option<u64>) {
588-
self.deadline = match timeout {
589-
None => 0,
590-
Some(t) => ::io::timer::now() + t,
591-
};
475+
self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
592476
}
593477
}
594478

src/libnative/io/pipe_unix.rs

+30-29
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,17 @@
88
// option. This file may not be copied, modified, or distributed
99
// except according to those terms.
1010

11+
use libc;
1112
use std::c_str::CString;
1213
use std::cast;
14+
use std::intrinsics;
1315
use std::io;
14-
use libc;
1516
use std::mem;
1617
use std::rt::rtio;
1718
use std::sync::arc::UnsafeArc;
18-
use std::intrinsics;
1919

2020
use super::{IoResult, retry, keep_going};
21+
use super::util;
2122
use super::file::fd_t;
2223

2324
fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
@@ -52,22 +53,6 @@ fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint
5253
return Ok((storage, len));
5354
}
5455

55-
fn sockaddr_to_unix(storage: &libc::sockaddr_storage,
56-
len: uint) -> IoResult<CString> {
57-
match storage.ss_family as libc::c_int {
58-
libc::AF_UNIX => {
59-
assert!(len as uint <= mem::size_of::<libc::sockaddr_un>());
60-
let storage: &libc::sockaddr_un = unsafe {
61-
cast::transmute(storage)
62-
};
63-
unsafe {
64-
Ok(CString::new(storage.sun_path.as_ptr(), false).clone())
65-
}
66-
}
67-
_ => Err(io::standard_error(io::InvalidInput))
68-
}
69-
}
70-
7156
struct Inner {
7257
fd: fd_t,
7358
}
@@ -76,16 +61,24 @@ impl Drop for Inner {
7661
fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } }
7762
}
7863

79-
fn connect(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
64+
fn connect(addr: &CString, ty: libc::c_int,
65+
timeout: Option<u64>) -> IoResult<Inner> {
8066
let (addr, len) = try!(addr_to_sockaddr_un(addr));
8167
let inner = Inner { fd: try!(unix_socket(ty)) };
82-
let addrp = &addr as *libc::sockaddr_storage;
83-
match retry(|| unsafe {
84-
libc::connect(inner.fd, addrp as *libc::sockaddr,
85-
len as libc::socklen_t)
86-
}) {
87-
-1 => Err(super::last_error()),
88-
_ => Ok(inner)
68+
let addrp = &addr as *_ as *libc::sockaddr;
69+
let len = len as libc::socklen_t;
70+
71+
match timeout {
72+
None => {
73+
match retry(|| unsafe { libc::connect(inner.fd, addrp, len) }) {
74+
-1 => Err(super::last_error()),
75+
_ => Ok(inner)
76+
}
77+
}
78+
Some(timeout_ms) => {
79+
try!(util::connect_timeout(inner.fd, addrp, len, timeout_ms));
80+
Ok(inner)
81+
}
8982
}
9083
}
9184

@@ -110,8 +103,9 @@ pub struct UnixStream {
110103
}
111104

112105
impl UnixStream {
113-
pub fn connect(addr: &CString) -> IoResult<UnixStream> {
114-
connect(addr, libc::SOCK_STREAM).map(|inner| {
106+
pub fn connect(addr: &CString,
107+
timeout: Option<u64>) -> IoResult<UnixStream> {
108+
connect(addr, libc::SOCK_STREAM, timeout).map(|inner| {
115109
UnixStream { inner: UnsafeArc::new(inner) }
116110
})
117111
}
@@ -176,7 +170,7 @@ impl UnixListener {
176170
pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
177171
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
178172
-1 => Err(super::last_error()),
179-
_ => Ok(UnixAcceptor { listener: self })
173+
_ => Ok(UnixAcceptor { listener: self, deadline: 0 })
180174
}
181175
}
182176
}
@@ -189,12 +183,16 @@ impl rtio::RtioUnixListener for UnixListener {
189183

190184
pub struct UnixAcceptor {
191185
listener: UnixListener,
186+
deadline: u64,
192187
}
193188

194189
impl UnixAcceptor {
195190
fn fd(&self) -> fd_t { self.listener.fd() }
196191

197192
pub fn native_accept(&mut self) -> IoResult<UnixStream> {
193+
if self.deadline != 0 {
194+
try!(util::accept_deadline(self.fd(), self.deadline));
195+
}
198196
let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() };
199197
let storagep = &mut storage as *mut libc::sockaddr_storage;
200198
let size = mem::size_of::<libc::sockaddr_storage>();
@@ -214,6 +212,9 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {
214212
fn accept(&mut self) -> IoResult<~rtio::RtioPipe:Send> {
215213
self.native_accept().map(|s| ~s as ~rtio::RtioPipe:Send)
216214
}
215+
fn set_timeout(&mut self, timeout: Option<u64>) {
216+
self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
217+
}
217218
}
218219

219220
impl Drop for UnixListener {

0 commit comments

Comments
 (0)