Skip to content

Commit 8d77d6f

Browse files
taiki-ecramertj
authored andcommitted
Add AsyncBufReadExt::read_line
1 parent b93b785 commit 8d77d6f

File tree

6 files changed

+178
-53
lines changed

6 files changed

+178
-53
lines changed

futures-util/src/io/mod.rs

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,8 @@ pub use self::read::Read;
4040
mod read_exact;
4141
pub use self::read_exact::ReadExact;
4242

43-
// TODO
44-
// mod read_line;
45-
// pub use self::read_line::ReadLine;
43+
mod read_line;
44+
pub use self::read_line::ReadLine;
4645

4746
mod read_to_end;
4847
pub use self::read_to_end::ReadToEnd;
@@ -413,6 +412,65 @@ pub trait AsyncBufReadExt: AsyncBufRead {
413412
{
414413
ReadUntil::new(self, byte, buf)
415414
}
415+
416+
/// Creates a future which will read all the bytes associated with this I/O
417+
/// object into `buf` until a newline (the 0xA byte) or EOF is reached,
418+
/// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line).
419+
///
420+
/// This function will read bytes from the underlying stream until the
421+
/// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
422+
/// up to, and including, the delimiter (if found) will be appended to
423+
/// `buf`.
424+
///
425+
/// The returned future will resolve to the number of bytes read once the read
426+
/// operation is completed.
427+
///
428+
/// In the case of an error the buffer and the object will be discarded, with
429+
/// the error yielded.
430+
///
431+
/// # Errors
432+
///
433+
/// This function has the same error semantics as [`read_until`] and will
434+
/// also return an error if the read bytes are not valid UTF-8. If an I/O
435+
/// error is encountered then `buf` may contain some bytes already read in
436+
/// the event that all data read so far was valid UTF-8.
437+
///
438+
/// [`read_until`]: AsyncBufReadExt::read_until
439+
///
440+
/// # Examples
441+
///
442+
/// ```
443+
/// #![feature(async_await, await_macro)]
444+
/// # futures::executor::block_on(async {
445+
/// use futures::io::AsyncBufReadExt;
446+
/// use std::io::Cursor;
447+
///
448+
/// let mut cursor = Cursor::new(b"foo\nbar");
449+
/// let mut buf = String::new();
450+
///
451+
/// // cursor is at 'f'
452+
/// let num_bytes = await!(cursor.read_line(&mut buf))?;
453+
/// assert_eq!(num_bytes, 4);
454+
/// assert_eq!(buf, "foo\n");
455+
/// buf.clear();
456+
///
457+
/// // cursor is at 'b'
458+
/// let num_bytes = await!(cursor.read_line(&mut buf))?;
459+
/// assert_eq!(num_bytes, 3);
460+
/// assert_eq!(buf, "bar");
461+
/// buf.clear();
462+
///
463+
/// // cursor is at EOF
464+
/// let num_bytes = await!(cursor.read_line(&mut buf))?;
465+
/// assert_eq!(num_bytes, 0);
466+
/// assert_eq!(buf, "");
467+
/// # Ok::<(), Box<std::error::Error>>(()) }).unwrap();
468+
/// ```
469+
fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
470+
where Self: Unpin,
471+
{
472+
ReadLine::new(self, buf)
473+
}
416474
}
417475

418476
impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}

futures-util/src/io/read_line.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use futures_core::future::Future;
2+
use futures_core::task::{Context, Poll};
3+
use futures_io::AsyncBufRead;
4+
use std::io;
5+
use std::mem;
6+
use std::pin::Pin;
7+
use std::str;
8+
use super::read_until::read_until_internal;
9+
10+
/// Future for the [`read_line`](super::AsyncBufReadExt::read_line) method.
11+
#[derive(Debug)]
12+
pub struct ReadLine<'a, R: ?Sized + Unpin> {
13+
reader: &'a mut R,
14+
buf: &'a mut String,
15+
bytes: Vec<u8>,
16+
read: usize,
17+
}
18+
19+
impl<R: ?Sized + Unpin> Unpin for ReadLine<'_, R> {}
20+
21+
impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> {
22+
pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self {
23+
Self {
24+
reader,
25+
bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) },
26+
buf,
27+
read: 0,
28+
}
29+
}
30+
}
31+
32+
impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadLine<'_, R> {
33+
type Output = io::Result<usize>;
34+
35+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
36+
let Self { reader, buf, bytes, read } = &mut *self;
37+
let ret = ready!(read_until_internal(Pin::new(reader), b'\n', bytes, read, cx));
38+
if str::from_utf8(&bytes).is_err() {
39+
Poll::Ready(ret.and_then(|_| {
40+
Err(io::Error::new(io::ErrorKind::InvalidData,
41+
"stream did not contain valid UTF-8"))
42+
}))
43+
} else {
44+
unsafe { mem::swap(buf.as_mut_vec(), bytes); }
45+
Poll::Ready(ret)
46+
}
47+
}
48+
}

futures-util/src/io/read_until.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadUntil<'a, R> {
2222
}
2323
}
2424

