Skip to content

Commit 354c38e

Browse files
committed
Add AsyncBufReadExt::read_line
1 parent 4025e79 commit 354c38e

File tree

5 files changed

+209
-6
lines changed

5 files changed

+209
-6
lines changed

futures-util/src/io/mod.rs

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,8 @@ pub use self::read::Read;
3030
mod read_exact;
3131
pub use self::read_exact::ReadExact;
3232

33-
// TODO
34-
// mod read_line;
35-
// pub use self::read_line::ReadLine;
33+
mod read_line;
34+
pub use self::read_line::ReadLine;
3635

3736
mod read_to_end;
3837
pub use self::read_to_end::ReadToEnd;
@@ -403,6 +402,65 @@ pub trait AsyncBufReadExt: AsyncBufRead {
403402
{
404403
ReadUntil::new(self, byte, buf)
405404
}
405+
406+
/// Creates a future which will read all the bytes associated with this I/O
407+
/// object into `buf` until a newline (the 0xA byte) or EOF is reached,
408+
/// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line).
409+
///
410+
/// This function will read bytes from the underlying stream until the
411+
/// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
412+
/// up to, and including, the delimiter (if found) will be appended to
413+
/// `buf`.
414+
///
415+
/// The returned future will resolve to the number of bytes read once the read
416+
/// operation is completed.
417+
///
418+
/// In the case of an error the buffer and the object will be discarded, with
419+
/// the error yielded.
420+
///
421+
/// # Errors
422+
///
423+
/// This function has the same error semantics as [`read_until`] and will
424+
/// also return an error if the read bytes are not valid UTF-8. If an I/O
425+
/// error is encountered then `buf` may contain some bytes already read in
426+
/// the event that all data read so far was valid UTF-8.
427+
///
428+
/// [`read_until`]: AsyncBufReadExt::read_until
429+
///
430+
/// # Examples
431+
///
432+
/// ```
433+
/// #![feature(async_await, await_macro)]
434+
/// # futures::executor::block_on(async {
435+
/// use futures::io::AsyncBufReadExt;
436+
/// use std::io::Cursor;
437+
///
438+
/// let mut cursor = Cursor::new(b"foo\nbar");
439+
/// let mut buf = String::new();
440+
///
441+
/// // cursor is at 'f'
442+
/// let num_bytes = await!(cursor.read_line(&mut buf))?;
443+
/// assert_eq!(num_bytes, 4);
444+
/// assert_eq!(buf, "foo\n");
445+
/// buf.clear();
446+
///
447+
/// // cursor is at 'b'
448+
/// let num_bytes = await!(cursor.read_line(&mut buf))?;
449+
/// assert_eq!(num_bytes, 3);
450+
/// assert_eq!(buf, "bar");
451+
/// buf.clear();
452+
///
453+
/// // cursor is at EOF
454+
/// let num_bytes = await!(cursor.read_line(&mut buf))?;
455+
/// assert_eq!(num_bytes, 0);
456+
/// assert_eq!(buf, "");
457+
/// # Ok::<(), Box<std::error::Error>>(()) }).unwrap();
458+
/// ```
459+
fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
460+
where Self: Unpin,
461+
{
462+
ReadLine::new(self, buf)
463+
}
406464
}
407465

408466
impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}

futures-util/src/io/read_line.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use futures_core::future::Future;
2+
use futures_core::task::{Context, Poll};
3+
use futures_io::AsyncBufRead;
4+
use std::io;
5+
use std::mem;
6+
use std::pin::Pin;
7+
use std::str;
8+
use super::read_until::read_until_internal;
9+
10+
/// Future for the [`read_line`](super::AsyncBufReadExt::read_line) method.
11+
#[derive(Debug)]
12+
pub struct ReadLine<'a, R: ?Sized + Unpin> {
13+
reader: &'a mut R,
14+
buf: &'a mut String,
15+
bytes: Vec<u8>,
16+
read: usize,
17+
}
18+
19+
impl<R: ?Sized + Unpin> Unpin for ReadLine<'_, R> {}
20+
21+
impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> {
22+
pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self {
23+
Self {
24+
reader,
25+
bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) },
26+
buf,
27+
read: 0,
28+
}
29+
}
30+
}
31+
32+
impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadLine<'_, R> {
33+
type Output = io::Result<usize>;
34+
35+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
36+
let Self { reader, buf, bytes, read } = &mut *self;
37+
let ret = ready!(read_until_internal(Pin::new(reader), b'\n', bytes, read, cx));
38+
if str::from_utf8(&bytes).is_err() {
39+
Poll::Ready(ret.and_then(|_| {
40+
Err(io::Error::new(io::ErrorKind::InvalidData,
41+
"stream did not contain valid UTF-8"))
42+
}))
43+
} else {
44+
unsafe { mem::swap(buf.as_mut_vec(), bytes); }
45+
Poll::Ready(ret)
46+
}
47+
}
48+
}

