Skip to content

Commit 0a6b921

Browse files
committed
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind channels, but rather their API usage. In the past, we had the distinction between oneshot, stream, and shared channels, but the most recent rewrite dropped oneshots in favor of streams and shared channels. This distinction of stream vs shared has shown that it's not quite what we'd like either, and this moves the `std::comm` module in the direction of "one channel to rule them all". There now remains only one Chan and one Port. This new channel is actually a hybrid oneshot/stream/shared channel under the hood in order to optimize for the use cases in question. Additionally, this also reduces the cognitive burden of having to choose between a Chan or a SharedChan in an API. My simple benchmarks show no reduction in efficiency over the existing channels today, and a 3x improvement in the oneshot case. I sadly don't have a pre-last-rewrite compiler to test out the old old oneshots, but I would imagine that the performance is comparable, but slightly slower (due to atomic reference counting). This commit also brings the bonus bugfix to channels that the pending queue of messages are all dropped when a Port disappears rather then when both the Port and the Chan disappear.
1 parent 47ef200 commit 0a6b921

39 files changed

+1907
-698
lines changed

src/doc/guide-tasks.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ Instead we can use a `SharedChan`, a type that allows a single
232232
~~~
233233
# use std::task::spawn;
234234
235-
let (port, chan) = SharedChan::new();
235+
let (port, chan) = Chan::new();
236236
237237
for init_val in range(0u, 3) {
238238
// Create a new channel handle to distribute to the child task

src/libextra/test.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -767,7 +767,7 @@ fn run_tests(opts: &TestOpts,
767767
remaining.reverse();
768768
let mut pending = 0;
769769

770-
let (p, ch) = SharedChan::new();
770+
let (p, ch) = Chan::new();
771771

772772
while pending > 0 || !remaining.is_empty() {
773773
while pending < concurrency && !remaining.is_empty() {
@@ -878,7 +878,7 @@ pub fn filter_tests(
878878

879879
pub fn run_test(force_ignore: bool,
880880
test: TestDescAndFn,
881-
monitor_ch: SharedChan<MonitorMsg>) {
881+
monitor_ch: Chan<MonitorMsg>) {
882882

883883
let TestDescAndFn {desc, testfn} = test;
884884

@@ -888,7 +888,7 @@ pub fn run_test(force_ignore: bool,
888888
}
889889

890890
fn run_test_inner(desc: TestDesc,
891-
monitor_ch: SharedChan<MonitorMsg>,
891+
monitor_ch: Chan<MonitorMsg>,
892892
testfn: proc()) {
893893
spawn(proc() {
894894
let mut task = task::task();
@@ -1260,7 +1260,7 @@ mod tests {
12601260
},
12611261
testfn: DynTestFn(proc() f()),
12621262
};
1263-
let (p, ch) = SharedChan::new();
1263+
let (p, ch) = Chan::new();
12641264
run_test(false, desc, ch);
12651265
let (_, res) = p.recv();
12661266
assert!(res != TrOk);
@@ -1277,7 +1277,7 @@ mod tests {
12771277
},
12781278
testfn: DynTestFn(proc() f()),
12791279
};
1280-
let (p, ch) = SharedChan::new();
1280+
let (p, ch) = Chan::new();
12811281
run_test(false, desc, ch);
12821282
let (_, res) = p.recv();
12831283
assert_eq!(res, TrIgnored);
@@ -1294,7 +1294,7 @@ mod tests {
12941294
},
12951295
testfn: DynTestFn(proc() f()),
12961296
};
1297-
let (p, ch) = SharedChan::new();
1297+
let (p, ch) = Chan::new();
12981298
run_test(false, desc, ch);
12991299
let (_, res) = p.recv();
13001300
assert_eq!(res, TrOk);
@@ -1311,7 +1311,7 @@ mod tests {
13111311
},
13121312
testfn: DynTestFn(proc() f()),
13131313
};
1314-
let (p, ch) = SharedChan::new();
1314+
let (p, ch) = Chan::new();
13151315
run_test(false, desc, ch);
13161316
let (_, res) = p.recv();
13171317
assert_eq!(res, TrFailed);

src/libgreen/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ pub struct SchedPool {
315315
#[deriving(Clone)]
316316
struct TaskState {
317317
cnt: UnsafeArc<AtomicUint>,
318-
done: SharedChan<()>,
318+
done: Chan<()>,
319319
}
320320

321321
impl SchedPool {
@@ -469,7 +469,7 @@ impl SchedPool {
469469

470470
impl TaskState {
471471
fn new() -> (Port<()>, TaskState) {
472-
let (p, c) = SharedChan::new();
472+
let (p, c) = Chan::new();
473473
(p, TaskState {
474474
cnt: UnsafeArc::new(AtomicUint::new(0)),
475475
done: c,

src/libnative/io/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
//! that you would find on the respective platform.
2323
2424
use std::c_str::CString;
25-
use std::comm::SharedChan;
2625
use std::io;
2726
use std::io::IoError;
2827
use std::io::net::ip::SocketAddr;
@@ -289,7 +288,7 @@ impl rtio::IoFactory for IoFactory {
289288
})
290289
}
291290
}
292-
fn signal(&mut self, _signal: Signum, _channel: SharedChan<Signum>)
291+
fn signal(&mut self, _signal: Signum, _channel: Chan<Signum>)
293292
-> IoResult<~RtioSignal> {
294293
Err(unimpl())
295294
}

src/libnative/io/timer_helper.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use task;
3333
// only torn down after everything else has exited. This means that these
3434
// variables are read-only during use (after initialization) and both of which
3535
// are safe to use concurrently.
36-
static mut HELPER_CHAN: *mut SharedChan<Req> = 0 as *mut SharedChan<Req>;
36+
static mut HELPER_CHAN: *mut Chan<Req> = 0 as *mut Chan<Req>;
3737
static mut HELPER_SIGNAL: imp::signal = 0 as imp::signal;
3838

3939
pub fn boot(helper: fn(imp::signal, Port<Req>)) {
@@ -43,7 +43,9 @@ pub fn boot(helper: fn(imp::signal, Port<Req>)) {
4343
unsafe {
4444
LOCK.lock();
4545
if !INITIALIZED {
46-
let (msgp, msgc) = SharedChan::new();
46+
let (msgp, msgc) = Chan::new();
47+
// promote this to a shared channel
48+
drop(msgc.clone());
4749
HELPER_CHAN = cast::transmute(~msgc);
4850
let (receive, send) = imp::new();
4951
HELPER_SIGNAL = send;
@@ -84,8 +86,8 @@ fn shutdown() {
8486
// Clean up after ther helper thread
8587
unsafe {
8688
imp::close(HELPER_SIGNAL);
87-
let _chan: ~SharedChan<Req> = cast::transmute(HELPER_CHAN);
88-
HELPER_CHAN = 0 as *mut SharedChan<Req>;
89+
let _chan: ~Chan<Req> = cast::transmute(HELPER_CHAN);
90+
HELPER_CHAN = 0 as *mut Chan<Req>;
8991
HELPER_SIGNAL = 0 as imp::signal;
9092
}
9193
}

src/librustuv/signal.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
use std::libc::c_int;
1212
use std::io::signal::Signum;
13-
use std::comm::SharedChan;
1413
use std::rt::rtio::RtioSignal;
1514

1615
use homing::{HomingIO, HomeHandle};
@@ -22,13 +21,13 @@ pub struct SignalWatcher {
2221
handle: *uvll::uv_signal_t,
2322
home: HomeHandle,
2423

25-
channel: SharedChan<Signum>,
24+
channel: Chan<Signum>,
2625
signal: Signum,
2726
}
2827

2928
impl SignalWatcher {
3029
pub fn new(io: &mut UvIoFactory, signum: Signum,
31-
channel: SharedChan<Signum>) -> Result<~SignalWatcher, UvError> {
30+
channel: Chan<Signum>) -> Result<~SignalWatcher, UvError> {
3231
let s = ~SignalWatcher {
3332
handle: UvHandle::alloc(None::<SignalWatcher>, uvll::UV_SIGNAL),
3433
home: io.make_handle(),
@@ -81,7 +80,7 @@ mod test {
8180
#[test]
8281
fn closing_channel_during_drop_doesnt_kill_everything() {
8382
// see issue #10375, relates to timers as well.
84-
let (port, chan) = SharedChan::new();
83+
let (port, chan) = Chan::new();
8584
let _signal = SignalWatcher::new(local_loop(), signal::Interrupt,
8685
chan);
8786

src/librustuv/uvio.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
use std::c_str::CString;
1212
use std::cast;
13-
use std::comm::SharedChan;
1413
use std::io::IoError;
1514
use std::io::net::ip::SocketAddr;
1615
use std::io::process::ProcessConfig;
@@ -304,7 +303,7 @@ impl IoFactory for UvIoFactory {
304303
}
305304
}
306305

307-
fn signal(&mut self, signum: Signum, channel: SharedChan<Signum>)
306+
fn signal(&mut self, signum: Signum, channel: Chan<Signum>)
308307
-> Result<~rtio::RtioSignal, IoError> {
309308
match SignalWatcher::new(self, signum, channel) {
310309
Ok(s) => Ok(s as ~rtio::RtioSignal),

0 commit comments

Comments
 (0)