Skip to content

Cleanup stream module #506

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from Nov 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
//! [`File`]s:
//!
//! ```no_run
//! use async_std::prelude::*;
//! use async_std::fs::File;
//! use async_std::prelude::*;
//!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
Expand All @@ -47,9 +47,9 @@
//! coming from:
//!
//! ```no_run
//! use async_std::io::prelude::*;
//! use async_std::io::SeekFrom;
//! use async_std::fs::File;
//! use async_std::io::SeekFrom;
//! use async_std::prelude::*;
//!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
Expand Down Expand Up @@ -82,9 +82,9 @@
//! methods to any reader:
//!
//! ```no_run
//! use async_std::io::prelude::*;
//! use async_std::io::BufReader;
//! use async_std::fs::File;
//! use async_std::io::BufReader;
//! use async_std::prelude::*;
//!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
Expand All @@ -104,9 +104,9 @@
//! to [`write`][`Write::write`]:
//!
//! ```no_run
//! use async_std::io::prelude::*;
//! use async_std::io::BufWriter;
//! use async_std::fs::File;
//! use async_std::io::BufWriter;
//! use async_std::io::prelude::*;
//!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
Expand Down Expand Up @@ -179,9 +179,9 @@
//! lines:
//!
//! ```no_run
//! use async_std::prelude::*;
//! use async_std::io::BufReader;
//! use async_std::fs::File;
//! use async_std::io::BufReader;
//! use async_std::prelude::*;
//!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
Expand Down
6 changes: 3 additions & 3 deletions src/stream/extend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ pub trait Extend<A> {
/// ```
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub async fn extend<'a, C, A, T>(collection: &mut C, stream: T)
pub async fn extend<'a, C, T, S>(collection: &mut C, stream: S)
where
C: Extend<A>,
T: IntoStream<Item = A> + 'a,
C: Extend<T>,
S: IntoStream<Item = T> + 'a,
{
Extend::extend(collection, stream).await
}
80 changes: 25 additions & 55 deletions src/stream/from_fn.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,21 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::future::Future;

use pin_project_lite::pin_project;

use crate::stream::Stream;
use crate::task::{Context, Poll};

pin_project! {
/// A stream that yields elements by calling a closure.
///
/// This stream is created by the [`from_fn`] function. See its
/// documentation for more.
///
/// [`from_fn`]: fn.from_fn.html
#[derive(Debug)]
pub struct FromFn<F, Fut, T> {
f: F,
#[pin]
future: Option<Fut>,
__t: PhantomData<T>,
}
/// A stream that yields elements by calling a closure.
///
/// This stream is created by the [`from_fn`] function. See its
/// documentation for more.
///
/// [`from_fn`]: fn.from_fn.html
#[derive(Clone, Debug)]
pub struct FromFn<F> {
f: F,
}

impl<F> Unpin for FromFn<F> {}

/// Creates a new stream where to produce each new element a provided closure is called.
///
/// This allows creating a custom stream with any behaviour without using the more verbose
Expand All @@ -34,21 +27,15 @@ pin_project! {
/// # async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use async_std::sync::{Arc, Mutex};
/// use async_std::stream;
///
/// let count = Arc::new(Mutex::new(0u8));
/// let mut count = 0u8;
/// let s = stream::from_fn(|| {
/// let count = Arc::clone(&count);
///
/// async move {
/// *count.lock().await += 1;
///
/// if *count.lock().await > 3 {
/// None
/// } else {
/// Some(*count.lock().await)
/// }
/// count += 1;
/// if count > 3 {
/// None
/// } else {
/// Some(count)
/// }
/// });
///
Expand All @@ -60,38 +47,21 @@ pin_project! {
/// #
/// # })
/// ```
pub fn from_fn<T, F, Fut>(f: F) -> FromFn<F, Fut, T>
pub fn from_fn<T, F>(f: F) -> FromFn<F>
where
F: FnMut() -> Fut,
Fut: Future<Output = Option<T>>,
F: FnMut() -> Option<T>,
{
FromFn {
f,
future: None,
__t: PhantomData,
}
FromFn { f }
}

impl<F, Fut, T> Stream for FromFn<F, Fut, T>
impl<T, F> Stream for FromFn<F>
where
F: FnMut() -> Fut,
Fut: Future<Output = Option<T>>,
F: FnMut() -> Option<T>,
{
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
if this.future.is_some() {
let next =
futures_core::ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx));
this.future.set(None);

return Poll::Ready(next);
} else {
let fut = (this.f)();
this.future.set(Some(fut));
}
}
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let item = (&mut self.f)();
Poll::Ready(item)
}
}
2 changes: 1 addition & 1 deletion src/stream/from_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::stream::Stream;
use crate::task::{Context, Poll};

