Skip to content

Fix uds listener hanging on accept #272

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
3 commits merged into from
Oct 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions src/os/unix/net/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,25 @@ impl UnixListener {
/// ```
pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
future::poll_fn(|cx| {
let res =
futures_core::ready!(self.watcher.poll_read_with(cx, |inner| inner.accept_std()));
let res = futures_core::ready!(self.watcher.poll_read_with(cx, |inner| {
match inner.accept_std() {
// Converting to `WouldBlock` so that the watcher will
// add the waker of this task to a list of readers.
Ok(None) => Err(io::ErrorKind::WouldBlock.into()),
res => res,
}
}));

match res? {
None => Poll::Pending,
Some((io, addr)) => {
let mio_stream = mio_uds::UnixStream::from_stream(io)?;
let stream = UnixStream {
watcher: Watcher::new(mio_stream),
};
Poll::Ready(Ok((stream, addr)))
}
// This should never happen since `None` is converted to `WouldBlock`
None => unreachable!(),
}
})
.await
Expand Down
57 changes: 56 additions & 1 deletion tests/uds.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
#![cfg(unix)]

use async_std::io;
use async_std::os::unix::net::UnixDatagram;
use async_std::os::unix::net::{UnixDatagram, UnixListener, UnixStream};
use async_std::prelude::*;
use async_std::task;

use tempdir::TempDir;

use std::time::Duration;

const JULIUS_CAESAR: &[u8] = b"
Friends, Romans, countrymen - lend me your ears!
I come not to praise Caesar, but to bury him.
Expand Down Expand Up @@ -39,3 +44,53 @@ fn into_raw_fd() -> io::Result<()> {
Ok(())
})
}

const PING: &[u8] = b"ping";
const PONG: &[u8] = b"pong";
const TEST_TIMEOUT: Duration = Duration::from_secs(3);

#[test]
fn socket_ping_pong() {
let tmp_dir = TempDir::new("socket_ping_pong").expect("Temp dir not created");
let sock_path = tmp_dir.as_ref().join("sock");
let iter_cnt = 16;

let listener =
task::block_on(async { UnixListener::bind(&sock_path).await.expect("Socket bind") });

let server_handle = std::thread::spawn(move || {
task::block_on(async { ping_pong_server(listener, iter_cnt).await }).unwrap()
});

let client_handle = std::thread::spawn(move || {
task::block_on(async { ping_pong_client(&sock_path, iter_cnt).await }).unwrap()
});

client_handle.join().unwrap();
server_handle.join().unwrap();
}

async fn ping_pong_server(listener: UnixListener, iterations: u32) -> std::io::Result<()> {
let mut incoming = listener.incoming();
let mut buf = [0; 1024];
for _ix in 0..iterations {
if let Some(s) = incoming.next().await {
let mut s = s?;
let n = s.read(&mut buf[..]).await?;
assert_eq!(&buf[..n], PING);
s.write_all(&PONG).await?;
}
}
Ok(())
}

async fn ping_pong_client(socket: &std::path::PathBuf, iterations: u32) -> std::io::Result<()> {
let mut buf = [0; 1024];
for _ix in 0..iterations {
let mut socket = UnixStream::connect(&socket).await?;
socket.write_all(&PING).await?;
let n = async_std::io::timeout(TEST_TIMEOUT, socket.read(&mut buf[..])).await?;
assert_eq!(&buf[..n], PONG);
}
Ok(())
}