Skip to content

Enhance timers to create ports #10083

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 28, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 131 additions & 4 deletions src/libstd/rt/io/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,37 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

/*!

Synchronous Timers

This module exposes the functionality to create timers, block the current task,
and create ports which will receive notifications after a period of time.

# Example

```rust

use std::rt::io::Timer;

let mut timer = Timer::new().unwrap();
timer.sleep(10); // block the task for awhile

let timeout = timer.oneshot(10);
// do some work
timeout.recv(); // wait for the timeout to expire

let periodic = timer.periodic(10);
loop {
periodic.recv();
// this loop is only executed once every 10ms
}

```

*/

use comm::{Port, PortOne};
use option::{Option, Some, None};
use result::{Ok, Err};
use rt::io::io_error;
Expand All @@ -25,9 +56,9 @@ pub fn sleep(msecs: u64) {
}

impl Timer {

/// Creates a new timer which can be used to put the current task to sleep
/// for a number of milliseconds.
/// for a number of milliseconds, or to possibly create channels which will
/// get notified after an amount of time has passed.
pub fn new() -> Option<Timer> {
do with_local_io |io| {
match io.timer_init() {
Expand All @@ -42,20 +73,116 @@ impl Timer {
}
}

/// Blocks the current task for `msecs` milliseconds.
///
/// Note that this function will cause any other ports for this timer to be
/// invalidated (the other end will be closed).
pub fn sleep(&mut self, msecs: u64) {
self.obj.sleep(msecs);
}

/// Creates a oneshot port which will have a notification sent when `msecs`
/// milliseconds has elapsed. This does *not* block the current task, but
/// instead returns immediately.
///
/// Note that this invalidates any previous port which has been created by
/// this timer, and that the returned port will be invalidated once the
/// timer is destroyed (when it falls out of scope).
pub fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
self.obj.oneshot(msecs)
}

/// Creates a port which will have a continuous stream of notifications
/// being sent every `msecs` milliseconds. This does *not* block the
/// current task, but instead returns immediately. The first notification
/// will not be received immediately, but rather after `msec` milliseconds
/// have passed.
///
/// Note that this invalidates any previous port which has been created by
/// this timer, and that the returned port will be invalidated once the
/// timer is destroyed (when it falls out of scope).
pub fn periodic(&mut self, msecs: u64) -> Port<()> {
self.obj.period(msecs)
}
}

#[cfg(test)]
mod test {
use super::*;
use rt::test::*;
use cell::Cell;
use task;

#[test]
fn test_io_timer_sleep_simple() {
do run_in_mt_newsched_task {
let timer = Timer::new();
do timer.map |mut t| { t.sleep(1) };
let mut timer = Timer::new().unwrap();
timer.sleep(1);
}
}

#[test]
fn test_io_timer_sleep_oneshot() {
do run_in_mt_newsched_task {
let mut timer = Timer::new().unwrap();
timer.oneshot(1).recv();
}
}

#[test]
fn test_io_timer_sleep_oneshot_forget() {
do run_in_mt_newsched_task {
let mut timer = Timer::new().unwrap();
timer.oneshot(100000000000);
}
}

#[test]
fn oneshot_twice() {
do run_in_mt_newsched_task {
let mut timer = Timer::new().unwrap();
let port1 = timer.oneshot(100000000000);
let port = timer.oneshot(1);
port.recv();
let port1 = Cell::new(port1);
let ret = do task::try {
port1.take().recv();
};
assert!(ret.is_err());
}
}

#[test]
fn test_io_timer_oneshot_then_sleep() {
do run_in_mt_newsched_task {
let mut timer = Timer::new().unwrap();
let port = timer.oneshot(100000000000);
timer.sleep(1); // this should invalidate the port

let port = Cell::new(port);
let ret = do task::try {
port.take().recv();
};
assert!(ret.is_err());
}
}

#[test]
fn test_io_timer_sleep_periodic() {
do run_in_mt_newsched_task {
let mut timer = Timer::new().unwrap();
let port = timer.periodic(1);
port.recv();
port.recv();
port.recv();
}
}

#[test]
fn test_io_timer_sleep_periodic_forget() {
do run_in_mt_newsched_task {
let mut timer = Timer::new().unwrap();
timer.periodic(100000000000);
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/libstd/rt/rtio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use libc;
use option::*;
use result::*;
use comm::SharedChan;
use comm::{SharedChan, PortOne, Port};
use libc::c_int;
use c_str::CString;

Expand Down Expand Up @@ -162,6 +162,8 @@ pub trait RtioUdpSocket : RtioSocket {

pub trait RtioTimer {
fn sleep(&mut self, msecs: u64);
fn oneshot(&mut self, msecs: u64) -> PortOne<()>;
fn period(&mut self, msecs: u64) -> Port<()>;
}

pub trait RtioFileStream {
Expand Down
37 changes: 36 additions & 1 deletion src/libstd/rt/uv/uvio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use cast::transmute;
use cast;
use cell::Cell;
use clone::Clone;
use comm::{SendDeferred, SharedChan};
use comm::{SendDeferred, SharedChan, Port, PortOne, GenericChan};
use libc::{c_int, c_uint, c_void, pid_t};
use ops::Drop;
use option::*;
Expand Down Expand Up @@ -1468,6 +1468,41 @@ impl RtioTimer for UvTimer {
self_.watcher.stop();
}
}

fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
use comm::oneshot;

let (port, chan) = oneshot();
let chan = Cell::new(chan);
do self.home_for_io |self_| {
let chan = Cell::new(chan.take());
do self_.watcher.start(msecs, 0) |_, status| {
assert!(status.is_none());
assert!(!chan.is_empty());
chan.take().send_deferred(());
}
}

return port;
}

fn period(&mut self, msecs: u64) -> Port<()> {
use comm::stream;

let (port, chan) = stream();
let chan = Cell::new(chan);
do self.home_for_io |self_| {
let chan = Cell::new(chan.take());
do self_.watcher.start(msecs, msecs) |_, status| {
assert!(status.is_none());
do chan.with_ref |chan| {
chan.send_deferred(());
}
}
}

return port;
}
}

pub struct UvFileStream {
Expand Down