futures-util/src/io/read_until.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadUntil<'a, R> {
2222
}
2323
}
2424

25-
fn read_until_internal<R: AsyncBufRead + ?Sized + Unpin>(
25+
pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized + Unpin>(
2626
mut reader: Pin<&mut R>,
2727
byte: u8,
2828
buf: &mut Vec<u8>,

futures/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,8 +281,8 @@ pub mod io {
281281

282282
pub use futures_util::io::{
283283
AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo,
284-
Close, CopyInto, Flush, Read, ReadExact, ReadHalf, ReadToEnd, ReadUntil,
285-
Seek, Window, WriteAll, WriteHalf,
284+
Close, CopyInto, Flush, Read, ReadExact, ReadHalf, ReadLine, ReadToEnd,
285+
ReadUntil, Seek, Window, WriteAll, WriteHalf,
286286
};
287287
}
288288

futures/tests/io_read_line.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
use futures::executor::block_on;
2+
use futures::future::Future;
3+
use futures::io::{AsyncRead, AsyncBufRead, AsyncBufReadExt};
4+
use futures::task::{Context, Poll};
5+
use futures_test::task::noop_context;
6+
use std::cmp;
7+
use std::io::{self, Cursor};
8+
use std::pin::Pin;
9+
10+
#[test]
11+
fn read_line() {
12+
let mut buf = Cursor::new(&b"12"[..]);
13+
let mut v = String::new();
14+
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 2);
15+
assert_eq!(v, "12");
16+
17+
let mut buf = Cursor::new(&b"12\n\n"[..]);
18+
let mut v = String::new();
19+
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 3);
20+
assert_eq!(v, "12\n");
21+
v.clear();
22+
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 1);
23+
assert_eq!(v, "\n");
24+
v.clear();
25+
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 0);
26+
assert_eq!(v, "");
27+
}
28+
29+
fn run<F: Future + Unpin>(mut f: F) -> F::Output {
30+
let mut cx = noop_context();
31+
loop {
32+
if let Poll::Ready(x) = Pin::new(&mut f).poll(&mut cx) {
33+
return x;
34+
}
35+
}
36+
}
37+
38+
struct MaybePending<'a> {
39+
inner: &'a [u8],
40+
ready: bool,
41+
}
42+
43+
impl<'a> MaybePending<'a> {
44+
fn new(inner: &'a [u8]) -> Self {
45+
Self { inner, ready: false }
46+
}
47+
}
48+
49+
impl AsyncRead for MaybePending<'_> {
50+
fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8])
51+
-> Poll<io::Result<usize>>
52+
{
53+
unimplemented!()
54+
}
55+
}
56+
57+
impl AsyncBufRead for MaybePending<'_> {
58+
fn poll_fill_buf<'a>(mut self: Pin<&'a mut Self>, _: &mut Context<'_>)
59+
-> Poll<io::Result<&'a [u8]>>
60+
{
61+
if self.ready {
62+
self.ready = false;
63+
if self.inner.is_empty() { return Poll::Ready(Ok(&[])) }
64+
let len = cmp::min(2, self.inner.len());
65+
Poll::Ready(Ok(&self.inner[0..len]))
66+
} else {
67+
self.ready = true;
68+
Poll::Pending
69+
}
70+
}
71+
72+
fn consume(mut self: Pin<&mut Self>, amt: usize) {
73+
self.inner = &self.inner[amt..];
74+
}
75+
}
76+
77+
#[test]
78+
fn maybe_ready() {
79+
let mut buf = MaybePending::new(&b"12"[..]);
80+
let mut v = String::new();
81+
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 2);
82+
assert_eq!(v, "12");
83+
84+
let mut buf = MaybePending::new(&b"12\n\n"[..]);
85+
let mut v = String::new();
86+
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 3);
87+
assert_eq!(v, "12\n");
88+
v.clear();
89+
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 1);
90+
assert_eq!(v, "\n");
91+
v.clear();
92+
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0);
93+
assert_eq!(v, "");
94+
v.clear();
95+
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0);
96+
assert_eq!(v, "");
97+
}

0 commit comments

Comments
 (0)