1
- use crate :: { poll, Registry } ;
2
1
use crate :: event:: Source ;
3
2
use crate :: sys:: windows:: { Event , Overlapped } ;
3
+ use crate :: { poll, Registry } ;
4
4
use winapi:: um:: minwinbase:: OVERLAPPED_ENTRY ;
5
5
6
6
use std:: ffi:: OsStr ;
@@ -9,8 +9,8 @@ use std::io::{self, Read, Write};
9
9
use std:: mem;
10
10
use std:: os:: windows:: io:: { AsRawHandle , FromRawHandle , IntoRawHandle , RawHandle } ;
11
11
use std:: slice;
12
- use std:: sync:: atomic:: { AtomicUsize , AtomicBool } ;
13
12
use std:: sync:: atomic:: Ordering :: { Relaxed , SeqCst } ;
13
+ use std:: sync:: atomic:: { AtomicBool , AtomicUsize } ;
14
14
use std:: sync:: { Arc , Mutex } ;
15
15
16
16
use crate :: { Interest , Token } ;
@@ -128,9 +128,7 @@ fn would_block() -> io::Error {
128
128
impl NamedPipe {
129
129
/// Creates a new named pipe at the specified `addr` given a "reasonable
130
130
/// set" of initial configuration options.
131
- pub fn new < A : AsRef < OsStr > > (
132
- addr : A ,
133
- ) -> io:: Result < NamedPipe > {
131
+ pub fn new < A : AsRef < OsStr > > ( addr : A ) -> io:: Result < NamedPipe > {
134
132
let pipe = pipe:: NamedPipe :: new ( addr) ?;
135
133
// Safety: nothing actually unsafe about this. The trait fn includes
136
134
// `unsafe`.
@@ -226,9 +224,7 @@ impl NamedPipe {
226
224
}
227
225
228
226
impl FromRawHandle for NamedPipe {
229
- unsafe fn from_raw_handle (
230
- handle : RawHandle ,
231
- ) -> NamedPipe {
227
+ unsafe fn from_raw_handle ( handle : RawHandle ) -> NamedPipe {
232
228
NamedPipe {
233
229
inner : Arc :: new ( Inner {
234
230
// Safety: not really unsafe
@@ -281,9 +277,7 @@ impl<'a> Read for &'a NamedPipe {
281
277
match mem:: replace ( & mut state. read , State :: None ) {
282
278
// In theory not possible with `token` checked above,
283
279
// but return would block for now.
284
- State :: None => {
285
- Err ( would_block ( ) )
286
- }
280
+ State :: None => Err ( would_block ( ) ) ,
287
281
288
282
// A read is in flight, still waiting for it to finish
289
283
State :: Pending ( buf, amt) => {
@@ -324,7 +318,7 @@ impl<'a> Read for &'a NamedPipe {
324
318
}
325
319
326
320
impl < ' a > Write for & ' a NamedPipe {
327
- fn write ( & mut self , buf : & [ u8 ] ) -> io:: Result < usize > {
321
+ fn write ( & mut self , buf : & [ u8 ] ) -> io:: Result < usize > {
328
322
// Make sure there's no writes pending
329
323
let mut io = self . inner . io . lock ( ) . unwrap ( ) ;
330
324
@@ -334,6 +328,12 @@ impl<'a> Write for &'a NamedPipe {
334
328
335
329
match io. write {
336
330
State :: None => { }
331
+ State :: Err ( _) => match mem:: replace ( & mut io. write , State :: None ) {
332
+ State :: Err ( e) => return Err ( e) ,
333
+ // `io` is locked, so this branch is unreachable
334
+ _ => unreachable ! ( ) ,
335
+ } ,
336
+ // any other state should be handled in `write_done`
337
337
_ => {
338
338
return Err ( would_block ( ) ) ;
339
339
}
@@ -342,17 +342,26 @@ impl<'a> Write for &'a NamedPipe {
342
342
// Move `buf` onto the heap and fire off the write
343
343
let mut owned_buf = self . inner . get_buffer ( ) ;
344
344
owned_buf. extend ( buf) ;
345
- Inner :: schedule_write ( & self . inner , owned_buf, 0 , & mut io, None ) ;
346
- Ok ( buf. len ( ) )
345
+ match Inner :: maybe_schedule_write ( & self . inner , owned_buf, 0 , & mut io) ? {
346
+ // Some bytes are written immediately
347
+ Some ( n) => Ok ( n) ,
348
+ // Write operation is anqueued for whole buffer
349
+ None => Ok ( buf. len ( ) ) ,
350
+ }
347
351
}
348
352
349
- fn flush ( & mut self ) -> io:: Result < ( ) > {
350
- Ok ( ( ) )
353
+ fn flush ( & mut self ) -> io:: Result < ( ) > {
354
+ Ok ( ( ) )
351
355
}
352
356
}
353
357
354
358
impl Source for NamedPipe {
355
- fn register ( & mut self , registry : & Registry , token : Token , interest : Interest ) -> io:: Result < ( ) > {
359
+ fn register (
360
+ & mut self ,
361
+ registry : & Registry ,
362
+ token : Token ,
363
+ interest : Interest ,
364
+ ) -> io:: Result < ( ) > {
356
365
let mut io = self . inner . io . lock ( ) . unwrap ( ) ;
357
366
358
367
io. check_association ( registry, false ) ?;
@@ -368,7 +377,10 @@ impl Source for NamedPipe {
368
377
io. cp = Some ( poll:: selector ( registry) . clone_port ( ) ) ;
369
378
370
379
let inner_token = NEXT_TOKEN . fetch_add ( 2 , Relaxed ) + 2 ;
371
- poll:: selector ( registry) . inner . cp . add_handle ( inner_token, & self . inner . handle ) ?;
380
+ poll:: selector ( registry)
381
+ . inner
382
+ . cp
383
+ . add_handle ( inner_token, & self . inner . handle ) ?;
372
384
}
373
385
374
386
io. token = Some ( token) ;
@@ -381,7 +393,12 @@ impl Source for NamedPipe {
381
393
Ok ( ( ) )
382
394
}
383
395
384
- fn reregister ( & mut self , registry : & Registry , token : Token , interest : Interest ) -> io:: Result < ( ) > {
396
+ fn reregister (
397
+ & mut self ,
398
+ registry : & Registry ,
399
+ token : Token ,
400
+ interest : Interest ,
401
+ ) -> io:: Result < ( ) > {
385
402
let mut io = self . inner . io . lock ( ) . unwrap ( ) ;
386
403
387
404
io. check_association ( registry, true ) ?;
@@ -491,19 +508,61 @@ impl Inner {
491
508
}
492
509
}
493
510
494
- fn schedule_write ( me : & Arc < Inner > , buf : Vec < u8 > , pos : usize , io : & mut Io , events : Option < & mut Vec < Event > > ) {
511
+ /// Maybe schedules overlapped write operation.
512
+ ///
513
+ /// * `None` means that overlapped operation was enqueued
514
+ /// * `Some(n)` means that `n` bytes was immediately written.
515
+ /// Note, that `write_done` will fire anyway to clean up the state.
516
+ fn maybe_schedule_write (
517
+ me : & Arc < Inner > ,
518
+ buf : Vec < u8 > ,
519
+ pos : usize ,
520
+ io : & mut Io ,
521
+ ) -> io:: Result < Option < usize > > {
495
522
// Very similar to `schedule_read` above, just done for the write half.
496
523
let e = unsafe {
497
524
let overlapped = me. write . as_ptr ( ) as * mut _ ;
498
525
me. handle . write_overlapped ( & buf[ pos..] , overlapped)
499
526
} ;
500
527
528
+ // See `connect` above for the rationale behind `forget`
501
529
match e {
502
- // See `connect` above for the rationale behind `forget`
503
- Ok ( _) => {
530
+ // `n` bytes are written immediately
531
+ Ok ( Some ( n) ) => {
532
+ io. write = State :: Ok ( buf, pos) ;
533
+ mem:: forget ( me. clone ( ) ) ;
534
+ Ok ( Some ( n) )
535
+ }
536
+ // write operation is enqueued
537
+ Ok ( None ) => {
504
538
io. write = State :: Pending ( buf, pos) ;
505
- mem:: forget ( me. clone ( ) )
539
+ mem:: forget ( me. clone ( ) ) ;
540
+ Ok ( None )
506
541
}
542
+ Err ( e) => Err ( e) ,
543
+ }
544
+ }
545
+
546
+ fn schedule_write (
547
+ me : & Arc < Inner > ,
548
+ buf : Vec < u8 > ,
549
+ pos : usize ,
550
+ io : & mut Io ,
551
+ events : Option < & mut Vec < Event > > ,
552
+ ) {
553
+ match Inner :: maybe_schedule_write ( me, buf, pos, io) {
554
+ Ok ( Some ( _) ) => {
555
+ // immediate result will be handled in `write_done`,
556
+ // so we'll reinterpret the `Ok` state
557
+ let state = mem:: replace ( & mut io. write , State :: None ) ;
558
+ io. write = match state {
559
+ State :: Ok ( buf, pos) => State :: Pending ( buf, pos) ,
560
+ // io is locked, so this branch is unreachable
561
+ _ => unreachable ! ( ) ,
562
+ } ;
563
+ mem:: forget ( me. clone ( ) ) ;
564
+ }
565
+ Ok ( None ) => ( ) ,
507
566
Err ( e) => {
508
567
io. write = State :: Err ( e) ;
509
568
io. notify_writable ( events) ;
@@ -610,6 +669,12 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
610
669
// then we're writable again and otherwise we schedule another write.
611
670
let mut io = me. io . lock ( ) . unwrap ( ) ;
612
671
let ( buf, pos) = match mem:: replace ( & mut io. write , State :: None ) {
672
+ // `Ok` here means, that the operation was completed immediately
673
+ // `bytes_transferred` is already reported to a client
674
+ State :: Ok ( ..) => {
675
+ io. notify_writable ( events) ;
676
+ return ;
677
+ }
613
678
State :: Pending ( buf, pos) => ( buf, pos) ,
614
679
_ => unreachable ! ( ) ,
615
680
} ;
@@ -638,18 +703,14 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
638
703
impl Io {
639
704
fn check_association ( & self , registry : & Registry , required : bool ) -> io:: Result < ( ) > {
640
705
match self . cp {
641
- Some ( ref cp) if !poll:: selector ( registry) . same_port ( cp) => {
642
- Err ( io:: Error :: new (
643
- io:: ErrorKind :: AlreadyExists ,
644
- "I/O source already registered with a different `Registry`"
645
- ) )
646
- }
647
- None if required => {
648
- Err ( io:: Error :: new (
649
- io:: ErrorKind :: NotFound ,
650
- "I/O source not registered with `Registry`"
651
- ) )
652
- }
706
+ Some ( ref cp) if !poll:: selector ( registry) . same_port ( cp) => Err ( io:: Error :: new (
707
+ io:: ErrorKind :: AlreadyExists ,
708
+ "I/O source already registered with a different `Registry`" ,
709
+ ) ) ,
710
+ None if required => Err ( io:: Error :: new (
711
+ io:: ErrorKind :: NotFound ,
712
+ "I/O source not registered with `Registry`" ,
713
+ ) ) ,
653
714
_ => Ok ( ( ) ) ,
654
715
}
655
716
}
0 commit comments