Skip to content

Scheduler + I/O #7265

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 128 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
128 commits
Select commit Hold shift + click to select a range
29d8300
core::rt: Move uv idle tests to idle mod
brson May 20, 2013
8072690
core::rt: Add bindings for async uv handles
brson May 21, 2013
6d8d73c
Add AtomicUint newtype
Aatch May 21, 2013
8f77a6f
core: Add AtomicInt and cleanup
brson May 22, 2013
a0cd55a
core::rt: Add RemoteCallback trait and uv implementation
brson May 21, 2013
41c2168
core::rt: Add SchedHandle type
brson May 22, 2013
8b7e392
core::rt: Scheduler takes a WorkQueue
brson May 23, 2013
7f107c4
core::rt: Remove UvEventLoop::new_scheduler function
brson May 23, 2013
3f8095e
core::rt: Add a very basic multi-threaded scheduling test
brson May 23, 2013
dec9db1
core::rt: Add SleeperList
brson May 29, 2013
ed8c359
core::rt: Add SleeperList to Scheduler
brson May 29, 2013
5043ea2
core::rt: Add run_in_mt_newsched_task test function
brson May 29, 2013
a373dad
core::rt: Outline the full multithreaded scheduling algo. Implement s…
brson May 29, 2013
f343e61
core::rt: Fix an infinite recursion bug
brson May 30, 2013
134bb0f
core::rt: Change the signature of context switching methods to avoid …
brson May 30, 2013
f4ed554
Merge remote-tracking branch 'brson/io' into incoming
brson May 30, 2013
ca2eebd
core::rt: Add some notes about optimizations
brson May 30, 2013
8eb358b
core::rt: Begin recording scheduler metrics
brson May 30, 2013
053b38e
core::rt: Fix two multithreading bugs and add a threadring test
brson May 30, 2013
ea633b4
core::rt: deny(unused_imports, unused_mut, unused_variable)
brson May 30, 2013
e2bedb1
core: Make atomic methods public
brson Jun 1, 2013
2e6d51f
std::rt: Use AtomicUint instead of intrinsics in comm
brson Jun 2, 2013
f7e242a
std::rt: Destroy the task start closure while in task context
brson Jun 4, 2013
1507df8
std::rt: Remove in incorrect assert
brson Jun 4, 2013
422f663
core::rt: Implement SharedChan
brson Jun 1, 2013
51d257f
core::rt: Add SharedPort
brson Jun 1, 2013
ece38b3
core::rt: Add `MegaPipe`, an unbounded, multiple producer/consumer, l…
brson Jun 1, 2013
80849e7
std: Fix stage0 build
brson Jun 2, 2013
f9a5005
rt: Add rust_get_num_cpus
brson Jun 6, 2013
8afec77
std::rt: Configure test threads with RUST_TEST_THREADS. Default is nc…
brson Jun 6, 2013
d6ccc6b
std::rt: Fix stream test to be parallel
brson Jun 6, 2013
d4de99a
std::rt: Fix a race in the UvRemoteCallback dtor
brson Jun 7, 2013
d83d38c
std::rt: Reduce task stack size to 1MB
brson Jun 7, 2013
d64d26c
debugged a compiler ICE when merging local::borrow changes into the m…
toddaaro Jun 10, 2013
84d2695
std::rt: Work around a dynamic borrowck bug
brson Jun 11, 2013
8428081
A basic implementation of pinning tasks to schedulers. No IO interact…
toddaaro Jun 12, 2013
e7213aa
std::rt: Remove old files
brson Jun 12, 2013
eb11274
Removing redundant libuv bindings
Jun 12, 2013
4224fc7
added functionality to tell schedulers to refuse to run tasks that ar…
toddaaro Jun 12, 2013
39a575f
Added libuv UDP function bindings.
Jun 12, 2013
5393e43
Corrected libuv UDP bindings.
Jun 13, 2013
abc3a8a
std::rt: Add JoinLatch
brson Jun 2, 2013
fd148cd
std::rt: Change the Task constructors to reflect a tree
brson Jun 14, 2013
90fbe38
std::rt: Tasks must have an unwinder. Simpler
brson Jun 14, 2013
74e7255
Added a utility function to extract the udp handle from udp send requ…
Jun 14, 2013
03fe59a
added bindings to extract udp handle from udp send requests
Jun 14, 2013
a7f92c9
Added a UdpWatcher and UdpSendRequest with associated callbacks
Jun 14, 2013
d1ec8b5
redesigned the pinning to pin deal with things on dequeue, not on enq…
toddaaro Jun 14, 2013
b08c446
Merge remote-tracking branch 'toddaaro/io' into io
brson Jun 16, 2013
505ef7e
std::rt: Tasks contain a JoinLatch
brson Jun 14, 2013
3208fc3
Merge remote-tracking branch 'brson/io-wip' into io
brson Jun 16, 2013
319cf6e
Merge remote-tracking branch 'brson/io'
brson Jun 16, 2013
9687437
added wrappers about uv_ip{4,6}_{port,name}
Jun 17, 2013
b51d188
Added a RtioUdpStream trait
Jun 17, 2013
7e022c5
added a function to convert C's ipv4 data structure into the Rust ipv…
Jun 17, 2013
4744375
added Eq and TotalEq instances for IpAddr
Jun 17, 2013
e42f28c
stated to implement UdpStream
Jun 17, 2013
33ae193
Started to implemented UdpStream
Jun 17, 2013
35f3fa6
Merge remote-tracking branch 'upstream/io' into io
Jun 17, 2013
3281f5b
std::rt: Add util mod and num_cpus function
brson Jun 18, 2013
9ef4c41
std::rt: Check exchange count on exit
brson Jun 18, 2013
021e81f
std::rt: move abort function to util module
brson Jun 18, 2013
b5fbec9
std: Rename `abort!` to `rtabort!` to match other macros
brson Jun 18, 2013
5b2dc52
std::rt: Turn on multithreaded scheduling
brson Jun 18, 2013
29ad8e1
std::rt: Improve the rtabort! macro
brson Jun 18, 2013
915aaa7
std::rt: Set the process exit code
brson Jun 19, 2013
5722c95
std::rt: Correct the numbers of default cores
brson Jun 19, 2013
e1555f9
std::rt: Document and cleanup the run function
brson Jun 19, 2013
d777ba0
Wrote the Eq instance of IpAddr in a slightly different way.
Jun 19, 2013
753b497
Modified a match in resume_task_from_queue that was returning an int …
toddaaro Jun 19, 2013
b548c78
Merge pull request #3 from toddaaro/io
brson Jun 19, 2013
083c692
Changed visibility from being on the impl to being on methods per lan…
Jun 19, 2013
5086c08
std::rt: Update GC metadata in init
brson Jun 19, 2013
ac49b74
socket based UDP io
Jun 20, 2013
36c0e04
derived instances of Eq and TotalEq for IpAddr rather than implement …
Jun 20, 2013
391bb0b
std: Make newsched failures log correctly
brson Jun 20, 2013
55dda46
Merge remote-tracking branch 'upstream/io' into io
Jun 20, 2013
bbf5469
Merge remote-tracking branch 'brson/io-wip' into io
brson Jun 20, 2013
4d39253
std::rt: Whitespace
brson Jun 20, 2013
357f087
Merge remote-tracking branch 'brson/io' into io-upstream
brson Jun 20, 2013
7a9a6e4
std: Port SharedChan to newsched
brson Jun 21, 2013
1b7c996
std::rt: Support os::args
brson Jun 21, 2013
95eb019
std: Make console log off/on controls work with newsched
brson Jun 21, 2013
aa9210d
std: Rewrite vec_reserve_shared_actual in Rust
brson Jun 22, 2013
a09972d
std: Move dynamic borrowck code from unstable::lang to rt::borrowck
brson Jun 22, 2013
5e7c5d6
std: Make box annihilator work with newsched
brson Jun 22, 2013
b530ca1
std: Make unlinking and task notification work with newsched
brson Jun 23, 2013
d071f51
std::rt: deny(unused_unsafe)
brson Jun 23, 2013
e65d0cb
extra: Make test runner compatible with newsched
brson Jun 24, 2013
794923c
UDP networking with tests
Jun 25, 2013
4870dce
Merge remote-tracking branch 'upstream/io' into io
Jun 25, 2013
1af2016
removed unncessary unsafe block that was stopping compliation.
Jun 25, 2013
f202713
satisfy the formatting check
Jun 25, 2013
2c5cfe1
removed obsolete FIXMEs. formatting changes.
Jun 25, 2013
d0c812f
IPv6 struct
Jun 25, 2013
c5b19f0
changed outdated match on IpAddr
Jun 25, 2013
f604686
converted UvUdpSocket into a newtype struct
Jun 26, 2013
34b1135
Converted UdpSocket into a newtype struct and (dis)connecting uses mo…
Jun 26, 2013
d0dc697
removed unecessary method
Jun 26, 2013
87ecfb7
converted TCP interface to newtype structs
Jun 26, 2013
ce97bd4
cleaned up uv/net
Jun 26, 2013
42f3f06
changed NOTE to TODO
Jun 26, 2013
ddbccec
std::rt: Some cleanup
brson Jun 27, 2013
5cfad4b
Refactored the runtime to view coroutines as a component of tasks, in…
toddaaro Jun 26, 2013
0e07c8d
rt: Add global_args_lock functions to rustrt.def.in
brson Jul 1, 2013
062bfd3
merging task/coroutine refactoring back into upstream
toddaaro Jul 1, 2013
27818ea
removed unnecessary import that slipped in during merge
toddaaro Jul 1, 2013
6fd15ff
std::rt: Ignore homed task tests
brson Jul 2, 2013
0607178
A missing ! made it so that the testcase schedule_home_states was thr…
toddaaro Jul 2, 2013
75a913f
Merge remote-tracking branch 'toddaaro/niots'
brson Jul 2, 2013
f8a4d09
std: Use the same task failure message as C++ rt
brson Jun 25, 2013
e6c5779
IPv6 support for UDP and TCP.
Jul 2, 2013
6a1a781
Merge remote-tracking branch 'upstream/io' into io
Jul 2, 2013
b60cf0c
converted TODOs into XXXs
Jul 3, 2013
1098d69
Merge remote-tracking branch 'mozilla/master'
brson Jul 3, 2013
cf23292
Merge remote-tracking branch 'upstream/io' into io
Jul 8, 2013
6b2abca
renamed finalize to drop in Drop impl for UvUdpSocket
Jul 8, 2013
5e0be46
changed .each() to .iter().advance()
Jul 8, 2013
4282539
std::rt: Add a hack to allocate different test port ranges to differe…
brson Jul 8, 2013
b227583
Merge remote-tracking branch 'anasazi/io'
brson Jul 8, 2013
fae3336
Merge remote-tracking branch 'mozilla/master'
brson Jul 8, 2013
7826651
Tidy
brson Jul 9, 2013
29c9443
std: Add a yield implementation for newsched
brson Jul 9, 2013
ec6d4a1
std::rt: size_t, not u64
brson Jul 9, 2013
07e52eb
std: Make os::set_exit_status work with newsched
brson Jul 9, 2013
2c13157
rt: Make the old rand builtins work with newsched
brson Jul 9, 2013
6fb92f8
std::rt: Do local tests in a bare thread to not interfere with the sc…
brson Jul 9, 2013
413d51e
std::rt: Ignore 0-byte udp reads
brson Jul 9, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 4 additions & 13 deletions src/libextra/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,6 @@ use std::u64;
use std::uint;
use std::vec;

