@@ -25,6 +25,7 @@ use rt::io::{standard_error, OtherIoError};
25
25
use rt:: tube:: Tube ;
26
26
use rt:: local:: Local ;
27
27
use unstable:: sync:: { Exclusive , exclusive} ;
28
+ use rt:: uv:: net:: uv_ip4_to_ip4;
28
29
29
30
#[ cfg( test) ] use container:: Container ;
30
31
#[ cfg( test) ] use uint;
@@ -260,6 +261,24 @@ impl IoFactory for UvIoFactory {
260
261
}
261
262
}
262
263
}
264
+
265
+ fn udp_bind ( & mut self , addr : IpAddr ) -> Result < ~RtioUdpSocketObject , IoError > {
266
+ let mut watcher = UdpWatcher :: new ( self . uv_loop ( ) ) ;
267
+ match watcher. bind ( addr) {
268
+ Ok ( _) => Ok ( ~UvUdpSocket { watcher : watcher } ) ,
269
+ Err ( uverr) => {
270
+ let scheduler = Local :: take :: < Scheduler > ( ) ;
271
+ do scheduler. deschedule_running_task_and_then |_, task| {
272
+ let task_cell = Cell :: new ( task) ;
273
+ do watcher. close {
274
+ let scheduler = Local :: take :: < Scheduler > ( ) ;
275
+ scheduler. resume_task_immediately ( task_cell. take ( ) ) ;
276
+ }
277
+ }
278
+ Err ( uv_error_to_io_error ( uverr) )
279
+ }
280
+ }
281
+ }
263
282
}
264
283
265
284
// FIXME #6090: Prefer newtype structs but Drop doesn't work
@@ -358,7 +377,7 @@ impl Drop for UvTcpStream {
358
377
}
359
378
360
379
impl RtioTcpStream for UvTcpStream {
361
- fn read ( & mut self , buf : & mut [ u8 ] ) -> Result < uint , IoError > {
380
+ fn read ( & self , buf : & mut [ u8 ] ) -> Result < uint , IoError > {
362
381
let result_cell = Cell :: new_empty ( ) ;
363
382
let result_cell_ptr: * Cell < Result < uint , IoError > > = & result_cell;
364
383
@@ -403,7 +422,7 @@ impl RtioTcpStream for UvTcpStream {
403
422
return result_cell. take ( ) ;
404
423
}
405
424
406
- fn write ( & mut self , buf : & [ u8 ] ) -> Result < ( ) , IoError > {
425
+ fn write ( & self , buf : & [ u8 ] ) -> Result < ( ) , IoError > {
407
426
let result_cell = Cell :: new_empty ( ) ;
408
427
let result_cell_ptr: * Cell < Result < ( ) , IoError > > = & result_cell;
409
428
let scheduler = Local :: take :: < Scheduler > ( ) ;
@@ -433,23 +452,21 @@ impl RtioTcpStream for UvTcpStream {
433
452
}
434
453
}
435
454
436
- pub struct UvUdpStream {
437
- watcher : UdpWatcher ,
438
- address : IpAddr
455
+ pub struct UvUdpSocket {
456
+ watcher : UdpWatcher
439
457
}
440
458
441
- impl UvUdpStream {
459
+ impl UvUdpSocket {
442
460
fn watcher ( & self ) -> UdpWatcher { self . watcher }
443
- fn address ( & self ) -> IpAddr { self . address }
444
461
}
445
462
446
- impl Drop for UvUdpStream {
463
+ impl Drop for UvUdpSocket {
447
464
fn finalize ( & self ) {
448
- rtdebug ! ( "closing udp stream " ) ;
465
+ rtdebug ! ( "closing udp socket " ) ;
449
466
let watcher = self . watcher ( ) ;
450
467
let scheduler = Local :: take :: < Scheduler > ( ) ;
451
468
do scheduler. deschedule_running_task_and_then |_, task| {
452
- let task_cell = Cell ( task) ;
469
+ let task_cell = Cell :: new ( task) ;
453
470
do watcher. close {
454
471
let scheduler = Local :: take :: < Scheduler > ( ) ;
455
472
scheduler. resume_task_immediately ( task_cell. take ( ) ) ;
@@ -458,40 +475,31 @@ impl Drop for UvUdpStream {
458
475
}
459
476
}
460
477
461
- impl RtioUdpStream for UvUdpStream {
462
- fn read ( & mut self , buf : & mut [ u8 ] ) -> Result < uint , IoError > {
463
- let result_cell = empty_cell ( ) ;
464
- let result_cell_ptr: * Cell < Result < uint , IoError > > = & result_cell;
478
+ impl RtioUdpSocket for UvUdpSocket {
479
+ fn recvfrom ( & self , buf : & mut [ u8 ] ) -> Result < ( uint , IpAddr ) , IoError > {
480
+ let result_cell = Cell :: new_empty ( ) ;
481
+ let result_cell_ptr: * Cell < Result < ( uint , IpAddr ) , IoError > > = & result_cell;
465
482
466
483
let scheduler = Local :: take :: < Scheduler > ( ) ;
467
484
assert ! ( scheduler. in_task_context( ) ) ;
468
485
let watcher = self . watcher ( ) ;
469
- let connection_address = self . address ( ) ;
470
486
let buf_ptr: * & mut [ u8 ] = & buf;
471
487
do scheduler. deschedule_running_task_and_then |sched, task| {
472
- rtdebug ! ( "read : entered scheduler context" ) ;
488
+ rtdebug ! ( "recvfrom : entered scheduler context" ) ;
473
489
assert ! ( !sched. in_task_context( ) ) ;
474
490
let mut watcher = watcher;
475
- let task_cell = Cell ( task) ;
476
- // XXX: see note in RtioTcpStream implementation for UvTcpStream
477
- let alloc: AllocCallback = |_| unsafe {
478
- slice_to_uv_buf ( * buf_ptr)
479
- } ;
480
- do watcher. recv_start ( alloc) |watcher, nread, _buf, addr, flags, status| {
481
- let _ = flags; // TODO actually use flags
491
+ let task_cell = Cell :: new ( task) ;
492
+ let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf ( * buf_ptr) } ;
493
+ do watcher. recv_start ( alloc) |watcher, nread, buf, addr, flags, status| {
494
+ let _ = flags; // TODO
495
+ let _ = buf; // TODO
482
496
483
- // XXX: see note in RtioTcpStream implementation for UvTcpStream
484
497
let mut watcher = watcher;
485
498
watcher. recv_stop ( ) ;
486
499
487
- let incoming_address = net:: uv_ip4_to_ip4 ( & addr) ;
488
500
let result = if status. is_none ( ) {
489
501
assert ! ( nread >= 0 ) ;
490
- if incoming_address != connection_address {
491
- Ok ( 0 u)
492
- } else {
493
- Ok ( nread as uint )
494
- }
502
+ Ok ( ( nread as uint , uv_ip4_to_ip4 ( & addr) ) )
495
503
} else {
496
504
Err ( uv_error_to_io_error ( status. unwrap ( ) ) )
497
505
} ;
@@ -505,11 +513,37 @@ impl RtioUdpStream for UvUdpStream {
505
513
506
514
assert!( !result_cell. is_empty ( ) ) ;
507
515
return result_cell. take ( ) ;
516
+
508
517
}
518
+ fn sendto ( & self , buf : & [ u8 ] , dst : IpAddr ) -> Result < ( ) , IoError > {
519
+ let result_cell = Cell :: new_empty ( ) ;
520
+ let result_cell_ptr: * Cell < Result < ( ) , IoError > > = & result_cell;
521
+ let scheduler = Local :: take :: < Scheduler > ( ) ;
522
+ assert ! ( scheduler. in_task_context( ) ) ;
523
+ let watcher = self . watcher ( ) ;
524
+ let buf_ptr: * & [ u8 ] = & buf;
525
+ do scheduler. deschedule_running_task_and_then |_, task| {
526
+ let mut watcher = watcher;
527
+ let task_cell = Cell :: new ( task) ;
528
+ let buf = unsafe { slice_to_uv_buf ( * buf_ptr) } ;
529
+ do watcher. send ( buf, dst) |watcher, status| {
530
+ let _ = watcher; // TODO
531
+
532
+ let result = if status. is_none ( ) {
533
+ Ok ( ( ) )
534
+ } else {
535
+ Err ( uv_error_to_io_error ( status. unwrap ( ) ) )
536
+ } ;
537
+
538
+ unsafe { ( * result_cell_ptr) . put_back ( result) ; }
539
+
540
+ let scheduler = Local :: take :: < Scheduler > ( ) ;
541
+ scheduler. resume_task_immediately ( task_cell. take ( ) ) ;
542
+ }
543
+ }
509
544
510
- fn write ( & mut self , buf : & [ u8 ] ) -> Result < ( ) , IoError > {
511
- let _ = buf;
512
- fail ! ( )
545
+ assert!( !result_cell. is_empty ( ) ) ;
546
+ return result_cell. take ( ) ;
513
547
}
514
548
}
515
549
@@ -535,7 +569,7 @@ fn test_simple_tcp_server_and_client() {
535
569
unsafe {
536
570
let io = Local :: unsafe_borrow :: < IoFactoryObject > ( ) ;
537
571
let mut listener = ( * io) . tcp_bind ( addr) . unwrap ( ) ;
538
- let mut stream = listener. accept ( ) . unwrap ( ) ;
572
+ let stream = listener. accept ( ) . unwrap ( ) ;
539
573
let mut buf = [ 0 , .. 2048 ] ;
540
574
let nread = stream. read ( buf) . unwrap ( ) ;
541
575
assert_eq ! ( nread, 8 ) ;
@@ -549,7 +583,7 @@ fn test_simple_tcp_server_and_client() {
549
583
do spawntask_immediately {
550
584
unsafe {
551
585
let io = Local :: unsafe_borrow :: < IoFactoryObject > ( ) ;
552
- let mut stream = ( * io) . tcp_connect ( addr) . unwrap ( ) ;
586
+ let stream = ( * io) . tcp_connect ( addr) . unwrap ( ) ;
553
587
stream. write ( [ 0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 ] ) ;
554
588
}
555
589
}
@@ -564,7 +598,7 @@ fn test_read_and_block() {
564
598
do spawntask_immediately {
565
599
let io = unsafe { Local :: unsafe_borrow :: < IoFactoryObject > ( ) } ;
566
600
let mut listener = unsafe { ( * io) . tcp_bind ( addr) . unwrap ( ) } ;
567
- let mut stream = listener. accept ( ) . unwrap ( ) ;
601
+ let stream = listener. accept ( ) . unwrap ( ) ;
568
602
let mut buf = [ 0 , .. 2048 ] ;
569
603
570
604
let expected = 32 ;
@@ -597,7 +631,7 @@ fn test_read_and_block() {
597
631
do spawntask_immediately {
598
632
unsafe {
599
633
let io = Local :: unsafe_borrow :: < IoFactoryObject > ( ) ;
600
- let mut stream = ( * io) . tcp_connect ( addr) . unwrap ( ) ;
634
+ let stream = ( * io) . tcp_connect ( addr) . unwrap ( ) ;
601
635
stream. write ( [ 0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 ] ) ;
602
636
stream. write ( [ 0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 ] ) ;
603
637
stream. write ( [ 0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 ] ) ;
@@ -618,7 +652,7 @@ fn test_read_read_read() {
618
652
unsafe {
619
653
let io = Local :: unsafe_borrow :: < IoFactoryObject > ( ) ;
620
654
let mut listener = ( * io) . tcp_bind ( addr) . unwrap ( ) ;
621
- let mut stream = listener. accept ( ) . unwrap ( ) ;
655
+ let stream = listener. accept ( ) . unwrap ( ) ;
622
656
let buf = [ 1 , .. 2048 ] ;
623
657
let mut total_bytes_written = 0 ;
624
658
while total_bytes_written < MAX {
@@ -631,7 +665,7 @@ fn test_read_read_read() {
631
665
do spawntask_immediately {
632
666
unsafe {
633
667
let io = Local :: unsafe_borrow :: < IoFactoryObject > ( ) ;
634
- let mut stream = ( * io) . tcp_connect ( addr) . unwrap ( ) ;
668
+ let stream = ( * io) . tcp_connect ( addr) . unwrap ( ) ;
635
669
let mut buf = [ 0 , .. 2048 ] ;
636
670
let mut total_bytes_read = 0 ;
637
671
while total_bytes_read < MAX {
0 commit comments