pin_project! {
/// A stream that created from iterator
/// A stream that was created from iterator.
///
/// This stream is created by the [`from_iter`] function.
/// See it documentation for more.
Expand Down
4 changes: 1 addition & 3 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,7 @@ pub use from_iter::{from_iter, FromIter};
pub use once::{once, Once};
pub use repeat::{repeat, Repeat};
pub use repeat_with::{repeat_with, RepeatWith};
pub use stream::{
Chain, Filter, Fuse, Inspect, Scan, Skip, SkipWhile, StepBy, Stream, Take, TakeWhile, Zip,
};
pub use stream::*;

pub(crate) mod stream;

Expand Down
2 changes: 1 addition & 1 deletion src/stream/once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pin_project! {
/// documentation for more.
///
/// [`once`]: fn.once.html
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct Once<T> {
value: Option<T>,
}
Expand Down
2 changes: 1 addition & 1 deletion src/stream/repeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ where
/// documentation for more.
///
/// [`repeat`]: fn.repeat.html
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct Repeat<T> {
item: T,
}
Expand Down
79 changes: 31 additions & 48 deletions src/stream/repeat_with.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,21 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::future::Future;

use pin_project_lite::pin_project;

use crate::stream::Stream;
use crate::task::{Context, Poll};

pin_project! {
/// A stream that repeats elements of type `T` endlessly by applying a provided closure.
///
/// This stream is created by the [`repeat_with`] function. See its
/// documentation for more.
///
/// [`repeat_with`]: fn.repeat_with.html
#[derive(Debug)]
pub struct RepeatWith<F, Fut, A> {
f: F,
#[pin]
future: Option<Fut>,
__a: PhantomData<A>,
}
/// A stream that repeats elements of type `T` endlessly by applying a provided closure.
///
/// This stream is created by the [`repeat_with`] function. See its
/// documentation for more.
///
/// [`repeat_with`]: fn.repeat_with.html
#[derive(Clone, Debug)]
pub struct RepeatWith<F> {
f: F,
}

impl<F> Unpin for RepeatWith<F> {}

/// Creates a new stream that repeats elements of type `A` endlessly by applying the provided closure.
///
/// # Examples
Expand All @@ -35,7 +28,7 @@ pin_project! {
/// use async_std::prelude::*;
/// use async_std::stream;
///
/// let s = stream::repeat_with(|| async { 1 });
/// let s = stream::repeat_with(|| 1);
///
/// pin_utils::pin_mut!(s);
///
Expand All @@ -54,48 +47,38 @@ pin_project! {
/// use async_std::prelude::*;
/// use async_std::stream;
///
/// let s = stream::repeat_with(|| async { 1u8 }).take(2);
/// let mut n = 1;
/// let s = stream::repeat_with(|| {
/// let item = n;
/// n *= 2;
/// item
/// })
/// .take(4);
///
/// pin_utils::pin_mut!(s);
///
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(2));
/// assert_eq!(s.next().await, Some(4));
/// assert_eq!(s.next().await, Some(8));
/// assert_eq!(s.next().await, None);
/// # })
/// ```
pub fn repeat_with<F, Fut, A>(repeater: F) -> RepeatWith<F, Fut, A>
pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
where
F: FnMut() -> Fut,
Fut: Future<Output = A>,
F: FnMut() -> T,
{
RepeatWith {
f: repeater,
future: None,
__a: PhantomData,
}
RepeatWith { f: repeater }
}

impl<F, Fut, A> Stream for RepeatWith<F, Fut, A>
impl<T, F> Stream for RepeatWith<F>
where
F: FnMut() -> Fut,
Fut: Future<Output = A>,
F: FnMut() -> T,
{
type Item = A;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
if this.future.is_some() {
let res = futures_core::ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx));

this.future.set(None);

return Poll::Ready(Some(res));
} else {
let fut = (this.f)();
type Item = T;

this.future.set(Some(fut));
}
}
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let item = (&mut self.f)();
Poll::Ready(Some(item))
}
}
2 changes: 1 addition & 1 deletion src/stream/stream/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::prelude::*;
use crate::task::{Context, Poll};

pin_project! {
/// Chains two streams one after another.
/// A stream that chains two streams one after another.
///
/// This `struct` is created by the [`chain`] method on [`Stream`]. See its
/// documentation for more.
Expand Down
1 change: 1 addition & 0 deletions src/stream/stream/cloned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use pin_project_lite::pin_project;
use std::pin::Pin;

pin_project! {
/// A stream that clones the elements of an underlying stream.
#[derive(Debug)]
pub struct Cloned<S> {
#[pin]
Expand Down
4 changes: 2 additions & 2 deletions src/stream/stream/copied.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use pin_project_lite::pin_project;
use std::pin::Pin;

pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
/// A stream that copies the elements of an underlying stream.
#[derive(Debug)]
pub struct Copied<S> {
#[pin]
stream: S,
Expand Down
Loading