Skip to content

Commit 47ef200

Browse files
committed
Shuffle around ownership in concurrent queues
Previously, using a concurrent queue always mandated that the "shared state" be stored internally to the queues in order to provide a safe interface. This isn't quite as flexible as one would want in some circumstances, so this commit instead changes the queues so that they no longer contain the shared state. The queues no longer have a "default useful safe" interface, but rather a "default safe" interface (minus the useful part). The queues have to be shared manually through an Arc or some other means. This allows them to be a little more flexible at the cost of a usability hindrance. I plan on using this new flexibility to upgrade a channel to a shared channel seamlessly.
1 parent 0ac6e5a commit 47ef200

File tree

8 files changed

+344
-373
lines changed

8 files changed

+344
-373
lines changed

src/libgreen/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ use task::GreenTask;
193193

194194
mod macros;
195195
mod simple;
196+
mod message_queue;
196197

197198
pub mod basic;
198199
pub mod context;

src/libgreen/message_queue.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2+
// file at the top-level directory of this distribution and at
3+
// http://rust-lang.org/COPYRIGHT.
4+
//
5+
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8+
// option. This file may not be copied, modified, or distributed
9+
// except according to those terms.
10+
11+
use mpsc = std::sync::mpsc_queue;
12+
use std::sync::arc::UnsafeArc;
13+
14+
pub enum PopResult<T> {
15+
Inconsistent,
16+
Empty,
17+
Data(T),
18+
}
19+
20+
pub fn queue<T: Send>() -> (Consumer<T>, Producer<T>) {
21+
let (a, b) = UnsafeArc::new2(mpsc::Queue::new());
22+
(Consumer { inner: a }, Producer { inner: b })
23+
}
24+
25+
pub struct Producer<T> {
26+
priv inner: UnsafeArc<mpsc::Queue<T>>,
27+
}
28+
29+
pub struct Consumer<T> {
30+
priv inner: UnsafeArc<mpsc::Queue<T>>,
31+
}
32+
33+
impl<T: Send> Consumer<T> {
34+
pub fn pop(&mut self) -> PopResult<T> {
35+
match unsafe { (*self.inner.get()).pop() } {
36+
mpsc::Inconsistent => Inconsistent,
37+
mpsc::Empty => Empty,
38+
mpsc::Data(t) => Data(t),
39+
}
40+
}
41+
42+
pub fn casual_pop(&mut self) -> Option<T> {
43+
match unsafe { (*self.inner.get()).pop() } {
44+
mpsc::Inconsistent => None,
45+
mpsc::Empty => None,
46+
mpsc::Data(t) => Some(t),
47+
}
48+
}
49+
}
50+
51+
impl<T: Send> Producer<T> {
52+
pub fn push(&mut self, t: T) {
53+
unsafe { (*self.inner.get()).push(t); }
54+
}
55+
}
56+
57+
impl<T: Send> Clone for Producer<T> {
58+
fn clone(&self) -> Producer<T> {
59+
Producer { inner: self.inner.clone() }
60+
}
61+
}

src/libgreen/sched.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@ use std::rt::task::Task;
1717
use std::sync::deque;
1818
use std::unstable::mutex::Mutex;
1919
use std::unstable::raw;
20-
use mpsc = std::sync::mpsc_queue;
2120

2221
use TaskState;
2322
use context::Context;
2423
use coroutine::Coroutine;
2524
use sleeper_list::SleeperList;
2625
use stack::StackPool;
2726
use task::{TypeSched, GreenTask, HomeSched, AnySched};
27+
use msgq = message_queue;
2828

