Skip to content

green/native: Be more resilient to spawn failures #15944

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 30, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/libgreen/sched.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ impl Scheduler {
let message = stask.sched.get_mut_ref().message_queue.pop();
rtassert!(match message { msgq::Empty => true, _ => false });

stask.task.get_mut_ref().destroyed = true;
stask.task.take().unwrap().drop();
}

// This does not return a scheduler, as the scheduler is placed
Expand Down
21 changes: 18 additions & 3 deletions src/libgreen/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,15 +442,30 @@ impl Runtime for GreenTask {
f: proc():Send) {
self.put_task(cur_task);

// First, set up a bomb which when it goes off will restore the local
// task unless its disarmed. This will allow us to gracefully fail from
// inside of `configure` which allocates a new task.
struct Bomb { inner: Option<Box<GreenTask>> }
impl Drop for Bomb {
fn drop(&mut self) {
let _ = self.inner.take().map(|task| task.put());
}
}
let mut bomb = Bomb { inner: Some(self) };

// Spawns a task into the current scheduler. We allocate the new task's
// stack from the scheduler's stack pool, and then configure it
// accordingly to `opts`. Afterwards we bootstrap it immediately by
// switching to it.
//
// Upon returning, our task is back in TLS and we're good to return.
let mut sched = self.sched.take_unwrap();
let sibling = GreenTask::configure(&mut sched.stack_pool, opts, f);
sched.run_task(self, sibling)
let sibling = {
let sched = bomb.inner.get_mut_ref().sched.get_mut_ref();
GreenTask::configure(&mut sched.stack_pool, opts, f)
};
let mut me = bomb.inner.take().unwrap();
let sched = me.sched.take().unwrap();
sched.run_task(me, sibling)
}

// Local I/O is provided by the scheduler's event loop
Expand Down
4 changes: 2 additions & 2 deletions src/libnative/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub fn spawn_opts(opts: TaskOpts, f: proc():Send) {
// Note that this increment must happen *before* the spawn in order to
// guarantee that if this task exits it will always end up waiting for the
// spawned task to exit.
bookkeeping::increment();
let token = bookkeeping::increment();

// Spawning a new OS thread guarantees that __morestack will never get
// triggered, but we must manually set up the actual stack bounds once this
Expand All @@ -93,7 +93,7 @@ pub fn spawn_opts(opts: TaskOpts, f: proc():Send) {
let mut task = task;
task.put_runtime(ops);
drop(task.run(|| { f.take_unwrap()() }).destroy());
bookkeeping::decrement();
drop(token);
})
}

Expand Down
12 changes: 11 additions & 1 deletion src/librustrt/bookkeeping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,24 @@
//! decrement() manually.

use core::atomics;
use core::ops::Drop;

use mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};

static mut TASK_COUNT: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT;
static mut TASK_LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;

pub fn increment() {
pub struct Token { _private: () }

impl Drop for Token {
fn drop(&mut self) { decrement() }
}

/// Increment the number of live tasks, returning a token which will decrement
/// the count when dropped.
pub fn increment() -> Token {
let _ = unsafe { TASK_COUNT.fetch_add(1, atomics::SeqCst) };
Token { _private: () }
}

pub fn decrement() {
Expand Down
4 changes: 2 additions & 2 deletions src/librustrt/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ mod test {
}).join();
}

fn cleanup_task(mut t: Box<Task>) {
t.destroyed = true;
fn cleanup_task(t: Box<Task>) {
t.drop();
}

}
37 changes: 30 additions & 7 deletions src/librustrt/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,21 @@ pub struct Task {
pub storage: LocalStorage,
pub unwinder: Unwinder,
pub death: Death,
pub destroyed: bool,
pub name: Option<SendStr>,

state: TaskState,
imp: Option<Box<Runtime + Send>>,
}

// Once a task has entered the `Armed` state it must be destroyed via `drop`,
// and no other method. This state is used to track this transition.
#[deriving(PartialEq)]
enum TaskState {
New,
Armed,
Destroyed,
}

pub struct TaskOpts {
/// Invoke this procedure with the result of the task when it finishes.
pub on_exit: Option<proc(Result): Send>,
Expand Down Expand Up @@ -159,7 +168,7 @@ impl Task {
storage: LocalStorage(None),
unwinder: Unwinder::new(),
death: Death::new(),
destroyed: false,
state: New,
name: None,
imp: None,
}
Expand Down Expand Up @@ -203,7 +212,7 @@ impl Task {
/// }).destroy();
/// # }
/// ```
pub fn run(self: Box<Task>, f: ||) -> Box<Task> {
pub fn run(mut self: Box<Task>, f: ||) -> Box<Task> {
assert!(!self.is_destroyed(), "cannot re-use a destroyed task");

// First, make sure that no one else is in TLS. This does not allow
Expand All @@ -212,6 +221,7 @@ impl Task {
if Local::exists(None::<Task>) {
fail!("cannot run a task recursively inside another");
}
self.state = Armed;
Local::put(self);

// There are two primary reasons that general try/catch is unsafe. The
Expand Down Expand Up @@ -333,12 +343,12 @@ impl Task {
// Now that we're done, we remove the task from TLS and flag it for
// destruction.
let mut task: Box<Task> = Local::take();
task.destroyed = true;
task.state = Destroyed;
return task;
}

/// Queries whether this can be destroyed or not.
pub fn is_destroyed(&self) -> bool { self.destroyed }
pub fn is_destroyed(&self) -> bool { self.state == Destroyed }

/// Inserts a runtime object into this task, transferring ownership to the
/// task. It is illegal to replace a previous runtime object in this task
Expand Down Expand Up @@ -453,12 +463,20 @@ impl Task {
pub fn can_block(&self) -> bool {
self.imp.get_ref().can_block()
}

/// Consume this task, flagging it as a candidate for destruction.
///
/// This function is required to be invoked to destroy a task. A task
/// destroyed through a normal drop will abort.
pub fn drop(mut self) {
self.state = Destroyed;
}
}

impl Drop for Task {
fn drop(&mut self) {
rtdebug!("called drop for a task: {}", self as *mut Task as uint);
rtassert!(self.destroyed);
rtassert!(self.state != Armed);
}
}

Expand Down Expand Up @@ -634,12 +652,17 @@ mod test {
begin_unwind("cause", file!(), line!())
}

#[test]
fn drop_new_task_ok() {
drop(Task::new());
}

// Task blocking tests

#[test]
fn block_and_wake() {
let task = box Task::new();
let mut task = BlockedTask::block(task).wake().unwrap();
task.destroyed = true;
task.drop();
}
}
47 changes: 47 additions & 0 deletions src/test/run-pass/spawn-stack-too-big.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

// ignore-macos apparently gargantuan mmap requests are ok?

#![feature(phase)]

#[phase(plugin)]
extern crate green;
extern crate native;

use std::task::TaskBuilder;
use native::NativeTaskBuilder;

green_start!(main)

fn main() {
test();

let (tx, rx) = channel();
TaskBuilder::new().native().spawn(proc() {
tx.send(test());
});
rx.recv();
}

#[cfg(not(target_word_size = "64"))]
fn test() {}

#[cfg(target_word_size = "64")]
fn test() {
let (tx, rx) = channel();
spawn(proc() {
TaskBuilder::new().stack_size(1024 * 1024 * 1024 * 64).spawn(proc() {
});
tx.send(());
});

assert!(rx.recv_opt().is_err());
}