Skip to content

Commit 7ce3386

Browse files
committed
auto merge of #11112 : alexcrichton/rust/issue-11087, r=brson
This should allow callers to know whether the channel was empty or disconnected without having to block. Closes #11087
2 parents f60d937 + adb895a commit 7ce3386

File tree

7 files changed

+114
-35
lines changed

7 files changed

+114
-35
lines changed

src/libextra/comm.rs

+8-6
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ Higher level communication abstractions.
1616

1717
#[allow(missing_doc)];
1818

19+
use std::comm;
20+
1921
/// An extension of `pipes::stream` that allows both sending and receiving.
2022
pub struct DuplexStream<T, U> {
2123
priv chan: Chan<T>,
@@ -40,7 +42,7 @@ impl<T:Send,U:Send> DuplexStream<T, U> {
4042
pub fn recv(&self) -> U {
4143
self.port.recv()
4244
}
43-
pub fn try_recv(&self) -> Option<U> {
45+
pub fn try_recv(&self) -> comm::TryRecvResult<U> {
4446
self.port.try_recv()
4547
}
4648
pub fn recv_opt(&self) -> Option<U> {
@@ -77,11 +79,11 @@ impl<T: Send> SyncPort<T> {
7779
})
7880
}
7981

80-
pub fn try_recv(&self) -> Option<T> {
81-
self.duplex_stream.try_recv().map(|val| {
82-
self.duplex_stream.try_send(());
83-
val
84-
})
82+
pub fn try_recv(&self) -> comm::TryRecvResult<T> {
83+
match self.duplex_stream.try_recv() {
84+
comm::Data(t) => { self.duplex_stream.try_send(()); comm::Data(t) }
85+
state => state,
86+
}
8587
}
8688
}
8789

src/libextra/sync.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020

2121
use std::borrow;
22+
use std::comm;
2223
use std::unstable::sync::Exclusive;
2324
use std::sync::arc::UnsafeArc;
2425
use std::sync::atomics;
@@ -49,7 +50,7 @@ impl WaitQueue {
4950
// Signals one live task from the queue.
5051
fn signal(&self) -> bool {
5152
match self.head.try_recv() {
52-
Some(ch) => {
53+
comm::Data(ch) => {
5354
// Send a wakeup signal. If the waiter was killed, its port will
5455
// have closed. Keep trying until we get a live task.
5556
if ch.try_send_deferred(()) {
@@ -58,20 +59,20 @@ impl WaitQueue {
5859
self.signal()
5960
}
6061
}
61-
None => false
62+
_ => false
6263
}
6364
}
6465

6566
fn broadcast(&self) -> uint {
6667
let mut count = 0;
6768
loop {
6869
match self.head.try_recv() {
69-
None => break,
70-
Some(ch) => {
70+
comm::Data(ch) => {
7171
if ch.try_send_deferred(()) {
7272
count += 1;
7373
}
7474
}
75+
_ => break
7576
}
7677
}
7778
count

src/libgreen/sched.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -958,6 +958,7 @@ fn new_sched_rng() -> XorShiftRng {
958958

959959
#[cfg(test)]
960960
mod test {
961+
use std::comm;
961962
use std::task::TaskOpts;
962963
use std::rt::Runtime;
963964
use std::rt::task::Task;
@@ -1376,7 +1377,7 @@ mod test {
13761377
// This task should not be able to starve the sender;
13771378
// The sender should get stolen to another thread.
13781379
do spawn {
1379-
while port.try_recv().is_none() { }
1380+
while port.try_recv() != comm::Data(()) { }
13801381
}
13811382

13821383
chan.send(());
@@ -1393,7 +1394,7 @@ mod test {
13931394
// This task should not be able to starve the other task.
13941395
// The sends should eventually yield.
13951396
do spawn {
1396-
while port.try_recv().is_none() {
1397+
while port.try_recv() != comm::Data(()) {
13971398
chan2.send(());
13981399
}
13991400
}

src/libstd/comm/mod.rs

+88-16
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ macro_rules! test (
251251
#[allow(unused_imports)];
252252

253253
use native;
254+
use comm::*;
254255
use prelude::*;
255256
use super::*;
256257
use super::super::*;
@@ -323,6 +324,20 @@ pub struct SharedChan<T> {
323324
priv queue: mpsc::Producer<T, Packet>,
324325
}
325326

327+
/// This enumeration is the list of the possible reasons that try_recv could not
328+
/// return data when called.
329+
#[deriving(Eq, Clone)]
330+
pub enum TryRecvResult<T> {
331+
/// This channel is currently empty, but the sender(s) have not yet
332+
/// disconnected, so data may yet become available.
333+
Empty,
334+
/// This channel's sending half has become disconnected, and there will
335+
/// never be any more data received on this channel
336+
Disconnected,
337+
/// The channel had some data and we successfully popped it
338+
Data(T),
339+
}
340+
326341
///////////////////////////////////////////////////////////////////////////////
327342
// Internal struct definitions
328343
///////////////////////////////////////////////////////////////////////////////
@@ -739,11 +754,11 @@ impl<T: Send> Port<T> {
739754
/// block on a port.
740755
///
741756
/// This function cannot fail.
742-
pub fn try_recv(&self) -> Option<T> {
757+
pub fn try_recv(&self) -> TryRecvResult<T> {
743758
self.try_recv_inc(true)
744759
}
745760

746-
fn try_recv_inc(&self, increment: bool) -> Option<T> {
761+
fn try_recv_inc(&self, increment: bool) -> TryRecvResult<T> {
747762
// This is a "best effort" situation, so if a queue is inconsistent just
748763
// don't worry about it.
749764
let this = unsafe { cast::transmute_mut(self) };
@@ -807,7 +822,35 @@ impl<T: Send> Port<T> {
807822
if increment && ret.is_some() {
808823
unsafe { (*this.queue.packet()).steals += 1; }
809824
}
810-
return ret;
825+
match ret {
826+
Some(t) => Data(t),
827+
None => {
828+
// It's possible that between the time that we saw the queue was
829+
// empty and here the other side disconnected. It's also
830+
// possible for us to see the disconnection here while there is
831+
// data in the queue. It's pretty backwards-thinking to return
832+
// Disconnected when there's actually data on the queue, so if
833+
// we see a disconnected state be sure to check again to be 100%
834+
// sure that there's no data in the queue.
835+
let cnt = unsafe { (*this.queue.packet()).cnt.load(Relaxed) };
836+
if cnt != DISCONNECTED { return Empty }
837+
838+
let ret = match this.queue {
839+
SPSC(ref mut queue) => queue.pop(),
840+
MPSC(ref mut queue) => match queue.pop() {
841+
mpsc::Data(t) => Some(t),
842+
mpsc::Empty => None,
843+
mpsc::Inconsistent => {
844+
fail!("inconsistent with no senders?!");
845+
}
846+
}
847+
};
848+
match ret {
849+
Some(data) => Data(data),
850+
None => Disconnected,
851+
}
852+
}
853+
}
811854
}
812855

813856
/// Attempt to wait for a value on this port, but does not fail if the
@@ -824,7 +867,11 @@ impl<T: Send> Port<T> {
824867
/// the value found on the port is returned.
825868
pub fn recv_opt(&self) -> Option<T> {
826869
// optimistic preflight check (scheduling is expensive)
827-
match self.try_recv() { None => {}, data => return data }
870+
match self.try_recv() {
871+
Empty => {},
872+
Disconnected => return None,
873+
Data(t) => return Some(t),
874+
}
828875

829876
let packet;
830877
let this;
@@ -843,12 +890,11 @@ impl<T: Send> Port<T> {
843890
});
844891
}
845892

846-
let data = self.try_recv_inc(false);
847-
if data.is_none() &&
848-
unsafe { (*packet).cnt.load(SeqCst) } != DISCONNECTED {
849-
fail!("bug: woke up too soon {}", unsafe { (*packet).cnt.load(SeqCst) });
893+
match self.try_recv_inc(false) {
894+
Data(t) => Some(t),
895+
Empty => fail!("bug: woke up too soon"),
896+
Disconnected => None,
850897
}
851-
return data;
852898
}
853899

854900
/// Returns an iterator which will block waiting for messages, but never
@@ -1005,7 +1051,10 @@ mod test {
10051051
for _ in range(0, AMT * NTHREADS) {
10061052
assert_eq!(p.recv(), 1);
10071053
}
1008-
assert_eq!(p.try_recv(), None);
1054+
match p.try_recv() {
1055+
Data(..) => fail!(),
1056+
_ => {}
1057+
}
10091058
c1.send(());
10101059
}
10111060

@@ -1129,7 +1178,7 @@ mod test {
11291178
test!(fn oneshot_single_thread_try_recv_open() {
11301179
let (port, chan) = Chan::<int>::new();
11311180
chan.send(10);
1132-
assert!(port.try_recv() == Some(10));
1181+
assert!(port.recv_opt() == Some(10));
11331182
})
11341183

11351184
test!(fn oneshot_single_thread_try_recv_closed() {
@@ -1140,21 +1189,21 @@ mod test {
11401189

11411190
test!(fn oneshot_single_thread_peek_data() {
11421191
let (port, chan) = Chan::<int>::new();
1143-
assert!(port.try_recv().is_none());
1192+
assert_eq!(port.try_recv(), Empty)
11441193
chan.send(10);
1145-
assert!(port.try_recv().is_some());
1194+
assert_eq!(port.try_recv(), Data(10));
11461195
})
11471196

11481197
test!(fn oneshot_single_thread_peek_close() {
11491198
let (port, chan) = Chan::<int>::new();
11501199
{ let _c = chan; }
1151-
assert!(port.try_recv().is_none());
1152-
assert!(port.try_recv().is_none());
1200+
assert_eq!(port.try_recv(), Disconnected);
1201+
assert_eq!(port.try_recv(), Disconnected);
11531202
})
11541203

11551204
test!(fn oneshot_single_thread_peek_open() {
11561205
let (port, _) = Chan::<int>::new();
1157-
assert!(port.try_recv().is_none());
1206+
assert_eq!(port.try_recv(), Empty);
11581207
})
11591208

11601209
test!(fn oneshot_multi_task_recv_then_send() {
@@ -1321,4 +1370,27 @@ mod test {
13211370
drop(chan);
13221371
assert_eq!(count_port.recv(), 4);
13231372
})
1373+
1374+
test!(fn try_recv_states() {
1375+
let (p, c) = Chan::<int>::new();
1376+
let (p1, c1) = Chan::<()>::new();
1377+
let (p2, c2) = Chan::<()>::new();
1378+
do spawn {
1379+
p1.recv();
1380+
c.send(1);
1381+
c2.send(());
1382+
p1.recv();
1383+
drop(c);
1384+
c2.send(());
1385+
}
1386+
1387+
assert_eq!(p.try_recv(), Empty);
1388+
c1.send(());
1389+
p2.recv();
1390+
assert_eq!(p.try_recv(), Data(1));
1391+
assert_eq!(p.try_recv(), Empty);
1392+
c1.send(());
1393+
p2.recv();
1394+
assert_eq!(p.try_recv(), Disconnected);
1395+
})
13241396
}

src/libstd/comm/select.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
#[allow(dead_code)];
4646

4747
use cast;
48+
use comm;
4849
use iter::Iterator;
4950
use kinds::Send;
5051
use ops::Drop;
@@ -279,7 +280,9 @@ impl<'port, T: Send> Handle<'port, T> {
279280
pub fn recv_opt(&mut self) -> Option<T> { self.port.recv_opt() }
280281
/// Immediately attempt to receive a value on a port, this function will
281282
/// never block. Has the same semantics as `Port.try_recv`.
282-
pub fn try_recv(&mut self) -> Option<T> { self.port.try_recv() }
283+
pub fn try_recv(&mut self) -> comm::TryRecvResult<T> {
284+
self.port.try_recv()
285+
}
283286
}
284287

285288
#[unsafe_destructor]
@@ -409,8 +412,8 @@ mod test {
409412
a = p1.recv() => { assert_eq!(a, 1); },
410413
a = p2.recv() => { assert_eq!(a, 2); }
411414
)
412-
assert_eq!(p1.try_recv(), None);
413-
assert_eq!(p2.try_recv(), None);
415+
assert_eq!(p1.try_recv(), Empty);
416+
assert_eq!(p2.try_recv(), Empty);
414417
c3.send(());
415418
})
416419

src/libstd/io/signal.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ impl Listener {
144144
#[cfg(test)]
145145
mod test {
146146
use libc;
147+
use comm::Empty;
147148
use io::timer;
148149
use super::{Listener, Interrupt};
149150

@@ -194,7 +195,7 @@ mod test {
194195
s2.unregister(Interrupt);
195196
sigint();
196197
timer::sleep(10);
197-
assert!(s2.port.try_recv().is_none());
198+
assert_eq!(s2.port.try_recv(), Empty);
198199
}
199200

200201
#[cfg(windows)]

src/libstd/io/timer.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -123,16 +123,15 @@ mod test {
123123
let port1 = timer.oneshot(10000);
124124
let port = timer.oneshot(1);
125125
port.recv();
126-
assert_eq!(port1.try_recv(), None);
126+
assert!(port1.recv_opt().is_none());
127127
}
128128

129129
#[test]
130130
fn test_io_timer_oneshot_then_sleep() {
131131
let mut timer = Timer::new().unwrap();
132132
let port = timer.oneshot(100000000000);
133133
timer.sleep(1); // this should invalidate the port
134-
135-
assert_eq!(port.try_recv(), None);
134+
assert!(port.recv_opt().is_none());
136135
}
137136

138137
#[test]

0 commit comments

Comments
 (0)