pub mod rustrt {
use std::libc::size_t;

#[abi = "cdecl"]
pub extern {
pub unsafe fn rust_sched_threads() -> size_t;
}
}

// The name of a test. By convention this follows the rules for rust
// paths; i.e. it should be a series of identifiers separated by double
Expand Down Expand Up @@ -493,11 +485,10 @@ static SCHED_OVERCOMMIT : uint = 1;
static SCHED_OVERCOMMIT : uint = 4u;

fn get_concurrency() -> uint {
unsafe {
let threads = rustrt::rust_sched_threads() as uint;
if threads == 1 { 1 }
else { threads * SCHED_OVERCOMMIT }
}
use std::rt;
let threads = rt::util::default_sched_threads();
if threads == 1 { 1 }
else { threads * SCHED_OVERCOMMIT }
}

#[allow(non_implicitly_copyable_typarams)]
Expand Down
74 changes: 48 additions & 26 deletions src/libstd/at_vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,6 @@ use vec::{ImmutableVector, OwnedVector};
/// Code for dealing with @-vectors. This is pretty incomplete, and
/// contains a bunch of duplication from the code for ~-vectors.

pub mod rustrt {
use libc;
use vec;
#[cfg(stage0)]
use intrinsic::{TyDesc};
#[cfg(not(stage0))]
use unstable::intrinsics::{TyDesc};

#[abi = "cdecl"]
#[link_name = "rustrt"]
pub extern {
pub unsafe fn vec_reserve_shared_actual(t: *TyDesc,
v: **vec::raw::VecRepr,
n: libc::size_t);
}
}