25-
fn read_until_internal<R: AsyncBufRead + ?Sized + Unpin>(
25+
pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized + Unpin>(
2626
mut reader: Pin<&mut R>,
2727
byte: u8,
2828
buf: &mut Vec<u8>,

futures/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,8 +281,8 @@ pub mod io {
281281

282282
pub use futures_util::io::{
283283
AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo, BufReader,
284-
Close, CopyInto, Flush, Read, ReadExact, ReadHalf, ReadToEnd, ReadUntil,
285-
Seek, Window, WriteAll, WriteHalf,
284+
Close, CopyInto, Flush, Read, ReadExact, ReadHalf, ReadLine, ReadToEnd,
285+
ReadUntil, Seek, Window, WriteAll, WriteHalf,
286286
};
287287
}
288288

futures/tests/io_read_line.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
use futures::executor::block_on;
2+
use futures::future::Future;
3+
use futures::io::AsyncBufReadExt;
4+
use futures::task::Poll;
5+
use futures_test::io::AsyncReadTestExt;
6+
use futures_test::task::noop_context;
7+
use std::io::Cursor;
8+
use std::pin::Pin;
9+
10+
#[test]
11+
fn read_line() {
12+
let mut buf = Cursor::new(b"12");
13+
let mut v = String::new();
14+
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 2);
15+
assert_eq!(v, "12");
16+
17+
let mut buf = Cursor::new(b"12\n\n");
18+
let mut v = String::new();
19+
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 3);
20+
assert_eq!(v, "12\n");
21+
v.clear();
22+
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 1);
23+
assert_eq!(v, "\n");
24+
v.clear();
25+
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 0);
26+
assert_eq!(v, "");
27+
}
28+
29+
fn run<F: Future + Unpin>(mut f: F) -> F::Output {
30+
let mut cx = noop_context();
31+
loop {
32+
if let Poll::Ready(x) = Pin::new(&mut f).poll(&mut cx) {
33+
return x;
34+
}
35+
}
36+
}
37+
38+
#[test]
39+
fn maybe_pending() {
40+
let mut buf = b"12".interleave_pending();
41+
let mut v = String::new();
42+
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 2);
43+
assert_eq!(v, "12");
44+
45+
let mut buf = b"12\n\n".interleave_pending();
46+
let mut v = String::new();
47+
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 3);
48+
assert_eq!(v, "12\n");
49+
v.clear();
50+
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 1);
51+
assert_eq!(v, "\n");
52+
v.clear();
53+
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0);
54+
assert_eq!(v, "");
55+
v.clear();
56+
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0);
57+
assert_eq!(v, "");
58+
}

futures/tests/io_read_until.rs

Lines changed: 8 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
11
use futures::executor::block_on;
22
use futures::future::Future;
3-
use futures::io::{AsyncRead, AsyncBufRead, AsyncBufReadExt};
4-
use futures::task::{Context, Poll};
3+
use futures::io::AsyncBufReadExt;
4+
use futures::task::Poll;
5+
use futures_test::io::AsyncReadTestExt;
56
use futures_test::task::noop_context;
6-
use std::cmp;
7-
use std::io::{self, Cursor};
7+
use std::io::Cursor;
88
use std::pin::Pin;
99

1010
#[test]
1111
fn read_until() {
12-
let mut buf = Cursor::new(&b"12"[..]);
12+
let mut buf = Cursor::new(b"12");
1313
let mut v = Vec::new();
1414
assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 2);
1515
assert_eq!(v, b"12");
1616

17-
let mut buf = Cursor::new(&b"1233"[..]);
17+
let mut buf = Cursor::new(b"1233");
1818
let mut v = Vec::new();
1919
assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 3);
2020
assert_eq!(v, b"123");
@@ -35,53 +35,14 @@ fn run<F: Future + Unpin>(mut f: F) -> F::Output {
3535
}
3636
}
3737

38-
struct MaybePending<'a> {
39-
inner: &'a [u8],
40-
ready: bool,
41-
}
42-
43-
impl<'a> MaybePending<'a> {
44-
fn new(inner: &'a [u8]) -> Self {
45-
Self { inner, ready: false }
46-
}
47-
}
48-
49-
impl AsyncRead for MaybePending<'_> {
50-
fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8])
51-
-> Poll<io::Result<usize>>
52-
{
53-
unimplemented!()
54-
}
55-
}
56-
57-
impl AsyncBufRead for MaybePending<'_> {
58-
fn poll_fill_buf<'a>(mut self: Pin<&'a mut Self>, _: &mut Context<'_>)
59-
-> Poll<io::Result<&'a [u8]>>
60-
{
61-
if self.ready {
62-
self.ready = false;
63-
if self.inner.is_empty() { return Poll::Ready(Ok(&[])) }
64-
let len = cmp::min(2, self.inner.len());
65-
Poll::Ready(Ok(&self.inner[0..len]))
66-
} else {
67-
self.ready = true;
68-
Poll::Pending
69-
}
70-
}
71-
72-
fn consume(mut self: Pin<&mut Self>, amt: usize) {
73-
self.inner = &self.inner[amt..];
74-
}
75-
}
76-
7738
#[test]
7839
fn maybe_pending() {
79-
let mut buf = MaybePending::new(b"12");
40+
let mut buf = b"12".interleave_pending();
8041
let mut v = Vec::new();
8142
assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 2);
8243
assert_eq!(v, b"12");
8344

84-
let mut buf = MaybePending::new(b"12333");
45+
let mut buf = b"12333".interleave_pending();
8546
let mut v = Vec::new();
8647
assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 3);
8748
assert_eq!(v, b"123");

0 commit comments

Comments
 (0)