Skip to content

Commit cbbf015

Browse files
feat: initial implementation of std::process
1 parent 61f9483 commit cbbf015

File tree

9 files changed

+1665
-14
lines changed

9 files changed

+1665
-14
lines changed

Cargo.toml

+10
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ std = [
4646
"once_cell",
4747
"pin-utils",
4848
"slab",
49+
"crossbeam-queue",
50+
"async-signals",
51+
"libc",
52+
"lazy_static",
53+
"async-macros",
4954
]
5055
alloc = [
5156
"futures-core/alloc",
@@ -72,6 +77,11 @@ once_cell = { version = "1.2.0", optional = true }
7277
pin-project-lite = { version = "0.1.2", optional = true }
7378
pin-utils = { version = "0.1.0-alpha.4", optional = true }
7479
slab = { version = "0.4.2", optional = true }
80+
crossbeam-queue = { version = "0.2.1", optional = true }
81+
async-signals = { version = "0.1.0", optional = true }
82+
lazy_static = { version = "1.4.0", optional = true }
83+
async-macros = { version = "2.0.0", optional = true }
84+
libc = { version = "0.2.67", optional = true }
7585

7686
[dev-dependencies]
7787
femme = "1.3.0"

src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,7 @@ cfg_default! {
270270
pub mod fs;
271271
pub mod path;
272272
pub mod net;
273+
pub mod process;
273274
}
274275

275276
cfg_unstable! {

src/net/driver/mod.rs

+86-6
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ struct Readers {
2929
/// (cf. `Watcher::poll_read_ready`)
3030
ready: bool,
3131
/// The `Waker`s blocked on reading.
32-
wakers: Vec<Waker>
32+
wakers: Vec<Waker>,
3333
}
3434

3535
/// The set of `Waker`s interested in write readiness.
@@ -39,7 +39,7 @@ struct Writers {
3939
/// (cf. `Watcher::poll_write_ready`)
4040
ready: bool,
4141
/// The `Waker`s blocked on writing.
42-
wakers: Vec<Waker>
42+
wakers: Vec<Waker>,
4343
}
4444

4545
/// The state of a networking driver.
@@ -88,8 +88,14 @@ impl Reactor {
8888
// Allocate an entry and insert it into the slab.
8989
let entry = Arc::new(Entry {
9090
token,
91-
readers: Mutex::new(Readers { ready: false, wakers: Vec::new() }),
92-
writers: Mutex::new(Writers { ready: false, wakers: Vec::new() }),
91+
readers: Mutex::new(Readers {
92+
ready: false,
93+
wakers: Vec::new(),
94+
}),
95+
writers: Mutex::new(Writers {
96+
ready: false,
97+
wakers: Vec::new(),
98+
}),
9399
});
94100
vacant.insert(entry.clone());
95101

@@ -249,6 +255,43 @@ impl<T: Evented> Watcher<T> {
249255
Poll::Pending
250256
}
251257

258+
/// Polls the inner I/O source for a non-blocking read operation.
259+
///
260+
/// If the operation returns an error of the `io::ErrorKind::WouldBlock` kind, the current task
261+
/// will be registered for wakeup when the I/O source becomes readable.
262+
pub fn poll_read_with_mut<'a, F, R>(
263+
&'a mut self,
264+
cx: &mut Context<'_>,
265+
mut f: F,
266+
) -> Poll<io::Result<R>>
267+
where
268+
F: FnMut(&mut T) -> io::Result<R>,
269+
{
270+
// If the operation isn't blocked, return its result.
271+
match f(self.source.as_mut().unwrap()) {
272+
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
273+
res => return Poll::Ready(res),
274+
}
275+
276+
// Lock the waker list.
277+
let mut readers = self.entry.readers.lock().unwrap();
278+
279+
// Try running the operation again.
280+
match f(self.source.as_mut().unwrap()) {
281+
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
282+
res => return Poll::Ready(res),
283+
}
284+
285+
// Register the task if it isn't registered already.
286+
if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
287+
readers.wakers.push(cx.waker().clone());
288+
}
289+
290+
readers.ready = false;
291+
292+
Poll::Pending
293+
}
294+
252295
/// Polls the inner I/O source for a non-blocking write operation.
253296
///
254297
/// If the operation returns an error of the `io::ErrorKind::WouldBlock` kind, the current task
@@ -286,6 +329,43 @@ impl<T: Evented> Watcher<T> {
286329
Poll::Pending
287330
}
288331

332+
/// Polls the inner I/O source for a non-blocking write operation.
333+
///
334+
/// If the operation returns an error of the `io::ErrorKind::WouldBlock` kind, the current task
335+
/// will be registered for wakeup when the I/O source becomes writable.
336+
pub fn poll_write_with_mut<'a, F, R>(
337+
&'a mut self,
338+
cx: &mut Context<'_>,
339+
mut f: F,
340+
) -> Poll<io::Result<R>>
341+
where
342+
F: FnMut(&mut T) -> io::Result<R>,
343+
{
344+
// If the operation isn't blocked, return its result.
345+
match f(self.source.as_mut().unwrap()) {
346+
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
347+
res => return Poll::Ready(res),
348+
}
349+
350+
// Lock the waker list.
351+
let mut writers = self.entry.writers.lock().unwrap();
352+
353+
// Try running the operation again.
354+
match f(self.source.as_mut().unwrap()) {
355+
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
356+
res => return Poll::Ready(res),
357+
}
358+
359+
// Register the task if it isn't registered already.
360+
if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
361+
writers.wakers.push(cx.waker().clone());
362+
}
363+
364+
writers.ready = false;
365+
366+
Poll::Pending
367+
}
368+
289369
/// Polls the inner I/O source until a non-blocking read can be performed.
290370
///
291371
/// If non-blocking reads are currently not possible, the `Waker`
@@ -296,7 +376,7 @@ impl<T: Evented> Watcher<T> {
296376
// Lock the waker list.
297377
let mut readers = self.entry.readers.lock().unwrap();
298378
if readers.ready {
299-
return Poll::Ready(())
379+
return Poll::Ready(());
300380
}
301381
// Register the task if it isn't registered already.
302382
if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
@@ -314,7 +394,7 @@ impl<T: Evented> Watcher<T> {
314394
// Lock the waker list.
315395
let mut writers = self.entry.writers.lock().unwrap();
316396
if writers.ready {
317-
return Poll::Ready(())
397+
return Poll::Ready(());
318398
}
319399
// Register the task if it isn't registered already.
320400
if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {

src/process/kill.rs

+73
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
use crate::io;
2+
use crate::prelude::*;
3+
4+
use std::pin::Pin;
5+
use std::task::{Context, Poll};
6+
7+
/// A drop guard which ensures the child process is killed on drop to maintain
8+
/// the contract of dropping a Future leads to "cancellation".
9+
#[derive(Debug)]
10+
pub(crate) struct ChildDropGuard<T: Kill> {
11+
inner: T,
12+
kill_on_drop: bool,
13+
}
14+
15+
impl<T: Kill> ChildDropGuard<T> {
16+
pub(crate) fn new(inner: T) -> Self {
17+
Self {
18+
inner,
19+
kill_on_drop: true,
20+
}
21+
}
22+
23+
pub(crate) fn forget(&mut self) {
24+
self.kill_on_drop = false;
25+
}
26+
}
27+
28+
impl<T: Kill> Kill for ChildDropGuard<T> {
29+
fn kill(&mut self) -> io::Result<()> {
30+
let ret = self.inner.kill();
31+
32+
if ret.is_ok() {
33+
self.kill_on_drop = false;
34+
}
35+
36+
ret
37+
}
38+
}
39+
40+
impl<T: Kill> Drop for ChildDropGuard<T> {
41+
fn drop(&mut self) {
42+
if self.kill_on_drop {
43+
drop(self.kill());
44+
}
45+
}
46+
}
47+
48+
impl<T: Future + Kill + Unpin> Future for ChildDropGuard<T> {
49+
type Output = T::Output;
50+
51+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
52+
let ret = Pin::new(&mut self.inner).poll(cx);
53+
54+
if let Poll::Ready(_) = ret {
55+
// Avoid the overhead of trying to kill a reaped process
56+
self.kill_on_drop = false;
57+
}
58+
59+
ret
60+
}
61+
}
62+
63+
/// An interface for killing a running process.
64+
pub(crate) trait Kill {
65+
/// Forcefully kill the process.
66+
fn kill(&mut self) -> io::Result<()>;
67+
}
68+
69+
impl<'a, T: 'a + Kill> Kill for &'a mut T {
70+
fn kill(&mut self) -> io::Result<()> {
71+
(**self).kill()
72+
}
73+
}

0 commit comments

Comments
 (0)