2929
/// A scheduler is responsible for coordinating the execution of Tasks
3030
/// on a single thread. The scheduler runs inside a slightly modified
@@ -47,9 +47,9 @@ pub struct Scheduler {
4747
/// The queue of incoming messages from other schedulers.
4848
/// These are enqueued by SchedHandles after which a remote callback
4949
/// is triggered to handle the message.
50-
message_queue: mpsc::Consumer<SchedMessage, ()>,
50+
message_queue: msgq::Consumer<SchedMessage>,
5151
/// Producer used to clone sched handles from
52-
message_producer: mpsc::Producer<SchedMessage, ()>,
52+
message_producer: msgq::Producer<SchedMessage>,
5353
/// A shared list of sleeping schedulers. We'll use this to wake
5454
/// up schedulers when pushing work onto the work queue.
5555
sleeper_list: SleeperList,
@@ -143,7 +143,7 @@ impl Scheduler {
143143
state: TaskState)
144144
-> Scheduler {
145145

146-
let (consumer, producer) = mpsc::queue(());
146+
let (consumer, producer) = msgq::queue();
147147
let mut sched = Scheduler {
148148
pool_id: pool_id,
149149
sleeper_list: sleeper_list,
@@ -215,7 +215,7 @@ impl Scheduler {
215215

216216
// Should not have any messages
217217
let message = stask.sched.get_mut_ref().message_queue.pop();
218-
rtassert!(match message { mpsc::Empty => true, _ => false });
218+
rtassert!(match message { msgq::Empty => true, _ => false });
219219

220220
stask.task.get_mut_ref().destroyed = true;
221221
}
@@ -340,8 +340,8 @@ impl Scheduler {
340340
//
341341
// I have chosen to take route #2.
342342
match self.message_queue.pop() {
343-
mpsc::Data(t) => Some(t),
344-
mpsc::Empty | mpsc::Inconsistent => None
343+
msgq::Data(t) => Some(t),
344+
msgq::Empty | msgq::Inconsistent => None
345345
}
346346
};
347347

@@ -849,7 +849,7 @@ pub enum SchedMessage {
849849

850850
pub struct SchedHandle {
851851
priv remote: ~RemoteCallback,
852-
priv queue: mpsc::Producer<SchedMessage, ()>,
852+
priv queue: msgq::Producer<SchedMessage>,
853853
sched_id: uint
854854
}
855855

src/librustuv/queue.rs

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use std::cast;
2424
use std::libc::{c_void, c_int};
2525
use std::rt::task::BlockedTask;
2626
use std::unstable::sync::LittleLock;
27+
use std::sync::arc::UnsafeArc;
2728
use mpsc = std::sync::mpsc_queue;
2829

2930
use async::AsyncWatcher;
@@ -39,46 +40,46 @@ enum Message {
3940
struct State {
4041
handle: *uvll::uv_async_t,
4142
lock: LittleLock, // see comments in async_cb for why this is needed
43+
queue: mpsc::Queue<Message>,
4244
}
4345

4446
/// This structure is intended to be stored next to the event loop, and it is
4547
/// used to create new `Queue` structures.
4648
pub struct QueuePool {
47-
priv producer: mpsc::Producer<Message, State>,
48-
priv consumer: mpsc::Consumer<Message, State>,
49+
priv queue: UnsafeArc<State>,
4950
priv refcnt: uint,
5051
}
5152

5253
/// This type is used to send messages back to the original event loop.
5354
pub struct Queue {
54-
priv queue: mpsc::Producer<Message, State>,
55+
priv queue: UnsafeArc<State>,
5556
}
5657

5758
extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
5859
assert_eq!(status, 0);
59-
let state: &mut QueuePool = unsafe {
60+
let pool: &mut QueuePool = unsafe {
6061
cast::transmute(uvll::get_data_for_uv_handle(handle))
6162
};
62-
let packet = unsafe { state.consumer.packet() };
63+
let state: &mut State = unsafe { cast::transmute(pool.queue.get()) };
6364

6465
// Remember that there is no guarantee about how many times an async
6566
// callback is called with relation to the number of sends, so process the
6667
// entire queue in a loop.
6768
loop {
68-
match state.consumer.pop() {
69+
match state.queue.pop() {
6970
mpsc::Data(Task(task)) => {
7071
let _ = task.wake().map(|t| t.reawaken());
7172
}
7273
mpsc::Data(Increment) => unsafe {
73-
if state.refcnt == 0 {
74-
uvll::uv_ref((*packet).handle);
74+
if pool.refcnt == 0 {
75+
uvll::uv_ref(state.handle);
7576
}
76-
state.refcnt += 1;
77+
pool.refcnt += 1;
7778
},
7879
mpsc::Data(Decrement) => unsafe {
79-
state.refcnt -= 1;
80-
if state.refcnt == 0 {
81-
uvll::uv_unref((*packet).handle);
80+
pool.refcnt -= 1;
81+
if pool.refcnt == 0 {
82+
uvll::uv_unref(state.handle);
8283
}
8384
},
8485
mpsc::Empty | mpsc::Inconsistent => break
@@ -99,24 +100,24 @@ extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
99100
// If we acquire the mutex here, then we are guaranteed that there are no
100101
// longer any senders which are holding on to their handles, so we can
101102
// safely allow the event loop to exit.
102-
if state.refcnt == 0 {
103+
if pool.refcnt == 0 {
103104
unsafe {
104-
let _l = (*packet).lock.lock();
105+
let _l = state.lock.lock();
105106
}
106107
}
107108
}
108109

109110
impl QueuePool {
110111
pub fn new(loop_: &mut Loop) -> ~QueuePool {
111112
let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
112-
let (c, p) = mpsc::queue(State {
113+
let state = UnsafeArc::new(State {
113114
handle: handle,
114115
lock: LittleLock::new(),
116+
queue: mpsc::Queue::new(),
115117
});
116118
let q = ~QueuePool {
117-
producer: p,
118-
consumer: c,
119119
refcnt: 0,
120+
queue: state,
120121
};
121122

122123
unsafe {
@@ -132,23 +133,23 @@ impl QueuePool {
132133
pub fn queue(&mut self) -> Queue {
133134
unsafe {
134135
if self.refcnt == 0 {
135-
uvll::uv_ref((*self.producer.packet()).handle);
136+
uvll::uv_ref((*self.queue.get()).handle);
136137
}
137138
self.refcnt += 1;
138139
}
139-
Queue { queue: self.producer.clone() }
140+
Queue { queue: self.queue.clone() }
140141
}
141142

142143
pub fn handle(&self) -> *uvll::uv_async_t {
143-
unsafe { (*self.producer.packet()).handle }
144+
unsafe { (*self.queue.get()).handle }
144145
}
145146
}
146147

147148
impl Queue {
148149
pub fn push(&mut self, task: BlockedTask) {
149-
self.queue.push(Task(task));
150150
unsafe {
151-
uvll::uv_async_send((*self.queue.packet()).handle);
151+
(*self.queue.get()).queue.push(Task(task));
152+
uvll::uv_async_send((*self.queue.get()).handle);
152153
}
153154
}
154155
}
@@ -161,7 +162,7 @@ impl Clone for Queue {
161162
// and if the queue is dropped later on it'll see the increment for the
162163
// decrement anyway.
163164
unsafe {
164-
cast::transmute_mut(self).queue.push(Increment);
165+
(*self.queue.get()).queue.push(Increment);
165166
}
166167
Queue { queue: self.queue.clone() }
167168
}
@@ -172,9 +173,9 @@ impl Drop for Queue {
172173
// See the comments in the async_cb function for why there is a lock
173174
// that is acquired only on a drop.
174175
unsafe {
175-
let state = self.queue.packet();
176+
let state = self.queue.get();
176177
let _l = (*state).lock.lock();
177-
self.queue.push(Decrement);
178+
(*state).queue.push(Decrement);
178179
uvll::uv_async_send((*state).handle);
179180
}
180181
}

0 commit comments

Comments
 (0)