Skip to content

Commit aec872b

Browse files
AIkorskyThomasdezeeuw
AIkorsky
authored andcommitted
Fix error handling in NamedPipe::write
1 parent 6f86b92 commit aec872b

File tree

2 files changed

+135
-35
lines changed

2 files changed

+135
-35
lines changed

src/sys/windows/named_pipe.rs

Lines changed: 96 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
use crate::{poll, Registry};
21
use crate::event::Source;
32
use crate::sys::windows::{Event, Overlapped};
3+
use crate::{poll, Registry};
44
use winapi::um::minwinbase::OVERLAPPED_ENTRY;
55

66
use std::ffi::OsStr;
@@ -9,8 +9,8 @@ use std::io::{self, Read, Write};
99
use std::mem;
1010
use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
1111
use std::slice;
12-
use std::sync::atomic::{AtomicUsize, AtomicBool};
1312
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
13+
use std::sync::atomic::{AtomicBool, AtomicUsize};
1414
use std::sync::{Arc, Mutex};
1515

1616
use crate::{Interest, Token};
@@ -128,9 +128,7 @@ fn would_block() -> io::Error {
128128
impl NamedPipe {
129129
/// Creates a new named pipe at the specified `addr` given a "reasonable
130130
/// set" of initial configuration options.
131-
pub fn new<A: AsRef<OsStr>>(
132-
addr: A,
133-
) -> io::Result<NamedPipe> {
131+
pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> {
134132
let pipe = pipe::NamedPipe::new(addr)?;
135133
// Safety: nothing actually unsafe about this. The trait fn includes
136134
// `unsafe`.
@@ -226,9 +224,7 @@ impl NamedPipe {
226224
}
227225

228226
impl FromRawHandle for NamedPipe {
229-
unsafe fn from_raw_handle(
230-
handle: RawHandle,
231-
) -> NamedPipe {
227+
unsafe fn from_raw_handle(handle: RawHandle) -> NamedPipe {
232228
NamedPipe {
233229
inner: Arc::new(Inner {
234230
// Safety: not really unsafe
@@ -281,9 +277,7 @@ impl<'a> Read for &'a NamedPipe {
281277
match mem::replace(&mut state.read, State::None) {
282278
// In theory not possible with `token` checked above,
283279
// but return would block for now.
284-
State::None => {
285-
Err(would_block())
286-
}
280+
State::None => Err(would_block()),
287281

288282
// A read is in flight, still waiting for it to finish
289283
State::Pending(buf, amt) => {
@@ -324,7 +318,7 @@ impl<'a> Read for &'a NamedPipe {
324318
}
325319

326320
impl<'a> Write for &'a NamedPipe {
327-
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
321+
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
328322
// Make sure there's no writes pending
329323
let mut io = self.inner.io.lock().unwrap();
330324

@@ -334,6 +328,12 @@ impl<'a> Write for &'a NamedPipe {
334328

335329
match io.write {
336330
State::None => {}
331+
State::Err(_) => match mem::replace(&mut io.write, State::None) {
332+
State::Err(e) => return Err(e),
333+
// `io` is locked, so this branch is unreachable
334+
_ => unreachable!(),
335+
},
336+
// any other state should be handled in `write_done`
337337
_ => {
338338
return Err(would_block());
339339
}
@@ -342,17 +342,26 @@ impl<'a> Write for &'a NamedPipe {
342342
// Move `buf` onto the heap and fire off the write
343343
let mut owned_buf = self.inner.get_buffer();
344344
owned_buf.extend(buf);
345-
Inner::schedule_write(&self.inner, owned_buf, 0, &mut io, None);
346-
Ok(buf.len())
345+
match Inner::maybe_schedule_write(&self.inner, owned_buf, 0, &mut io)? {
346+
// Some bytes are written immediately
347+
Some(n) => Ok(n),
348+
// Write operation is anqueued for whole buffer
349+
None => Ok(buf.len()),
350+
}
347351
}
348352

349-
fn flush(&mut self) -> io::Result<()> {
350-
Ok(())
353+
fn flush(&mut self) -> io::Result<()> {
354+
Ok(())
351355
}
352356
}
353357

354358
impl Source for NamedPipe {
355-
fn register(&mut self, registry: &Registry, token: Token, interest: Interest) -> io::Result<()> {
359+
fn register(
360+
&mut self,
361+
registry: &Registry,
362+
token: Token,
363+
interest: Interest,
364+
) -> io::Result<()> {
356365
let mut io = self.inner.io.lock().unwrap();
357366

358367
io.check_association(registry, false)?;
@@ -368,7 +377,10 @@ impl Source for NamedPipe {
368377
io.cp = Some(poll::selector(registry).clone_port());
369378

370379
let inner_token = NEXT_TOKEN.fetch_add(2, Relaxed) + 2;
371-
poll::selector(registry).inner.cp.add_handle(inner_token, &self.inner.handle)?;
380+
poll::selector(registry)
381+
.inner
382+
.cp
383+
.add_handle(inner_token, &self.inner.handle)?;
372384
}
373385

374386
io.token = Some(token);
@@ -381,7 +393,12 @@ impl Source for NamedPipe {
381393
Ok(())
382394
}
383395

384-
fn reregister(&mut self, registry: &Registry, token: Token, interest: Interest) -> io::Result<()> {
396+
fn reregister(
397+
&mut self,
398+
registry: &Registry,
399+
token: Token,
400+
interest: Interest,
401+
) -> io::Result<()> {
385402
let mut io = self.inner.io.lock().unwrap();
386403

387404
io.check_association(registry, true)?;
@@ -491,19 +508,61 @@ impl Inner {
491508
}
492509
}
493510

494-
fn schedule_write(me: &Arc<Inner>, buf: Vec<u8>, pos: usize, io: &mut Io, events: Option<&mut Vec<Event>>) {
511+
/// Maybe schedules overlapped write operation.
512+
///
513+
/// * `None` means that overlapped operation was enqueued
514+
/// * `Some(n)` means that `n` bytes was immediately written.
515+
/// Note, that `write_done` will fire anyway to clean up the state.
516+
fn maybe_schedule_write(
517+
me: &Arc<Inner>,
518+
buf: Vec<u8>,
519+
pos: usize,
520+
io: &mut Io,
521+
) -> io::Result<Option<usize>> {
495522
// Very similar to `schedule_read` above, just done for the write half.
496523
let e = unsafe {
497524
let overlapped = me.write.as_ptr() as *mut _;
498525
me.handle.write_overlapped(&buf[pos..], overlapped)
499526
};
500527

528+
// See `connect` above for the rationale behind `forget`
501529
match e {
502-
// See `connect` above for the rationale behind `forget`
503-
Ok(_) => {
530+
// `n` bytes are written immediately
531+
Ok(Some(n)) => {
532+
io.write = State::Ok(buf, pos);
533+
mem::forget(me.clone());
534+
Ok(Some(n))
535+
}
536+
// write operation is enqueued
537+
Ok(None) => {
504538
io.write = State::Pending(buf, pos);
505-
mem::forget(me.clone())
539+
mem::forget(me.clone());
540+
Ok(None)
506541
}
542+
Err(e) => Err(e),
543+
}
544+
}
545+
546+
fn schedule_write(
547+
me: &Arc<Inner>,
548+
buf: Vec<u8>,
549+
pos: usize,
550+
io: &mut Io,
551+
events: Option<&mut Vec<Event>>,
552+
) {
553+
match Inner::maybe_schedule_write(me, buf, pos, io) {
554+
Ok(Some(_)) => {
555+
// immediate result will be handled in `write_done`,
556+
// so we'll reinterpret the `Ok` state
557+
let state = mem::replace(&mut io.write, State::None);
558+
io.write = match state {
559+
State::Ok(buf, pos) => State::Pending(buf, pos),
560+
// io is locked, so this branch is unreachable
561+
_ => unreachable!(),
562+
};
563+
mem::forget(me.clone());
564+
}
565+
Ok(None) => (),
507566
Err(e) => {
508567
io.write = State::Err(e);
509568
io.notify_writable(events);
@@ -610,6 +669,12 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
610669
// then we're writable again and otherwise we schedule another write.
611670
let mut io = me.io.lock().unwrap();
612671
let (buf, pos) = match mem::replace(&mut io.write, State::None) {
672+
// `Ok` here means, that the operation was completed immediately
673+
// `bytes_transferred` is already reported to a client
674+
State::Ok(..) => {
675+
io.notify_writable(events);
676+
return;
677+
}
613678
State::Pending(buf, pos) => (buf, pos),
614679
_ => unreachable!(),
615680
};
@@ -638,18 +703,14 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
638703
impl Io {
639704
fn check_association(&self, registry: &Registry, required: bool) -> io::Result<()> {
640705
match self.cp {
641-
Some(ref cp) if !poll::selector(registry).same_port(cp) => {
642-
Err(io::Error::new(
643-
io::ErrorKind::AlreadyExists,
644-
"I/O source already registered with a different `Registry`"
645-
))
646-
}
647-
None if required => {
648-
Err(io::Error::new(
649-
io::ErrorKind::NotFound,
650-
"I/O source not registered with `Registry`"
651-
))
652-
}
706+
Some(ref cp) if !poll::selector(registry).same_port(cp) => Err(io::Error::new(
707+
io::ErrorKind::AlreadyExists,
708+
"I/O source already registered with a different `Registry`",
709+
)),
710+
None if required => Err(io::Error::new(
711+
io::ErrorKind::NotFound,
712+
"I/O source not registered with `Registry`",
713+
)),
653714
_ => Ok(()),
654715
}
655716
}

tests/win_named_pipe.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::time::Duration;
99
use mio::windows::NamedPipe;
1010
use mio::{Events, Interest, Poll, Token};
1111
use rand::Rng;
12+
use winapi::shared::winerror::*;
1213
use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
1314

1415
fn _assert_kinds() {
@@ -177,6 +178,44 @@ fn connect_after_client() {
177178
}
178179
}
179180

181+
#[test]
182+
fn write_disconnected() {
183+
let mut poll = t!(Poll::new());
184+
let (mut server, mut client) = pipe();
185+
t!(poll.registry().register(
186+
&mut server,
187+
Token(0),
188+
Interest::READABLE | Interest::WRITABLE,
189+
));
190+
t!(poll.registry().register(
191+
&mut client,
192+
Token(1),
193+
Interest::READABLE | Interest::WRITABLE,
194+
));
195+
196+
drop(client);
197+
198+
let mut events = Events::with_capacity(128);
199+
t!(poll.poll(&mut events, None));
200+
assert!(events.iter().count() > 0);
201+
202+
// this should not hang
203+
let mut i = 0;
204+
loop {
205+
i += 1;
206+
assert!(i < 16, "too many iterations");
207+
208+
match server.write(&[0]) {
209+
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
210+
t!(poll.poll(&mut events, None));
211+
assert!(events.iter().count() > 0);
212+
}
213+
Err(e) if e.raw_os_error() == Some(ERROR_NO_DATA as i32) => break,
214+
e => panic!("{:?}", e),
215+
}
216+
}
217+
}
218+
180219
#[test]
181220
fn write_then_drop() {
182221
let (mut server, mut client) = pipe();

0 commit comments

Comments
 (0)