Skip to content

Remove the UnsafeArc type #14301

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
May 22, 2014
52 changes: 19 additions & 33 deletions src/libgreen/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
//! This implementation is also used as the fallback implementation of an event
//! loop if no other one is provided (and M:N scheduling is desired).

use alloc::arc::Arc;
use std::sync::atomics;
use std::mem;
use std::rt::rtio::{EventLoop, IoFactory, RemoteCallback};
use std::rt::rtio::{PausableIdleCallback, Callback};
Expand All @@ -27,10 +29,11 @@ pub fn event_loop() -> Box<EventLoop:Send> {

struct BasicLoop {
work: Vec<proc():Send>, // pending work
idle: Option<*mut BasicPausable>, // only one is allowed
remotes: Vec<(uint, Box<Callback:Send>)>,
next_remote: uint,
messages: Exclusive<Vec<Message>>,
idle: Option<Box<Callback:Send>>,
idle_active: Option<Arc<atomics::AtomicBool>>,
}

enum Message { RunRemote(uint), RemoveRemote(uint) }
Expand All @@ -40,6 +43,7 @@ impl BasicLoop {
BasicLoop {
work: vec![],
idle: None,
idle_active: None,
next_remote: 0,
remotes: vec![],
messages: Exclusive::new(vec![]),
Expand Down Expand Up @@ -92,20 +96,18 @@ impl BasicLoop {

/// Run the idle callback if one is registered
fn idle(&mut self) {
unsafe {
match self.idle {
Some(idle) => {
if (*idle).active {
(*idle).work.call();
}
match self.idle {
Some(ref mut idle) => {
if self.idle_active.get_ref().load(atomics::SeqCst) {
idle.call();
}
None => {}
}
None => {}
}
}

fn has_idle(&self) -> bool {
unsafe { self.idle.is_some() && (**self.idle.get_ref()).active }
self.idle.is_some() && self.idle_active.get_ref().load(atomics::SeqCst)
}
}

Expand Down Expand Up @@ -141,13 +143,11 @@ impl EventLoop for BasicLoop {
// FIXME: Seems like a really weird requirement to have an event loop provide.
fn pausable_idle_callback(&mut self, cb: Box<Callback:Send>)
-> Box<PausableIdleCallback:Send> {
let callback = box BasicPausable::new(self, cb);
rtassert!(self.idle.is_none());
unsafe {
let cb_ptr: &*mut BasicPausable = mem::transmute(&callback);
self.idle = Some(*cb_ptr);
}
callback as Box<PausableIdleCallback:Send>
self.idle = Some(cb);
let a = Arc::new(atomics::AtomicBool::new(true));
self.idle_active = Some(a.clone());
box BasicPausable { active: a } as Box<PausableIdleCallback:Send>
}

fn remote_callback(&mut self, f: Box<Callback:Send>)
Expand Down Expand Up @@ -196,35 +196,21 @@ impl Drop for BasicRemote {
}

struct BasicPausable {
eloop: *mut BasicLoop,
work: Box<Callback:Send>,
active: bool,
}

impl BasicPausable {
fn new(eloop: &mut BasicLoop, cb: Box<Callback:Send>) -> BasicPausable {
BasicPausable {
active: false,
work: cb,
eloop: eloop,
}
}
active: Arc<atomics::AtomicBool>,
}

impl PausableIdleCallback for BasicPausable {
fn pause(&mut self) {
self.active = false;
self.active.store(false, atomics::SeqCst);
}
fn resume(&mut self) {
self.active = true;
self.active.store(true, atomics::SeqCst);
}
}

impl Drop for BasicPausable {
fn drop(&mut self) {
unsafe {
(*self.eloop).idle = None;
}
self.active.store(false, atomics::SeqCst);
}
}

Expand Down
15 changes: 7 additions & 8 deletions src/libgreen/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,9 @@
#[cfg(test)] extern crate rustuv;
extern crate rand;
extern crate libc;
extern crate alloc;

use alloc::arc::Arc;
use std::mem::replace;
use std::os;
use std::rt::rtio;
Expand All @@ -223,7 +225,6 @@ use std::rt;
use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT};
use std::sync::deque;
use std::task::TaskOpts;
use std::sync::arc::UnsafeArc;

use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, NewNeighbor};
use sleeper_list::SleeperList;
Expand Down Expand Up @@ -375,7 +376,7 @@ pub struct SchedPool {
/// sending on a channel once the entire pool has been drained of all tasks.
#[deriving(Clone)]
struct TaskState {
cnt: UnsafeArc<AtomicUint>,
cnt: Arc<AtomicUint>,
done: Sender<()>,
}

Expand Down Expand Up @@ -434,7 +435,6 @@ impl SchedPool {
pool.sleepers.clone(),
pool.task_state.clone());
pool.handles.push(sched.make_handle());
let sched = sched;
pool.threads.push(Thread::start(proc() { sched.bootstrap(); }));
}

Expand Down Expand Up @@ -496,7 +496,6 @@ impl SchedPool {
self.task_state.clone());
let ret = sched.make_handle();
self.handles.push(sched.make_handle());
let sched = sched;
self.threads.push(Thread::start(proc() { sched.bootstrap() }));

return ret;
Expand Down Expand Up @@ -537,21 +536,21 @@ impl TaskState {
fn new() -> (Receiver<()>, TaskState) {
let (tx, rx) = channel();
(rx, TaskState {
cnt: UnsafeArc::new(AtomicUint::new(0)),
cnt: Arc::new(AtomicUint::new(0)),
done: tx,
})
}

fn increment(&mut self) {
unsafe { (*self.cnt.get()).fetch_add(1, SeqCst); }
self.cnt.fetch_add(1, SeqCst);
}

fn active(&self) -> bool {
unsafe { (*self.cnt.get()).load(SeqCst) != 0 }
self.cnt.load(SeqCst) != 0
}

fn decrement(&mut self) {
let prev = unsafe { (*self.cnt.get()).fetch_sub(1, SeqCst) };
let prev = self.cnt.fetch_sub(1, SeqCst);
if prev == 1 {
self.done.send(());
}
Expand Down
28 changes: 16 additions & 12 deletions src/libgreen/message_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use alloc::arc::Arc;
use mpsc = std::sync::mpsc_queue;
use std::sync::arc::UnsafeArc;
use std::kinds::marker;

pub enum PopResult<T> {
Inconsistent,
Expand All @@ -18,29 +19,32 @@ pub enum PopResult<T> {
}

pub fn queue<T: Send>() -> (Consumer<T>, Producer<T>) {
let (a, b) = UnsafeArc::new2(mpsc::Queue::new());
(Consumer { inner: a }, Producer { inner: b })
let a = Arc::new(mpsc::Queue::new());
(Consumer { inner: a.clone(), noshare: marker::NoShare },
Producer { inner: a, noshare: marker::NoShare })
}

pub struct Producer<T> {
inner: UnsafeArc<mpsc::Queue<T>>,
inner: Arc<mpsc::Queue<T>>,
noshare: marker::NoShare,
}

pub struct Consumer<T> {
inner: UnsafeArc<mpsc::Queue<T>>,
inner: Arc<mpsc::Queue<T>>,
noshare: marker::NoShare,
}

impl<T: Send> Consumer<T> {
pub fn pop(&mut self) -> PopResult<T> {
match unsafe { (*self.inner.get()).pop() } {
pub fn pop(&self) -> PopResult<T> {
match self.inner.pop() {
mpsc::Inconsistent => Inconsistent,
mpsc::Empty => Empty,
mpsc::Data(t) => Data(t),
}
}

pub fn casual_pop(&mut self) -> Option<T> {
match unsafe { (*self.inner.get()).pop() } {
pub fn casual_pop(&self) -> Option<T> {
match self.inner.pop() {
mpsc::Inconsistent => None,
mpsc::Empty => None,
mpsc::Data(t) => Some(t),
Expand All @@ -49,13 +53,13 @@ impl<T: Send> Consumer<T> {
}

impl<T: Send> Producer<T> {
pub fn push(&mut self, t: T) {
unsafe { (*self.inner.get()).push(t); }
pub fn push(&self, t: T) {
self.inner.push(t);
}
}

impl<T: Send> Clone for Producer<T> {
fn clone(&self) -> Producer<T> {
Producer { inner: self.inner.clone() }
Producer { inner: self.inner.clone(), noshare: marker::NoShare }
}
}
2 changes: 1 addition & 1 deletion src/libgreen/sched.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,7 @@ mod test {

Thread::start(proc() {
let sleepers = SleeperList::new();
let mut pool = BufferPool::new();
let pool = BufferPool::new();
let (normal_worker, normal_stealer) = pool.deque();
let (special_worker, special_stealer) = pool.deque();
let queues = vec![normal_stealer, special_stealer];
Expand Down
12 changes: 4 additions & 8 deletions src/libnative/io/file_unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@

//! Blocking posix-based file I/O

use alloc::arc::Arc;
use libc::{c_int, c_void};
use libc;
use std::c_str::CString;
use std::io::IoError;
use std::io;
use std::mem;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;

use io::{IoResult, retry, keep_going};

Expand All @@ -29,7 +29,7 @@ struct Inner {
}

pub struct FileDesc {
inner: UnsafeArc<Inner>
inner: Arc<Inner>
}

impl FileDesc {
Expand All @@ -42,7 +42,7 @@ impl FileDesc {
/// Note that all I/O operations done on this object will be *blocking*, but
/// they do not require the runtime to be active.
pub fn new(fd: fd_t, close_on_drop: bool) -> FileDesc {
FileDesc { inner: UnsafeArc::new(Inner {
FileDesc { inner: Arc::new(Inner {
fd: fd,
close_on_drop: close_on_drop
}) }
Expand Down Expand Up @@ -79,11 +79,7 @@ impl FileDesc {
}
}

pub fn fd(&self) -> fd_t {
// This unsafety is fine because we're just reading off the file
// descriptor, no one is modifying this.
unsafe { (*self.inner.get()).fd }
}
pub fn fd(&self) -> fd_t { self.inner.fd }
}

impl io::Reader for FileDesc {
Expand Down
16 changes: 6 additions & 10 deletions src/libnative/io/file_win32.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@

//! Blocking win32-based file I/O

use alloc::arc::Arc;
use libc::{c_int, c_void};
use libc;
use std::c_str::CString;
use std::io::IoError;
use std::io;
use libc::{c_int, c_void};
use libc;
use std::mem;
use std::os::win32::{as_utf16_p, fill_utf16_buf_and_decode};
use std::ptr;
use std::rt::rtio;
use std::str;
use std::sync::arc::UnsafeArc;
use std::vec;

use io::IoResult;
Expand All @@ -33,7 +33,7 @@ struct Inner {
}

pub struct FileDesc {
inner: UnsafeArc<Inner>
inner: Arc<Inner>
}

impl FileDesc {
Expand All @@ -46,7 +46,7 @@ impl FileDesc {
/// Note that all I/O operations done on this object will be *blocking*, but
/// they do not require the runtime to be active.
pub fn new(fd: fd_t, close_on_drop: bool) -> FileDesc {
FileDesc { inner: UnsafeArc::new(Inner {
FileDesc { inner: Arc::new(Inner {
fd: fd,
close_on_drop: close_on_drop
}) }
Expand Down Expand Up @@ -85,11 +85,7 @@ impl FileDesc {
Ok(())
}

pub fn fd(&self) -> fd_t {
// This unsafety is fine because we're just reading off the file
// descriptor, no one is modifying this.
unsafe { (*self.inner.get()).fd }
}
pub fn fd(&self) -> fd_t { self.inner.fd }

pub fn handle(&self) -> libc::HANDLE {
unsafe { libc::get_osfhandle(self.fd()) as libc::HANDLE }
Expand Down
Loading