Skip to content

Commit 093a638

Browse files
committed
Another pass at executors
1 parent 6148b1f commit 093a638

File tree

12 files changed

+273
-256
lines changed

12 files changed

+273
-256
lines changed

futures-cpupool/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use std::thread;
4949

5050
use crossbeam::sync::MsQueue;
5151
use futures::{Future, oneshot, Oneshot, Complete, Poll};
52-
use futures::task::{Task, Run, Executor};
52+
use futures::task::{self, Run, Executor};
5353

5454
/// A thread pool intended to run CPU intensive work.
5555
///
@@ -162,7 +162,7 @@ impl CpuPool {
162162
fut: AssertUnwindSafe(f).catch_unwind(),
163163
tx: Some(tx),
164164
};
165-
Task::new(self.inner.clone(), sender.boxed()).unpark();
165+
task::spawn(sender.boxed()).execute(self.inner.clone());
166166
CpuFuture { inner: rx }
167167
}
168168
}

src/lib.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@
150150
//! [README]: https://github.com/alexcrichton/futures-rs#futures-rs
151151
152152
#![no_std]
153-
#![deny(missing_docs)]
153+
// #![deny(missing_docs)]
154154

155155
#[macro_use]
156156
#[cfg(feature = "use_std")]
@@ -373,19 +373,10 @@ pub trait Future {
373373
/// This function does not attempt to catch panics. If the `poll` function
374374
/// panics, panics will be propagated to the caller.
375375
#[cfg(feature = "use_std")]
376-
fn wait(mut self) -> Result<Self::Item, Self::Error>
376+
fn wait(self) -> Result<Self::Item, Self::Error>
377377
where Self: Sized
378378
{
379-
use std::thread;
380-
381-
let task = task::ThreadTask::new();
382-
loop {
383-
match task.enter(|| self.poll()) {
384-
Poll::Ok(e) => return Ok(e),
385-
Poll::Err(e) => return Err(e),
386-
Poll::NotReady => thread::park(),
387-
}
388-
}
379+
task::spawn(self).wait_future()
389380
}
390381

391382
/// Convenience function for turning this future into a trait object.

src/stream/wait.rs

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
1-
use std::thread;
2-
3-
use Poll;
41
use stream::Stream;
5-
use task::ThreadTask;
2+
use task;
63

74
/// A stream combinator which converts an asynchronous stream to a **blocking
85
/// iterator**.
@@ -11,29 +8,19 @@ use task::ThreadTask;
118
/// into a standard iterator. This is implemented by blocking the current thread
129
/// while items on the underlying stream aren't ready yet.
1310
pub struct Wait<S> {
14-
task: ThreadTask,
15-
stream: S,
11+
stream: task::Spawn<S>,
1612
}
1713

1814
pub fn new<S: Stream>(s: S) -> Wait<S> {
1915
Wait {
20-
task: ThreadTask::new(),
21-
stream: s,
16+
stream: task::spawn(s),
2217
}
2318
}
2419

2520
impl<S: Stream> Iterator for Wait<S> {
2621
type Item = Result<S::Item, S::Error>;
2722

2823
fn next(&mut self) -> Option<Self::Item> {
29-
let stream = &mut self.stream;
30-
loop {
31-
match self.task.enter(|| stream.poll()) {
32-
Poll::Ok(Some(e)) => return Some(Ok(e)),
33-
Poll::Ok(None) => return None,
34-
Poll::Err(e) => return Some(Err(e)),
35-
Poll::NotReady => thread::park(),
36-
}
37-
}
24+
self.stream.wait_stream()
3825
}
3926
}

0 commit comments

Comments
 (0)