Skip to content

Commit 76b10c4

Browse files
committed
FromStream for Option<T>
1 parent 33d2191 commit 76b10c4

File tree

3 files changed

+59
-0
lines changed

3 files changed

+59
-0
lines changed

src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ cfg_if! {
6767

6868
mod vec;
6969
mod result;
70+
mod option;
7071
}
7172
}
7273

src/option/from_stream.rs

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use std::pin::Pin;
2+
3+
use crate::prelude::*;
4+
use crate::stream::{FromStream, IntoStream};
5+
6+
impl<T, V> FromStream<Option<T>> for Option<V>
7+
where
8+
V: FromStream<T>,
9+
{
10+
/// Takes each element in the stream: if it is `None`, no further
11+
/// elements are taken, and `None` is returned. Should no `None`
12+
/// occur, a container with the values of each `Option` is returned.
13+
#[inline]
14+
fn from_stream<'a, S: IntoStream<Item = Option<T>>>(
15+
stream: S,
16+
) -> Pin<Box<dyn core::future::Future<Output = Self> + 'a>>
17+
where
18+
<S as IntoStream>::IntoStream: 'a,
19+
{
20+
let stream = stream.into_stream();
21+
22+
Pin::from(Box::new(async move {
23+
pin_utils::pin_mut!(stream);
24+
25+
// Using `scan` here because it is able to stop the stream early
26+
// if a failure occurs
27+
let mut found_error = false;
28+
let out: V = stream
29+
.scan((), |_, elem| {
30+
match elem {
31+
Some(elem) => Some(elem),
32+
None => {
33+
found_error = true;
34+
// Stop processing the stream on error
35+
None
36+
}
37+
}
38+
})
39+
.collect()
40+
.await;
41+
42+
if found_error {
43+
None
44+
} else {
45+
Some(out)
46+
}
47+
}))
48+
}
49+
}

src/option/mod.rs

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
//! The Rust core optional value type
2+
//!
3+
//! This module provides the `Option<T>` type for returning and
4+
//! propagating optional values.
5+
6+
mod from_stream;
7+
8+
#[doc(inline)]
9+
pub use std::option::Option;

0 commit comments

Comments
 (0)