Skip to content

Commit 9f349d2

Browse files
committed
Add AsyncBufReadExt::read_line
1 parent 5a6b1b3 commit 9f349d2

File tree

5 files changed

+164
-24
lines changed

5 files changed

+164
-24
lines changed

futures-util/src/io/mod.rs

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,8 @@ pub use self::read::Read;
3030
mod read_exact;
3131
pub use self::read_exact::ReadExact;
3232

33-
// TODO
34-
// mod read_line;
35-
// pub use self::read_line::ReadLine;
33+
mod read_line;
34+
pub use self::read_line::ReadLine;
3635

3736
mod read_to_end;
3837
pub use self::read_to_end::ReadToEnd;
@@ -403,6 +402,65 @@ pub trait AsyncBufReadExt: AsyncBufRead {
403402
{
404403
ReadUntil::new(self, byte, buf)
405404
}
405+
406+
/// Creates a future which will read all the bytes associated with this I/O
407+
/// object into `buf` until a newline (the 0xA byte) or EOF is reached,
408+
/// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line).
409+
///
410+
/// This function will read bytes from the underlying stream until the
411+
/// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
412+
/// up to, and including, the delimiter (if found) will be appended to
413+
/// `buf`.
414+
///
415+
/// The returned future will resolve to the number of bytes read once the read
416+
/// operation is completed.
417+
///
418+
/// In the case of an error the buffer and the object will be discarded, with
419+
/// the error yielded.
420+
///
421+
/// # Errors
422+
///
423+
/// This function has the same error semantics as [`read_until`] and will
424+
/// also return an error if the read bytes are not valid UTF-8. If an I/O
425+
/// error is encountered then `buf` may contain some bytes already read in
426+
/// the event that all data read so far was valid UTF-8.
427+
///
428+
/// [`read_until`]: AsyncBufReadExt::read_until
429+
///
430+
/// # Examples
431+
///
432+
/// ```
433+
/// #![feature(async_await, await_macro)]
434+
/// # futures::executor::block_on(async {
435+
/// use futures::io::AsyncBufReadExt;
436+
/// use std::io::Cursor;
437+
///
438+
/// let mut cursor = Cursor::new(b"foo\nbar");
439+
/// let mut buf = String::new();
440+
///
441+
/// // cursor is at 'f'
442+
/// let num_bytes = await!(cursor.read_line(&mut buf))?;
443+
/// assert_eq!(num_bytes, 4);
444+
/// assert_eq!(buf, "foo\n");
445+
/// buf.clear();
446+
///
447+
/// // cursor is at 'b'
448+
/// let num_bytes = await!(cursor.read_line(&mut buf))?;
449+
/// assert_eq!(num_bytes, 3);
450+
/// assert_eq!(buf, "bar");
451+
/// buf.clear();
452+
///
453+
/// // cursor is at EOF
454+
/// let num_bytes = await!(cursor.read_line(&mut buf))?;
455+
/// assert_eq!(num_bytes, 0);
456+
/// assert_eq!(buf, "");
457+
/// # Ok::<(), Box<std::error::Error>>(()) }).unwrap();
458+
/// ```
459+
fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
460+
where Self: Unpin,
461+
{
462+
ReadLine::new(self, buf)
463+
}
406464
}
407465

408466
impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}

