File tree 6 files changed +51
-8
lines changed
6 files changed +51
-8
lines changed Original file line number Diff line number Diff line change @@ -191,7 +191,7 @@ impl TcpWatcher {
191
191
TcpWatcher {
192
192
home : home,
193
193
handle : handle,
194
- stream : StreamWatcher :: new ( handle) ,
194
+ stream : StreamWatcher :: new ( handle, true ) ,
195
195
refcount : Refcount :: new ( ) ,
196
196
read_access : AccessTimeout :: new ( ) ,
197
197
write_access : AccessTimeout :: new ( ) ,
@@ -278,7 +278,7 @@ impl rtio::RtioTcpStream for TcpWatcher {
278
278
fn clone ( & self ) -> Box < rtio:: RtioTcpStream + Send > {
279
279
box TcpWatcher {
280
280
handle : self . handle ,
281
- stream : StreamWatcher :: new ( self . handle ) ,
281
+ stream : StreamWatcher :: new ( self . handle , false ) ,
282
282
home : self . home . clone ( ) ,
283
283
refcount : self . refcount . clone ( ) ,
284
284
read_access : self . read_access . clone ( ) ,
Original file line number Diff line number Diff line change @@ -67,7 +67,7 @@ impl PipeWatcher {
67
67
handle
68
68
} ;
69
69
PipeWatcher {
70
- stream : StreamWatcher :: new ( handle) ,
70
+ stream : StreamWatcher :: new ( handle, true ) ,
71
71
home : home,
72
72
defused : false ,
73
73
refcount : Refcount :: new ( ) ,
@@ -131,7 +131,7 @@ impl rtio::RtioPipe for PipeWatcher {
131
131
132
132
fn clone ( & self ) -> Box < rtio:: RtioPipe + Send > {
133
133
box PipeWatcher {
134
- stream : StreamWatcher :: new ( self . stream . handle ) ,
134
+ stream : StreamWatcher :: new ( self . stream . handle , false ) ,
135
135
defused : false ,
136
136
home : self . home . clone ( ) ,
137
137
refcount : self . refcount . clone ( ) ,
Original file line number Diff line number Diff line change @@ -59,8 +59,11 @@ impl StreamWatcher {
59
59
// will be manipulated on each of the methods called on this watcher.
60
60
// Wrappers should ensure to always reset the field to an appropriate value
61
61
// if they rely on the field to perform an action.
62
- pub fn new ( stream : * mut uvll:: uv_stream_t ) -> StreamWatcher {
63
- unsafe { uvll:: set_data_for_uv_handle ( stream, 0 as * mut int ) }
62
+ pub fn new ( stream : * mut uvll:: uv_stream_t ,
63
+ init : bool ) -> StreamWatcher {
64
+ if init {
65
+ unsafe { uvll:: set_data_for_uv_handle ( stream, 0 as * mut int ) }
66
+ }
64
67
StreamWatcher {
65
68
handle : stream,
66
69
last_write_req : None ,
Original file line number Diff line number Diff line change @@ -56,7 +56,7 @@ impl TtyWatcher {
56
56
let handle = UvHandle :: alloc ( None :: < TtyWatcher > , uvll:: UV_TTY ) ;
57
57
let mut watcher = TtyWatcher {
58
58
tty : handle,
59
- stream : StreamWatcher :: new ( handle) ,
59
+ stream : StreamWatcher :: new ( handle, true ) ,
60
60
home : io. make_handle ( ) ,
61
61
fd : fd,
62
62
} ;
Original file line number Diff line number Diff line change @@ -1360,4 +1360,44 @@ mod test {
1360
1360
1361
1361
rx2. recv( ) ;
1362
1362
} )
1363
+
1364
+ iotest ! ( fn clone_while_reading( ) {
1365
+ let addr = next_test_ip6( ) ;
1366
+ let listen = TcpListener :: bind( addr. ip. to_str( ) . as_slice( ) , addr. port) ;
1367
+ let mut accept = listen. listen( ) . unwrap( ) ;
1368
+
1369
+ // Enqueue a task to write to a socket
1370
+ let ( tx, rx) = channel( ) ;
1371
+ let ( txdone, rxdone) = channel( ) ;
1372
+ let txdone2 = txdone. clone( ) ;
1373
+ spawn( proc( ) {
1374
+ let mut tcp = TcpStream :: connect( addr. ip. to_str( ) . as_slice( ) ,
1375
+ addr. port) . unwrap( ) ;
1376
+ rx. recv( ) ;
1377
+ tcp. write_u8( 0 ) . unwrap( ) ;
1378
+ txdone2. send( ( ) ) ;
1379
+ } ) ;
1380
+
1381
+ // Spawn off a reading clone
1382
+ let tcp = accept. accept( ) . unwrap( ) ;
1383
+ let tcp2 = tcp. clone( ) ;
1384
+ let txdone3 = txdone. clone( ) ;
1385
+ spawn( proc( ) {
1386
+ let mut tcp2 = tcp2;
1387
+ tcp2. read_u8( ) . unwrap( ) ;
1388
+ txdone3. send( ( ) ) ;
1389
+ } ) ;
1390
+
1391
+ // Try to ensure that the reading clone is indeed reading
1392
+ for _ in range( 0 i, 50 ) {
1393
+ :: task:: deschedule( ) ;
1394
+ }
1395
+
1396
+ // clone the handle again while it's reading, then let it finish the
1397
+ // read.
1398
+ let _ = tcp. clone( ) ;
1399
+ tx. send( ( ) ) ;
1400
+ rxdone. recv( ) ;
1401
+ rxdone. recv( ) ;
1402
+ } )
1363
1403
}
Original file line number Diff line number Diff line change @@ -649,7 +649,7 @@ fn task_abort_no_kill_runtime() {
649
649
use std:: io:: timer;
650
650
use mem;
651
651
652
- let mut tb = TaskBuilder :: new ( ) ;
652
+ let tb = TaskBuilder :: new ( ) ;
653
653
let rx = tb. try_future ( proc ( ) { } ) ;
654
654
mem:: drop ( rx) ;
655
655
timer:: sleep ( 1000 ) ;
You can’t perform that action at this time.
0 commit comments