/// Returns the number of elements the vector can hold without reallocating
#[inline]
pub fn capacity<T>(v: @[T]) -> uint {
Expand Down Expand Up @@ -192,18 +175,17 @@ pub mod traits {
pub mod traits {}

pub mod raw {
use at_vec::{capacity, rustrt};
use at_vec::capacity;
use cast;
use cast::{transmute, transmute_copy};
use libc;
use ptr;
use sys;
use uint;
use unstable::intrinsics::{move_val_init};
use unstable::intrinsics;
use unstable::intrinsics::{move_val_init, TyDesc};
use vec;
#[cfg(stage0)]
use intrinsic::{get_tydesc};
#[cfg(not(stage0))]
use unstable::intrinsics::{get_tydesc};
use vec::UnboxedVecRepr;

pub type VecRepr = vec::raw::VecRepr;
pub type SliceRepr = vec::raw::SliceRepr;
Expand Down Expand Up @@ -264,9 +246,49 @@ pub mod raw {
pub unsafe fn reserve<T>(v: &mut @[T], n: uint) {
// Only make the (slow) call into the runtime if we have to
if capacity(*v) < n {
let ptr: **VecRepr = transmute(v);
rustrt::vec_reserve_shared_actual(get_tydesc::<T>(),
ptr, n as libc::size_t);
let ptr: *mut *mut VecRepr = transmute(v);
let ty = intrinsics::get_tydesc::<T>();
// XXX transmute shouldn't be necessary
let ty = cast::transmute(ty);
return reserve_raw(ty, ptr, n);
}
}

// Implementation detail. Shouldn't be public
#[allow(missing_doc)]
pub fn reserve_raw(ty: *TyDesc, ptr: *mut *mut VecRepr, n: uint) {

unsafe {
let size_in_bytes = n * (*ty).size;
if size_in_bytes > (**ptr).unboxed.alloc {
let total_size = size_in_bytes + sys::size_of::<UnboxedVecRepr>();
// XXX: UnboxedVecRepr has an extra u8 at the end
let total_size = total_size - sys::size_of::<u8>();
(*ptr) = local_realloc(*ptr as *(), total_size) as *mut VecRepr;
(**ptr).unboxed.alloc = size_in_bytes;
}
}

fn local_realloc(ptr: *(), size: uint) -> *() {
use rt;
use rt::OldTaskContext;
use rt::local::Local;
use rt::task::Task;

if rt::context() == OldTaskContext {
unsafe {
return rust_local_realloc(ptr, size as libc::size_t);
}

extern {
#[fast_ffi]
fn rust_local_realloc(ptr: *(), size: libc::size_t) -> *();
}
} else {
do Local::borrow::<Task, *()> |task| {
task.heap.realloc(ptr as *libc::c_void, size) as *()
}
}
}
}

Expand Down
117 changes: 15 additions & 102 deletions src/libstd/cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,105 +10,13 @@

#[doc(hidden)];

use libc::{c_char, intptr_t, uintptr_t};
use libc::c_void;
use ptr::{mut_null};
use repr::BoxRepr;
use cast::transmute;
use unstable::intrinsics::TyDesc;
#[cfg(not(test))] use unstable::lang::clear_task_borrow_list;

/**
* Runtime structures
*
* NB: These must match the representation in the C++ runtime.
*/

type TaskID = uintptr_t;

struct StackSegment { priv opaque: () }
struct Scheduler { priv opaque: () }
struct SchedulerLoop { priv opaque: () }
struct Kernel { priv opaque: () }
struct Env { priv opaque: () }
struct AllocHeader { priv opaque: () }
struct MemoryRegion { priv opaque: () }

#[cfg(target_arch="x86")]
struct Registers {
data: [u32, ..16]
}

#[cfg(target_arch="arm")]
#[cfg(target_arch="mips")]
struct Registers {
data: [u32, ..32]
}

#[cfg(target_arch="x86")]
#[cfg(target_arch="arm")]
#[cfg(target_arch="mips")]
struct Context {
regs: Registers,
next: *Context,
pad: [u32, ..3]
}

#[cfg(target_arch="x86_64")]
struct Registers {
data: [u64, ..22]
}

#[cfg(target_arch="x86_64")]
struct Context {
regs: Registers,
next: *Context,
pad: uintptr_t
}

struct BoxedRegion {
env: *Env,
backing_region: *MemoryRegion,
live_allocs: *BoxRepr
}

#[cfg(target_arch="x86")]
#[cfg(target_arch="arm")]
#[cfg(target_arch="mips")]
struct Task {
// Public fields
refcount: intptr_t, // 0
id: TaskID, // 4
pad: [u32, ..2], // 8
ctx: Context, // 16
stack_segment: *StackSegment, // 96
runtime_sp: uintptr_t, // 100
scheduler: *Scheduler, // 104
scheduler_loop: *SchedulerLoop, // 108

// Fields known only to the runtime
kernel: *Kernel, // 112
name: *c_char, // 116
list_index: i32, // 120
boxed_region: BoxedRegion // 128
}

#[cfg(target_arch="x86_64")]
struct Task {
// Public fields
refcount: intptr_t,
id: TaskID,
ctx: Context,
stack_segment: *StackSegment,
runtime_sp: uintptr_t,
scheduler: *Scheduler,
scheduler_loop: *SchedulerLoop,

// Fields known only to the runtime
kernel: *Kernel,
name: *c_char,
list_index: i32,
boxed_region: BoxedRegion
}
type DropGlue<'self> = &'self fn(**TyDesc, *c_void);

/*
* Box annihilation
Expand All @@ -127,9 +35,9 @@ unsafe fn each_live_alloc(read_next_before: bool,
//! Walks the internal list of allocations

use managed;
use rt::local_heap;

let task: *Task = transmute(rustrt::rust_get_task());
let box = (*task).boxed_region.live_allocs;
let box = local_heap::live_allocs();
let mut box: *mut BoxRepr = transmute(copy box);
while box != mut_null() {
let next_before = transmute(copy (*box).header.next);
Expand All @@ -151,7 +59,13 @@ unsafe fn each_live_alloc(read_next_before: bool,

#[cfg(unix)]
fn debug_mem() -> bool {
::rt::env::get().debug_mem
use rt;
use rt::OldTaskContext;
// XXX: Need to port the environment struct to newsched
match rt::context() {
OldTaskContext => ::rt::env::get().debug_mem,
_ => false
}
}

#[cfg(windows)]
Expand All @@ -173,13 +87,12 @@ unsafe fn call_drop_glue(tydesc: *TyDesc, data: *i8) {
}

/// Destroys all managed memory (i.e. @ boxes) held by the current task.
#[cfg(not(test))]
#[lang="annihilate"]
pub unsafe fn annihilate() {
use unstable::lang::local_free;
use rt::local_heap::local_free;
use io::WriterUtil;
use io;
use libc;
use rt::borrowck;
use sys;
use managed;

Expand All @@ -191,7 +104,7 @@ pub unsafe fn annihilate() {

// Quick hack: we need to free this list upon task exit, and this
// is a convenient place to do it.
clear_task_borrow_list();
borrowck::clear_task_borrow_list();

// Pass 1: Make all boxes immortal.
//
Expand All @@ -213,7 +126,7 @@ pub unsafe fn annihilate() {
// callback, as the original value may have been freed.
for each_live_alloc(false) |box, uniq| {
if !uniq {
let tydesc = (*box).header.type_desc;
let tydesc: *TyDesc = transmute(copy (*box).header.type_desc);
let data = transmute(&(*box).data);
call_drop_glue(tydesc, data);
}
Expand Down
40 changes: 25 additions & 15 deletions src/libstd/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,48 +220,58 @@ impl<T: Send> Peekable<T> for PortSet<T> {

/// A channel that can be shared between many senders.
pub struct SharedChan<T> {
ch: Exclusive<pipesy::Chan<T>>
inner: Either<Exclusive<pipesy::Chan<T>>, rtcomm::SharedChan<T>>
}

impl<T: Send> SharedChan<T> {
/// Converts a `chan` into a `shared_chan`.
pub fn new(c: Chan<T>) -> SharedChan<T> {
let Chan { inner } = c;
let c = match inner {
Left(c) => c,
Right(_) => fail!("SharedChan not implemented")
Left(c) => Left(exclusive(c)),
Right(c) => Right(rtcomm::SharedChan::new(c))
};
SharedChan { ch: exclusive(c) }
SharedChan { inner: c }
}
}

impl<T: Send> GenericChan<T> for SharedChan<T> {
fn send(&self, x: T) {
unsafe {
let mut xx = Some(x);
do self.ch.with_imm |chan| {
let x = replace(&mut xx, None);
chan.send(x.unwrap())
match self.inner {
Left(ref chan) => {
unsafe {
let mut xx = Some(x);
do chan.with_imm |chan| {
let x = replace(&mut xx, None);
chan.send(x.unwrap())
}
}
}
Right(ref chan) => chan.send(x)
}
}
}

impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
fn try_send(&self, x: T) -> bool {
unsafe {
let mut xx = Some(x);
do self.ch.with_imm |chan| {
let x = replace(&mut xx, None);
chan.try_send(x.unwrap())
match self.inner {
Left(ref chan) => {
unsafe {
let mut xx = Some(x);
do chan.with_imm |chan| {
let x = replace(&mut xx, None);
chan.try_send(x.unwrap())
}
}
}
Right(ref chan) => chan.try_send(x)
}
}
}

impl<T: Send> ::clone::Clone for SharedChan<T> {
fn clone(&self) -> SharedChan<T> {
SharedChan { ch: self.ch.clone() }
SharedChan { inner: self.inner.clone() }
}
}

Expand Down
Loading