futures-util/src/io/read_line.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
use futures_core::future::Future;
2+
use futures_core::task::{Context, Poll};
3+
use futures_io::AsyncBufRead;
4+
use std::io;
5+
use std::pin::Pin;
6+
use std::str;
7+
use super::read_until::read_until_internal;
8+
9+
/// Future for the [`read_line`](super::AsyncBufReadExt::read_line) method.
10+
#[derive(Debug)]
11+
pub struct ReadLine<'a, R: ?Sized + Unpin> {
12+
reader: &'a mut R,
13+
buf: &'a mut String,
14+
}
15+
16+
impl<R: ?Sized + Unpin> Unpin for ReadLine<'_, R> {}
17+
18+
impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> {
19+
pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self {
20+
Self { reader, buf }
21+
}
22+
}
23+
24+
struct Guard<'a> { buf: &'a mut Vec<u8>, len: usize }
25+
26+
impl Drop for Guard<'_> {
27+
fn drop(&mut self) {
28+
unsafe { self.buf.set_len(self.len); }
29+
}
30+
}
31+
32+
impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadLine<'_, R> {
33+
type Output = io::Result<usize>;
34+
35+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
36+
let Self { reader, buf } = &mut *self;
37+
unsafe {
38+
let mut g = Guard { len: buf.len(), buf: buf.as_mut_vec() };
39+
let ret = ready!(read_until_internal(Pin::new(reader), b'\n', g.buf, cx));
40+
if str::from_utf8(&g.buf[g.len..]).is_err() {
41+
Poll::Ready(ret.and_then(|_| {
42+
Err(io::Error::new(io::ErrorKind::InvalidData,
43+
"stream did not contain valid UTF-8"))
44+
}))
45+
} else {
46+
g.len = g.buf.len();
47+
Poll::Ready(ret)
48+
}
49+
}
50+
}
51+
}

futures-util/src/io/read_until.rs

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,37 @@ impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadUntil<'a, R> {
2020
}
2121
}
2222

23+
pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized + Unpin>(
24+
mut reader: Pin<&mut R>,
25+
byte: u8,
26+
buf: &mut Vec<u8>,
27+
cx: &mut Context<'_>,
28+
) -> Poll<io::Result<usize>> {
29+
let mut read = 0;
30+
loop {
31+
let (done, used) = {
32+
let available = try_ready!(reader.as_mut().poll_fill_buf(cx));
33+
if let Some(i) = memchr::memchr(byte, available) {
34+
buf.extend_from_slice(&available[..=i]);
35+
(true, i + 1)
36+
} else {
37+
buf.extend_from_slice(available);
38+
(false, available.len())
39+
}
40+
};
41+
reader.as_mut().consume(used);
42+
read += used;
43+
if done || used == 0 {
44+
return Poll::Ready(Ok(read));
45+
}
46+
}
47+
}
48+
2349
impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadUntil<'_, R> {
2450
type Output = io::Result<usize>;
2551

2652
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
27-
let this = &mut *self;
28-
let mut read = 0;
29-
loop {
30-
let (done, used) = {
31-
let available = try_ready!(Pin::new(&mut this.reader).poll_fill_buf(cx));
32-
if let Some(i) = memchr::memchr(this.byte, available) {
33-
this.buf.extend_from_slice(&available[..=i]);
34-
(true, i + 1)
35-
} else {
36-
this.buf.extend_from_slice(available);
37-
(false, available.len())
38-
}
39-
};
40-
Pin::new(&mut this.reader).consume(used);
41-
read += used;
42-
if done || used == 0 {
43-
return Poll::Ready(Ok(read));
44-
}
45-
}
53+
let Self { reader, byte, buf } = &mut *self;
54+
read_until_internal(Pin::new(reader), *byte, buf, cx)
4655
}
4756
}

futures/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,8 @@ pub mod io {
273273
};
274274
pub use futures_util::io::{
275275
AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo,
276-
Close, CopyInto, Flush, Read, ReadExact, ReadHalf, ReadToEnd, ReadUntil,
277-
Seek, Window, WriteAll, WriteHalf,
276+
Close, CopyInto, Flush, Read, ReadExact, ReadHalf, ReadLine, ReadToEnd,
277+
ReadUntil, Seek, Window, WriteAll, WriteHalf,
278278
};
279279
}
280280

futures/tests/io_read_line.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use futures::executor::block_on;
2+
use futures::io::AsyncBufReadExt;
3+
use std::io::Cursor;
4+
5+
#[test]
6+
fn read_line() {
7+
let mut buf = Cursor::new(&b"12"[..]);
8+
let mut v = String::new();
9+
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 2);
10+
assert_eq!(v, "12");
11+
12+
let mut buf = Cursor::new(&b"12\n\n"[..]);
13+
let mut v = String::new();
14+
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 3);
15+
assert_eq!(v, "12\n");
16+
v.truncate(0);
17+
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 1);
18+
assert_eq!(v, "\n");
19+
v.truncate(0);
20+
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 0);
21+
assert_eq!(v, "");
22+
}

0 commit comments

Comments
 (0)