Skip to content

make StreamExt::timeout(d).next() behave like future::timeout(d, s.next()) #732

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from Dec 4, 2020
Merged

Conversation

ghost
Copy link

@ghost ghost commented Mar 20, 2020

This PR is to make s.timeout(d).next() behave like future::timeout(d, s.next()).

To illustrate the difference:

let mut s = stream::from_iter(0..2).chain(stream::pending()).timeout(Duration::from_secs(1));
let time = Instant::now();
for _ in 0..5 {
    let v = s.next().await;
    println!("{}ms {:?}", time.elapsed().as_millis(), v);
}
// output:
//     0ms Some(Ok(0))
//     0ms Some(Ok(1))
//     0ms Some(Ok(2))
//     1000ms Some(Err(TimeoutError { _private: () }))
//     1000ms Some(Err(TimeoutError { _private: () }))


let mut s = stream::from_iter(0..3).chain(stream::pending());
let time = Instant::now();
for _ in 0..5 {
    let v = future::timeout(s.next(), Duration::from_secs(1)).await;
    println!("{}ms {:?}", time.elapsed().as_millis(), v);
}
// output:
//     0ms Some(Ok(0))
//     0ms Some(Ok(1))
//     0ms Some(Ok(2))
//     1000ms Some(Err(TimeoutError { _private: () }))
//     2001ms Some(Err(TimeoutError { _private: () }))

@feuerste
Copy link

feuerste commented Apr 3, 2020

Eagerly waiting for this to be merged :)

@feuerste
Copy link

feuerste commented Apr 6, 2020

@yoshuawuyts @k-nasa @stjepang Can you please have a quick look?
I actually ran into this when porting timeout of tokio's StreamExt to async-std's, as tokio's default one behaves like timeout_repeat....

@feuerste
Copy link

@dignifiedquire Would you mind reviewing this PR? Thanks a lot!

@dignifiedquire
Copy link
Member

I can see the use for this but not entirely sure. Pinging @yoshuawuyts for feedback

@yoshuawuyts
Copy link
Contributor

Could you maybe share more on what this does, what the difference with the other method is, and why it would make sense to have both?

@ghost
Copy link
Author

ghost commented May 8, 2020

await a timeout-ed timeout-stream again will immediately get an error, a timeout_repeat-stream will wait another duration. timeout_repeat is more convenient to detect accidentally stream-freezing event.

let mut s = stream::from_iter(0..3).chain(stream::pending()).timeout(Duration::from_secs(1));
let time = Instant::now();
for _ in 0..5 {
    let v = s.next().await;
    println!("{}ms {:?}", time.elapsed().as_millis(), v);
}
// output:
//     0ms Some(Ok(0))
//     0ms Some(Ok(1))
//     0ms Some(Ok(2))
//     1000ms Some(Err(TimeoutError { _private: () }))
//     1000ms Some(Err(TimeoutError { _private: () }))

let mut s = stream::from_iter(0..3).chain(stream::pending()).timeout_repeat(Duration::from_secs(1));
let time = Instant::now();
for _ in 0..5 {
    let v = s.next().await;
    println!("{}ms {:?}", time.elapsed().as_millis(), v);
}
// output:
//     0ms Some(Ok(0))
//     0ms Some(Ok(1))
//     0ms Some(Ok(2))
//     1000ms Some(Err(TimeoutError { _private: () }))
//     2001ms Some(Err(TimeoutError { _private: () }))

@feuerste
Copy link

feuerste commented May 8, 2020

To add on what @hhggit already wrote, the default behavior of tokio's StreamExt timeout() method is actually equal to the behaviour of timeout_repeat(), see also https://github.com/tokio-rs/tokio/blob/236629d1be7208612cbe5388e7ffebf85b73c157/tokio/src/stream/mod.rs#L755-L817 and https://github.com/tokio-rs/tokio/blob/236629d1be7208612cbe5388e7ffebf85b73c157/tokio/src/stream/timeout.rs#L37-L60
Timing out each stream element is much more convenient to use in some situations than timing out the whole stream completion. Imagine you download/upload huge vs. small files. Then you possibly need different timeout values when using timeout() depending on the file size, but you can keep the same timeout value when using timeout_repeat(), as the timeout happens on file chunks rather than the whole file.

@yoshuawuyts
Copy link
Contributor

I see. Thanks for clarifying.

I'm still confused about the motivating use case — to the best of my understanding it is: "We would like to detect timeouts, and even in the face of timeouts we would like to continue streaming." I'm not sure when this behavior would make sense.

The Stream::timeout method was never designed with repeated timeouts in mind. The driving idea was that if a timeout occurs you'll want to abort. But if that's not flexible enough we can still change it to reset the timeout when polled again. I believe this would match the behavior proposed in this patch.

However I don't think we should have both methods since it would appear a single method could cover both uses.

@ghost
Copy link
Author

ghost commented May 8, 2020

...even in the face of timeouts we would like to continue streaming." I'm not sure when this behavior would make sense.

For example, when client stops making request for sometime, a server would try to send a heartbeat to test if client is freezing or just idling.

@feuerste
Copy link

feuerste commented May 8, 2020

I'm fine with changing the default behaviour to match tokio's by resetting when polling again and only have one method.

@yoshuawuyts
Copy link
Contributor

I'm fine with changing the default behaviour to match tokio's by resetting when polling again and only have one method.

This is indeed the change I think needs to happen.

@ghost
Copy link
Author

ghost commented Nov 27, 2020

I'm fine with changing the default behaviour to match tokio's by resetting when polling again and only have one method.

This is indeed the change I think needs to happen.

@yoshuawuyts updated

@Fishrock123
Copy link
Member

I feel like the new changes are how I would expect this to work, but maybe not how most people would expect this to work. If we are going this direction there may be room for something like timeout_absolute or something too, since sometimes you may want the original behavior, depending on what your program does.

(e.g. if you're contacting a foreign api over the network you may want to timeout even if it is sending tcp hearbeats.)

@ghost ghost changed the title add StreamExt::timeout_repeat make StreamExt::timeout(d).next() behave like future::timeout(s.next(), d) Nov 28, 2020
@ghost ghost changed the title make StreamExt::timeout(d).next() behave like future::timeout(s.next(), d) make StreamExt::timeout(d).next() behave like future::timeout(d, s.next()) Nov 28, 2020
Copy link
Contributor

@yoshuawuyts yoshuawuyts left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good; thanks so much!

@yoshuawuyts yoshuawuyts merged commit 415d0d1 into async-rs:master Dec 4, 2020
@ghost ghost deleted the timeout_repeat branch December 5, 2020 11:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants