Skip to content

Commit 7971002

Browse files
port windows impl and add missing drop guard
1 parent cbbf015 commit 7971002

File tree

7 files changed

+140
-88
lines changed

7 files changed

+140
-88
lines changed

Cargo.toml

+18-7
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,15 @@ default = [
3434
"pin-project-lite",
3535
]
3636
docs = ["attributes", "unstable", "default"]
37-
unstable = ["std", "broadcaster"]
37+
unstable = [
38+
"std",
39+
"broadcaster",
40+
"crossbeam-queue",
41+
"async-signals",
42+
"libc",
43+
"lazy_static",
44+
"async-macros",
45+
]
3846
attributes = ["async-attributes"]
3947
std = [
4048
"alloc",
@@ -46,11 +54,7 @@ std = [
4654
"once_cell",
4755
"pin-utils",
4856
"slab",
49-
"crossbeam-queue",
50-
"async-signals",
51-
"libc",
52-
"lazy_static",
53-
"async-macros",
57+
5458
]
5559
alloc = [
5660
"futures-core/alloc",
@@ -78,11 +82,18 @@ pin-project-lite = { version = "0.1.2", optional = true }
7882
pin-utils = { version = "0.1.0-alpha.4", optional = true }
7983
slab = { version = "0.4.2", optional = true }
8084
crossbeam-queue = { version = "0.2.1", optional = true }
81-
async-signals = { version = "0.1.0", optional = true }
8285
lazy_static = { version = "1.4.0", optional = true }
8386
async-macros = { version = "2.0.0", optional = true }
87+
88+
[target.'cfg(not(target_os="windows"))'.dependencies]
89+
async-signals = { version = "0.1.0", optional = true }
8490
libc = { version = "0.2.67", optional = true }
8591

92+
[target.'cfg(target_os="windows")'.dependencies]
93+
winapi = { version = "0.3.8", features = ["threadpoollegacyapiset"] }
94+
mio-named-pipes = { version = "0.1.6" }
95+
futures-channel = "0.3.4"
96+
8697
[dev-dependencies]
8798
femme = "1.3.0"
8899
rand = "0.7.3"

src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,6 @@ cfg_default! {
270270
pub mod fs;
271271
pub mod path;
272272
pub mod net;
273-
pub mod process;
274273
}
275274

276275
cfg_unstable! {
@@ -280,6 +279,7 @@ cfg_unstable! {
280279
mod option;
281280
mod string;
282281
mod collections;
282+
pub mod process;
283283
}
284284

285285
cfg_unstable_default! {

src/process/kill.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use std::task::{Context, Poll};
77
/// A drop guard which ensures the child process is killed on drop to maintain
88
/// the contract of dropping a Future leads to "cancellation".
99
#[derive(Debug)]
10-
pub(crate) struct ChildDropGuard<T: Kill> {
11-
inner: T,
10+
pub(super) struct ChildDropGuard<T: Kill> {
11+
pub(super) inner: T,
1212
kill_on_drop: bool,
1313
}
1414

@@ -61,7 +61,7 @@ impl<T: Future + Kill + Unpin> Future for ChildDropGuard<T> {
6161
}
6262

6363
/// An interface for killing a running process.
64-
pub(crate) trait Kill {
64+
pub(super) trait Kill {
6565
/// Forcefully kill the process.
6666
fn kill(&mut self) -> io::Result<()>;
6767
}

src/process/mod.rs

+3-8
Original file line numberDiff line numberDiff line change
@@ -141,22 +141,17 @@ mod imp;
141141

142142
mod kill;
143143

144-
use kill::Kill;
144+
use kill::{ChildDropGuard, Kill};
145145

146146
/// Representation of a running or exited child process.
147147
///
148148
/// This structure is used to represent and manage child processes. A child
149149
/// process is created via the [`Command`] struct, which configures the
150150
/// spawning process and can itself be constructed using a builder-style
151151
/// interface.
152-
///
153-
/// There is no implementation of [`Drop`] for child processes,
154-
/// so if you do not ensure the `Child` has exited then it will continue to
155-
/// run, even after the `Child` handle to the child process has gone out of
156-
/// scope.
157152
#[derive(Debug)]
158153
pub struct Child {
159-
child: imp::Child,
154+
child: ChildDropGuard<imp::Child>,
160155

161156
/// The handle for writing to the child's standard input (stdin), if it has
162157
/// been captured.
@@ -172,7 +167,7 @@ pub struct Child {
172167
impl Child {
173168
/// Returns the OS-assigned process identifier associated with this child.
174169
pub fn id(&self) -> u32 {
175-
self.child.id()
170+
self.child.inner.id()
176171
}
177172

178173
/// Forces the child process to exit. If the child has already exited, an [`InvalidInput`]

src/process/unix/mod.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ use self::{
4444
use crate::net::driver::Watcher;
4545
use crate::prelude::*;
4646
use crate::process::{
47-
kill::Kill, Child as SpawnedChild, ChildStderr as SpawnedChildStderr,
48-
ChildStdin as SpawnedChildStdin, ChildStdout as SpawnedChildStdout, ExitStatus,
47+
kill::{ChildDropGuard, Kill},
48+
Child as SpawnedChild, ChildStderr as SpawnedChildStderr, ChildStdin as SpawnedChildStdin,
49+
ChildStdout as SpawnedChildStdout, ExitStatus,
4950
};
5051

5152
lazy_static::lazy_static! {
@@ -111,9 +112,9 @@ pub(crate) fn spawn_child(cmd: &mut process::Command) -> io::Result<SpawnedChild
111112

112113
let signal = Signals::new(&[libc::SIGCHLD])?;
113114
Ok(SpawnedChild {
114-
child: Child {
115+
child: ChildDropGuard::new(Child {
115116
inner: Reaper::new(child, GlobalOrphanQueue, signal),
116-
},
117+
}),
117118
stdin: stdin.map(|stdin| SpawnedChildStdin { inner: stdin }),
118119
stdout: stdout.map(|stdout| SpawnedChildStdout { inner: stdout }),
119120
stderr: stderr.map(|stderr| SpawnedChildStderr { inner: stderr }),

src/process/windows/mod.rs

+109-64
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//! Windows asynchronous process handling.
1+
//! Windows asynchronous process handling.ven
22
//!
33
//! Like with Unix we don't actually have a way of registering a process with an
44
//! IOCP object. As a result we similarly need another mechanism for getting a
@@ -15,31 +15,34 @@
1515
//! `RegisterWaitForSingleObject` and then wait on the other end of the oneshot
1616
//! from then on out.
1717
18-
extern crate winapi;
19-
extern crate mio_named_pipes;
20-
2118
use std::fmt;
22-
use std::io;
19+
use std::future::Future;
20+
use std::io::{self, Read, Write};
2321
use std::os::windows::prelude::*;
2422
use std::os::windows::process::ExitStatusExt;
25-
use std::process::{self, ExitStatus};
23+
use std::pin::Pin;
24+
use std::process;
2625
use std::ptr;
27-
28-
use futures::future::Fuse;
29-
use futures::sync::oneshot;
30-
use futures::{Future, Poll, Async};
31-
use kill::Kill;
32-
use self::mio_named_pipes::NamedPipe;
33-
use self::winapi::shared::minwindef::*;
34-
use self::winapi::shared::winerror::*;
35-
use self::winapi::um::handleapi::*;
36-
use self::winapi::um::processthreadsapi::*;
37-
use self::winapi::um::synchapi::*;
38-
use self::winapi::um::threadpoollegacyapiset::*;
39-
use self::winapi::um::winbase::*;
40-
use self::winapi::um::winnt::*;
41-
use super::SpawnedChild;
42-
use tokio_reactor::{Handle, PollEvented};
26+
use std::task::{Context, Poll};
27+
28+
use futures_channel::oneshot::{channel as oneshot, Receiver, Sender};
29+
use futures_io::{AsyncRead, AsyncWrite};
30+
use mio_named_pipes::NamedPipe;
31+
use winapi::shared::minwindef::*;
32+
use winapi::shared::winerror::*;
33+
use winapi::um::handleapi::*;
34+
use winapi::um::processthreadsapi::*;
35+
use winapi::um::synchapi::*;
36+
use winapi::um::threadpoollegacyapiset::*;
37+
use winapi::um::winbase::*;
38+
use winapi::um::winnt::*;
39+
40+
use crate::net::driver::Watcher;
41+
use crate::process::{
42+
kill::{ChildDropGuard, Kill},
43+
Child as SpawnedChild, ChildStderr as SpawnedChildStderr, ChildStdin as SpawnedChildStdin,
44+
ChildStdout as SpawnedChildStdout, ExitStatus,
45+
};
4346

4447
#[must_use = "futures do nothing unless polled"]
4548
pub struct Child {
@@ -48,7 +51,7 @@ pub struct Child {
4851
}
4952

5053
impl fmt::Debug for Child {
51-
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
54+
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
5255
fmt.debug_struct("Child")
5356
.field("pid", &self.id())
5457
.field("child", &self.child)
@@ -58,28 +61,28 @@ impl fmt::Debug for Child {
5861
}
5962

6063
struct Waiting {
61-
rx: Fuse<oneshot::Receiver<()>>,
64+
rx: Receiver<()>,
6265
wait_object: HANDLE,
63-
tx: *mut Option<oneshot::Sender<()>>,
66+
tx: *mut Option<Sender<()>>,
6467
}
6568

6669
unsafe impl Sync for Waiting {}
6770
unsafe impl Send for Waiting {}
6871

69-
pub(crate) fn spawn_child(cmd: &mut process::Command, handle: &Handle) -> io::Result<SpawnedChild> {
72+
pub(crate) fn spawn_child(cmd: &mut process::Command) -> io::Result<SpawnedChild> {
7073
let mut child = cmd.spawn()?;
71-
let stdin = stdio(child.stdin.take(), handle)?;
72-
let stdout = stdio(child.stdout.take(), handle)?;
73-
let stderr = stdio(child.stderr.take(), handle)?;
74+
let stdin = stdio(child.stdin.take())?;
75+
let stdout = stdio(child.stdout.take())?;
76+
let stderr = stdio(child.stderr.take())?;
7477

7578
Ok(SpawnedChild {
76-
child: Child {
79+
child: ChildDropGuard::new(Child {
7780
child,
7881
waiting: None,
79-
},
80-
stdin,
81-
stdout,
82-
stderr,
82+
}),
83+
stdin: stdin.map(|stdin| SpawnedChildStdin { inner: stdin }),
84+
stdout: stdout.map(|stdout| SpawnedChildStdout { inner: stdout }),
85+
stderr: stderr.map(|stderr| SpawnedChildStderr { inner: stderr }),
8386
})
8487
}
8588

@@ -96,42 +99,47 @@ impl Kill for Child {
9699
}
97100

98101
impl Future for Child {
99-
type Item = ExitStatus;
100-
type Error = io::Error;
102+
type Output = io::Result<ExitStatus>;
101103

102-
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
104+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<<Self as Future>::Output> {
103105
loop {
104106
if let Some(ref mut w) = self.waiting {
105-
match w.rx.poll().expect("should not be canceled") {
106-
Async::Ready(()) => {}
107-
Async::NotReady => return Ok(Async::NotReady),
107+
match Pin::new(&mut w.rx).poll(cx) {
108+
Poll::Ready(Ok(())) => {}
109+
Poll::Ready(Err(_)) => panic!("cancellation was not supposed to happen"),
110+
Poll::Pending => return Poll::Pending,
108111
}
109-
let status = try!(try_wait(&self.child)).expect("not ready yet");
110-
return Ok(status.into())
112+
return match try_wait(&self.child) {
113+
Ok(status) => Poll::Ready(Ok(status.expect("not ready yet"))),
114+
Err(err) => Poll::Ready(Err(err)),
115+
};
111116
}
112117

113-
if let Some(e) = try!(try_wait(&self.child)) {
114-
return Ok(e.into())
118+
match try_wait(&self.child) {
119+
Ok(Some(e)) => return Poll::Ready(Ok(e)),
120+
Ok(None) => {}
121+
Err(err) => return Poll::Ready(Err(err)),
115122
}
116-
let (tx, rx) = oneshot::channel();
123+
let (tx, rx) = oneshot();
117124
let ptr = Box::into_raw(Box::new(Some(tx)));
118125
let mut wait_object = ptr::null_mut();
119126
let rc = unsafe {
120-
RegisterWaitForSingleObject(&mut wait_object,
121-
self.child.as_raw_handle(),
122-
Some(callback),
123-
ptr as *mut _,
124-
INFINITE,
125-
WT_EXECUTEINWAITTHREAD |
126-
WT_EXECUTEONLYONCE)
127+
RegisterWaitForSingleObject(
128+
&mut wait_object,
129+
self.child.as_raw_handle(),
130+
Some(callback),
131+
ptr as *mut _,
132+
INFINITE,
133+
WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
134+
)
127135
};
128136
if rc == 0 {
129137
let err = io::Error::last_os_error();
130138
drop(unsafe { Box::from_raw(ptr) });
131-
return Err(err)
139+
return Poll::Ready(Err(err));
132140
}
133141
self.waiting = Some(Waiting {
134-
rx: rx.fuse(),
142+
rx,
135143
wait_object,
136144
tx: ptr,
137145
});
@@ -151,9 +159,8 @@ impl Drop for Waiting {
151159
}
152160
}
153161

154-
unsafe extern "system" fn callback(ptr: PVOID,
155-
_timer_fired: BOOLEAN) {
156-
let complete = &mut *(ptr as *mut Option<oneshot::Sender<()>>);
162+
unsafe extern "system" fn callback(ptr: PVOID, _timer_fired: BOOLEAN) {
163+
let complete = &mut *(ptr as *mut Option<Sender<()>>);
157164
let _ = complete.take().unwrap().send(());
158165
}
159166

@@ -174,19 +181,57 @@ pub fn try_wait(child: &process::Child) -> io::Result<Option<ExitStatus>> {
174181
}
175182
}
176183

177-
pub type ChildStdin = PollEvented<NamedPipe>;
178-
pub type ChildStdout = PollEvented<NamedPipe>;
179-
pub type ChildStderr = PollEvented<NamedPipe>;
184+
impl AsyncWrite for SpawnedChildStdin {
185+
fn poll_write(
186+
mut self: Pin<&mut Self>,
187+
cx: &mut Context<'_>,
188+
buf: &[u8],
189+
) -> Poll<io::Result<usize>> {
190+
self.inner.poll_write_with_mut(cx, |inner| inner.write(buf))
191+
}
192+
193+
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
194+
self.inner.poll_write_with_mut(cx, |inner| inner.flush())
195+
}
196+
197+
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
198+
Poll::Ready(Ok(()))
199+
}
200+
}
201+
202+
impl AsyncRead for SpawnedChildStdout {
203+
fn poll_read(
204+
mut self: Pin<&mut Self>,
205+
cx: &mut Context<'_>,
206+
buf: &mut [u8],
207+
) -> Poll<io::Result<usize>> {
208+
self.inner.poll_read_with_mut(cx, |inner| inner.read(buf))
209+
}
210+
}
211+
212+
impl AsyncRead for SpawnedChildStderr {
213+
fn poll_read(
214+
mut self: Pin<&mut Self>,
215+
cx: &mut Context<'_>,
216+
buf: &mut [u8],
217+
) -> Poll<io::Result<usize>> {
218+
self.inner.poll_read_with_mut(cx, |inner| inner.read(buf))
219+
}
220+
}
221+
222+
pub type ChildStdin = Watcher<NamedPipe>;
223+
pub type ChildStdout = Watcher<NamedPipe>;
224+
pub type ChildStderr = Watcher<NamedPipe>;
180225

181-
fn stdio<T>(option: Option<T>, handle: &Handle)
182-
-> io::Result<Option<PollEvented<NamedPipe>>>
183-
where T: IntoRawHandle,
226+
fn stdio<T>(option: Option<T>) -> io::Result<Option<Watcher<NamedPipe>>>
227+
where
228+
T: IntoRawHandle,
184229
{
185230
let io = match option {
186231
Some(io) => io,
187232
None => return Ok(None),
188233
};
189234
let pipe = unsafe { NamedPipe::from_raw_handle(io.into_raw_handle()) };
190-
let io = try!(PollEvented::new_with_handle(pipe, handle));
235+
let io = Watcher::new(pipe);
191236
Ok(Some(io))
192237
}

0 commit comments

Comments
 (0)