Assorted task cleanup and enhancement #8195

Closed
wants to merge 4 commits into from
16 changes: 7 additions & 9 deletions src/libstd/rt/comm.rs
@@ -18,7 +18,7 @@ use kinds::Send;
use rt::sched::Scheduler;
use rt::local::Local;
use rt::select::{Select, SelectPort};
use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Release, SeqCst};
use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Relaxed, SeqCst};
use unstable::sync::UnsafeAtomicRcBox;
use util::Void;
use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
@@ -216,15 +216,15 @@ impl<T> Select for PortOne<T> {
}
STATE_ONE => {
// Re-record that we are the only owner of the packet.
// Release barrier needed in case the task gets reawoken
// on a different core (this is analogous to writing a
// payload; a barrier in enqueueing the task protects it).
// No barrier needed, even if the task gets reawoken
// on a different core -- this is analogous to writing a
// payload; a barrier in enqueueing the task protects it.
// NB(#8132). This *must* occur before the enqueue below.
// FIXME(#6842, #8130) This is usually only needed for the
// assertion in recv_ready, except in the case of select().
// This won't actually ever have cacheline contention, but
// maybe should be optimized out with a cfg(test) anyway?
(*self.packet()).state.store(STATE_ONE, Release);
(*self.packet()).state.store(STATE_ONE, Relaxed);

rtdebug!("rendezvous recv");
sched.metrics.rendezvous_recvs += 1;
@@ -299,7 +299,7 @@ impl<T> SelectPort<T> for PortOne<T> {
unsafe {
// See corresponding store() above in block_on for rationale.
// FIXME(#8130) This can happen only in test builds.
assert!((*packet).state.load(Acquire) == STATE_ONE);
assert!((*packet).state.load(Relaxed) == STATE_ONE);

let payload = (*packet).payload.take();

@@ -375,9 +375,7 @@ impl<T> Drop for PortOne<T> {
// receiver was killed awake. The task can't still be
// blocked (we are it), but we need to free the handle.
let recvr = BlockedTask::cast_from_uint(task_as_state);
// FIXME(#7554)(bblum): Make this cfg(test) dependent.
// in a later commit.
assert!(recvr.wake().is_none());
recvr.assert_already_awake();
}
}
}
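The comm.rs hunks above lean on the same argument spelled out in kill.rs below: the release/acquire pairing that publishes the packet state comes from enqueueing and rescheduling the blocked task, not from the state flag itself, so the flag store and the later assertion load can both be Relaxed. A minimal sketch in modern Rust of why the Relaxed store is still observed (hypothetical names; std::sync::atomic and an mpsc channel stand in for the scheduler enqueue):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;

const STATE_ONE: usize = 1;

fn main() {
    let state = Arc::new(AtomicUsize::new(0));
    let (tx, rx) = mpsc::channel();

    let s = Arc::clone(&state);
    let sender = thread::spawn(move || {
        // Analogous to re-recording the packet state: the store itself can be
        // Relaxed because the "enqueue" below already provides the
        // release/acquire pairing that publishes it.
        s.store(STATE_ONE, Ordering::Relaxed);
        tx.send(()).unwrap(); // stand-in for enqueueing the blocked task
    });

    rx.recv().unwrap(); // stand-in for the task being rescheduled
    // The channel's synchronization guarantees this Relaxed load sees
    // STATE_ONE, mirroring the assertion in recv_ready().
    assert_eq!(state.load(Ordering::Relaxed), STATE_ONE);
    sender.join().unwrap();
}
```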
88 changes: 76 additions & 12 deletions src/libstd/rt/kill.rs
@@ -8,16 +8,73 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! Task death: asynchronous killing, linked failure, exit code propagation.
/*!

Task death: asynchronous killing, linked failure, exit code propagation.

This file implements two orthogonal building-blocks for communicating failure
between tasks. One is 'linked failure' or 'task killing', that is, a failing
task causing other tasks to fail promptly (even those that are blocked on
pipes or I/O). The other is 'exit code propagation', which affects the result
observed by the parent of a task::try task that itself spawns child tasks
(such as any #[test] function). In both cases the data structures live in
KillHandle.

I. Task killing.

The model for killing involves two atomic flags, the "kill flag" and the
"unkillable flag". Operations on the kill flag include:

- In the taskgroup code (task/spawn.rs), tasks store a clone of their
KillHandle in their shared taskgroup. Another task in the group that fails
will use that handle to call kill().
- When a task blocks, it turns its ~Task into a BlockedTask by storing
the transmuted ~Task pointer inside the KillHandle's kill flag. A task
trying to block and a task trying to kill it can simultaneously access the
kill flag, after which the task will get scheduled and fail (no matter who
wins the race). Likewise, a task trying to wake a blocked task normally and
a task trying to kill it can simultaneously access the flag; only one will
get the task to reschedule it.

Operations on the unkillable flag include:

- When a task becomes unkillable, it swaps on the flag to forbid any killer
from waking it up while it's blocked inside the unkillable section. If a
kill was already pending, the task fails instead of becoming unkillable.
- When a task is done being unkillable, it restores the flag to the normal
running state. If a kill was received-but-blocked during the unkillable
section, the task fails at this later point.
- When a task tries to kill another task, before swapping on the kill flag, it
first swaps on the unkillable flag, to see if it's "allowed" to wake up the
task. If it isn't, the killed task will receive the signal when it becomes
killable again. (Of course, a task trying to wake the task normally (e.g.
sending on a channel) does not access the unkillable flag at all.)

Why do we not need acquire/release barriers on any of the kill flag swaps?
This is because barriers establish orderings between accesses on different
memory locations, but each kill-related operation is only a swap on a single
location, so atomicity is all that matters. The exception is kill(), which
does a swap on both flags in sequence. kill() needs no barriers because it
does not matter if its two accesses are seen reordered on another CPU: if a
killer does perform both writes, it means it saw a KILL_RUNNING in the
unkillable flag, which means an unkillable task will see KILL_KILLED and fail
immediately (rendering the subsequent write to the kill flag unnecessary).

II. Exit code propagation.

FIXME(#7544): Decide on the ultimate model for this and document it.

*/

use cast;
use cell::Cell;
use either::{Either, Left, Right};
use option::{Option, Some, None};
use prelude::*;
use rt::task::Task;
use task::spawn::Taskgroup;
use to_bytes::IterBytes;
use unstable::atomics::{AtomicUint, Acquire, SeqCst};
use unstable::atomics::{AtomicUint, Relaxed};
use unstable::sync::{UnsafeAtomicRcBox, LittleLock};
use util;

@@ -95,7 +95,7 @@ impl Drop for KillFlag {
// Letting a KillFlag with a task inside get dropped would leak the task.
// We could free it here, but the task should get awoken by hand somehow.
fn drop(&self) {
match self.load(Acquire) {
match self.load(Relaxed) {
KILL_RUNNING | KILL_KILLED => { },
_ => rtabort!("can't drop kill flag with a blocked task inside!"),
}
@@ -124,7 +181,7 @@ impl BlockedTask {
Unkillable(task) => Some(task),
Killable(flag_arc) => {
let flag = unsafe { &mut **flag_arc.get() };
match flag.swap(KILL_RUNNING, SeqCst) {
match flag.swap(KILL_RUNNING, Relaxed) {
KILL_RUNNING => None, // woken from select(), perhaps
KILL_KILLED => None, // a killer stole it already
task_ptr =>
@@ -159,7 +216,7 @@ impl BlockedTask {
let flag = &mut **flag_arc.get();
let task_ptr = cast::transmute(task);
// Expect flag to contain RUNNING. If KILLED, it should stay KILLED.
match flag.compare_and_swap(KILL_RUNNING, task_ptr, SeqCst) {
match flag.compare_and_swap(KILL_RUNNING, task_ptr, Relaxed) {
KILL_RUNNING => Right(Killable(flag_arc)),
KILL_KILLED => Left(revive_task_ptr(task_ptr, Some(flag_arc))),
x => rtabort!("can't block task! kill flag = %?", x),
@@ -257,7 +314,7 @@ impl KillHandle {
let inner = unsafe { &mut *self.get() };
// Expect flag to contain RUNNING. If KILLED, it should stay KILLED.
// FIXME(#7544)(bblum): is it really necessary to prohibit double kill?
match inner.unkillable.compare_and_swap(KILL_RUNNING, KILL_UNKILLABLE, SeqCst) {
match inner.unkillable.compare_and_swap(KILL_RUNNING, KILL_UNKILLABLE, Relaxed) {
KILL_RUNNING => { }, // normal case
KILL_KILLED => if !already_failing { fail!(KILLED_MSG) },
_ => rtabort!("inhibit_kill: task already unkillable"),
@@ -270,7 +327,7 @@ impl KillHandle {
let inner = unsafe { &mut *self.get() };
// Expect flag to contain UNKILLABLE. If KILLED, it should stay KILLED.
// FIXME(#7544)(bblum): is it really necessary to prohibit double kill?
match inner.unkillable.compare_and_swap(KILL_UNKILLABLE, KILL_RUNNING, SeqCst) {
match inner.unkillable.compare_and_swap(KILL_UNKILLABLE, KILL_RUNNING, Relaxed) {
KILL_UNKILLABLE => { }, // normal case
KILL_KILLED => if !already_failing { fail!(KILLED_MSG) },
_ => rtabort!("allow_kill: task already killable"),
@@ -281,10 +338,10 @@ impl KillHandle {
// if it was blocked and needs punted awake. To be called by other tasks.
pub fn kill(&mut self) -> Option<~Task> {
let inner = unsafe { &mut *self.get() };
if inner.unkillable.swap(KILL_KILLED, SeqCst) == KILL_RUNNING {
if inner.unkillable.swap(KILL_KILLED, Relaxed) == KILL_RUNNING {
// Got in. Allowed to try to punt the task awake.
let flag = unsafe { &mut *inner.killed.get() };
match flag.swap(KILL_KILLED, SeqCst) {
match flag.swap(KILL_KILLED, Relaxed) {
// Task either not blocked or already taken care of.
KILL_RUNNING | KILL_KILLED => None,
// Got ownership of the blocked task.
@@ -306,8 +363,11 @@ impl KillHandle {
// is unkillable with a kill signal pending.
let inner = unsafe { &*self.get() };
let flag = unsafe { &*inner.killed.get() };
// FIXME(#6598): can use relaxed ordering (i think)
flag.load(Acquire) == KILL_KILLED
// A barrier-related concern here is that a task that gets killed
// awake needs to see the killer's write of KILLED to this flag. This
// is analogous to receiving a pipe payload; the appropriate barrier
// should happen when enqueueing the task.
flag.load(Relaxed) == KILL_KILLED
}

pub fn notify_immediate_failure(&mut self) {
@@ -415,7 +475,7 @@ impl Death {
}

/// Collect failure exit codes from children and propagate them to a parent.
pub fn collect_failure(&mut self, mut success: bool) {
pub fn collect_failure(&mut self, mut success: bool, group: Option<Taskgroup>) {
// This may run after the task has already failed, so even though the
// task appears to need to be killed, the scheduler should not fail us
// when we block to unwrap.
@@ -425,6 +485,10 @@
rtassert!(self.unkillable == 0);
self.unkillable = 1;

// FIXME(#7544): See corresponding fixme at the callsite in task.rs.
// NB(#8192): Doesn't work with "let _ = ..."
{ use util; util::ignore(group); }

// Step 1. Decide if we need to collect child failures synchronously.
do self.on_exit.take_map |on_exit| {
if success {
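The kill.rs ordering downgrades rest on the doc comment's argument that each flag operation is a single-location swap, so atomicity alone resolves the block/kill race. A rough sketch of that race in modern Rust (hypothetical helper names and sentinel values; the real code parks a transmuted ~Task pointer, replaced here by an arbitrary usize):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical sentinel values mirroring the kill-flag states described above.
const KILL_RUNNING: usize = 0;
const KILL_KILLED: usize = 1;
// Any other value stands in for a blocked task's transmuted pointer.

// Blocking: try to park a "task pointer" in the flag. Only succeeds if no
// kill is pending; Relaxed is enough because each operation touches a single
// location and atomicity alone decides who wins the race.
fn try_block(flag: &AtomicUsize, task_ptr: usize) -> Result<(), ()> {
    match flag.compare_exchange(KILL_RUNNING, task_ptr, Ordering::Relaxed, Ordering::Relaxed) {
        Ok(_) => Ok(()),             // parked; a waker or killer will take it
        Err(KILL_KILLED) => Err(()), // kill already pending: fail instead
        Err(x) => panic!("can't block task! kill flag = {}", x),
    }
}

// Killing: unconditionally swap in KILLED. Exactly one of the killer or the
// normal waker observes the parked pointer and becomes responsible for it.
fn kill(flag: &AtomicUsize) -> Option<usize> {
    match flag.swap(KILL_KILLED, Ordering::Relaxed) {
        KILL_RUNNING | KILL_KILLED => None, // not blocked, or already handled
        task_ptr => Some(task_ptr),         // we now own the blocked task
    }
}

fn main() {
    let flag = AtomicUsize::new(KILL_RUNNING);
    assert!(try_block(&flag, 0xdead_beef).is_ok());
    assert_eq!(kill(&flag), Some(0xdead_beef)); // killer wins, must wake the task
    assert_eq!(kill(&flag), None);              // second kill finds nothing to do
}
```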
9 changes: 7 additions & 2 deletions src/libstd/rt/task.rs
@@ -129,8 +129,13 @@ impl Task {
}

self.unwinder.try(f);
{ let _ = self.taskgroup.take(); }
self.death.collect_failure(!self.unwinder.unwinding);
// FIXME(#7544): We pass the taskgroup into death so that it can be
// dropped while the unkillable counter is set. This should not be
// necessary except for an extraneous clone() in task/spawn.rs that
// causes a killhandle to get dropped, which mustn't receive a kill
// signal since we're outside of the unwinder's try() scope.
// { let _ = self.taskgroup.take(); }
self.death.collect_failure(!self.unwinder.unwinding, self.taskgroup.take());
self.destroy();
}

10 changes: 4 additions & 6 deletions src/libstd/unstable/sync.rs
@@ -16,7 +16,7 @@ use ptr;
use option::*;
use either::{Either, Left, Right};
use task;
use unstable::atomics::{AtomicOption,AtomicUint,Acquire,Release,SeqCst};
use unstable::atomics::{AtomicOption,AtomicUint,Acquire,Release,Relaxed,SeqCst};
use unstable::finally::Finally;
use ops::Drop;
use clone::Clone;
@@ -95,8 +95,7 @@ impl<T: Send> UnsafeAtomicRcBox<T> {
pub fn get(&self) -> *mut T {
unsafe {
let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
// FIXME(#6598) Change Acquire to Relaxed.
assert!(data.count.load(Acquire) > 0);
assert!(data.count.load(Relaxed) > 0);
let r: *mut T = data.data.get_mut_ref();
cast::forget(data);
return r;
@@ -107,7 +106,7 @@ impl<T: Send> UnsafeAtomicRcBox<T> {
pub fn get_immut(&self) -> *T {
unsafe {
let data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
assert!(data.count.load(Acquire) > 0); // no barrier is really needed
assert!(data.count.load(Relaxed) > 0);
let r: *T = data.data.get_ref();
cast::forget(data);
return r;
@@ -130,8 +129,7 @@ impl<T: Send> UnsafeAtomicRcBox<T> {
// Try to put our server end in the unwrapper slot.
// This needs no barrier -- it's protected by the release barrier on
// the xadd, and the acquire+release barrier in the destructor's xadd.
// FIXME(#6598) Change Acquire to Relaxed.
if data.unwrapper.fill(~(c1,p2), Acquire).is_none() {
if data.unwrapper.fill(~(c1,p2), Relaxed).is_none() {
// Got in. Tell this handle's destructor not to run (we are now it).
this.data = ptr::mut_null();
// Drop our own reference.
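The sync.rs hunks drop Acquire on count loads that only back assertions. A small sketch of the usual division of labour in a shared refcount (hypothetical RefCount type, not the UnsafeAtomicRcBox API): sanity-check loads can be Relaxed, while the decrement that may free the data is where Release plus an Acquire fence belongs:

```rust
use std::sync::atomic::{fence, AtomicUsize, Ordering};

struct RefCount {
    count: AtomicUsize,
}

impl RefCount {
    fn assert_alive(&self) {
        // Like the asserts in get()/get_immut(): only atomicity matters here,
        // so a Relaxed load is enough.
        assert!(self.count.load(Ordering::Relaxed) > 0);
    }

    fn release(&self) -> bool {
        // Publish this owner's writes to whoever decrements later (Release);
        // the last owner acquires everyone else's writes before teardown.
        if self.count.fetch_sub(1, Ordering::Release) == 1 {
            fence(Ordering::Acquire);
            true // caller may free the shared data now
        } else {
            false
        }
    }
}

fn main() {
    let rc = RefCount { count: AtomicUsize::new(2) };
    rc.assert_alive();
    assert!(!rc.release()); // one owner left
    assert!(rc.release());  // last owner: safe to tear down
}
```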