-
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
feat(http2): add adaptive window size support using BDP #2138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
// What should it do? | ||
// | ||
// # BDP Algorithm | ||
// | ||
// 1. When receiving a DATA frame, if a BDP ping isn't outstanding: | ||
// 1a. Record current time. | ||
// 1b. Send a BDP ping. | ||
// 2. Increment the number of received bytes. | ||
// 3. When the BDP ping ack is received: | ||
// 3a. Record duration from sent time. | ||
// 3b. Merge RTT with a running average. | ||
// 3c. Calculate bdp as bytes/rtt. | ||
// 3d. If bdp is over 2/3 max, set new max to bdp and update windows. | ||
// | ||
// | ||
// # Implementation | ||
// | ||
// - `hyper::Body::h2` variant includes a "bdp channel" | ||
// - When the body's `poll_data` yields bytes, call `bdp.sample(bytes.len())` | ||
// | ||
|
||
use std::sync::{Arc, Mutex, Weak}; | ||
use std::task::{self, Poll}; | ||
use std::time::{Duration, Instant}; | ||
|
||
use h2::{Ping, PingPong}; | ||
|
||
type WindowSize = u32; | ||
|
||
/// Any higher than this likely will be hitting the TCP flow control. | ||
const BDP_LIMIT: usize = 1024 * 1024 * 16; | ||
|
||
pub(crate) fn disabled() -> Sampler { | ||
Sampler { | ||
shared: Weak::new(), | ||
} | ||
} | ||
|
||
pub(super) fn channel(ping_pong: PingPong, initial_window: WindowSize) -> (Sampler, Estimator) { | ||
let shared = Arc::new(Mutex::new(Shared { | ||
bytes: 0, | ||
ping_pong, | ||
ping_sent: false, | ||
sent_at: Instant::now(), | ||
})); | ||
|
||
( | ||
Sampler { | ||
shared: Arc::downgrade(&shared), | ||
}, | ||
Estimator { | ||
bdp: initial_window, | ||
max_bandwidth: 0.0, | ||
shared, | ||
samples: 0, | ||
rtt: 0.0, | ||
}, | ||
) | ||
} | ||
|
||
#[derive(Clone)] | ||
pub(crate) struct Sampler { | ||
shared: Weak<Mutex<Shared>>, | ||
} | ||
|
||
pub(super) struct Estimator { | ||
shared: Arc<Mutex<Shared>>, | ||
|
||
/// Current BDP in bytes | ||
bdp: u32, | ||
/// Largest bandwidth we've seen so far. | ||
max_bandwidth: f64, | ||
/// Count of samples made (ping sent and received) | ||
samples: usize, | ||
/// Round trip time in seconds | ||
rtt: f64, | ||
} | ||
|
||
struct Shared { | ||
bytes: usize, | ||
ping_pong: PingPong, | ||
ping_sent: bool, | ||
sent_at: Instant, | ||
} | ||
|
||
impl Sampler { | ||
pub(crate) fn sample(&self, bytes: usize) { | ||
let shared = if let Some(shared) = self.shared.upgrade() { | ||
shared | ||
} else { | ||
return; | ||
}; | ||
|
||
let mut inner = shared.lock().unwrap(); | ||
|
||
if !inner.ping_sent { | ||
if let Ok(()) = inner.ping_pong.send_ping(Ping::opaque()) { | ||
inner.ping_sent = true; | ||
inner.sent_at = Instant::now(); | ||
trace!("sending BDP ping"); | ||
} else { | ||
return; | ||
} | ||
} | ||
|
||
inner.bytes += bytes; | ||
} | ||
|
||
pub(crate) fn is_enabled(&self) -> bool { | ||
self.shared.strong_count() > 0 | ||
} | ||
} | ||
|
||
impl Estimator { | ||
pub(super) fn poll_estimate(&mut self, cx: &mut task::Context<'_>) -> Poll<WindowSize> { | ||
let mut inner = self.shared.lock().unwrap(); | ||
if !inner.ping_sent { | ||
// XXX: this doesn't register a waker...? | ||
seanmonstar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return Poll::Pending; | ||
} | ||
|
||
let (bytes, rtt) = match ready!(inner.ping_pong.poll_pong(cx)) { | ||
Ok(_pong) => { | ||
let rtt = inner.sent_at.elapsed(); | ||
let bytes = inner.bytes; | ||
inner.bytes = 0; | ||
inner.ping_sent = false; | ||
self.samples += 1; | ||
trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt); | ||
(bytes, rtt) | ||
} | ||
Err(e) => { | ||
debug!("bdp pong error: {}", e); | ||
return Poll::Pending; | ||
} | ||
}; | ||
|
||
drop(inner); | ||
|
||
if let Some(bdp) = self.calculate(bytes, rtt) { | ||
Poll::Ready(bdp) | ||
} else { | ||
// XXX: this doesn't register a waker...? | ||
Poll::Pending | ||
} | ||
} | ||
|
||
fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> { | ||
// No need to do any math if we're at the limit. | ||
if self.bdp as usize == BDP_LIMIT { | ||
return None; | ||
} | ||
|
||
// average the rtt | ||
let rtt = seconds(rtt); | ||
if self.samples < 10 { | ||
// Average the first 10 samples | ||
self.rtt += (rtt - self.rtt) / (self.samples as f64); | ||
} else { | ||
self.rtt += (rtt - self.rtt) / 0.9; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this trying to be an exponential moving average? Should this be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea, I inversed it by accident. It's fixed in the latest release. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah my bad, I didn't notice because I was using an old release. |
||
} | ||
|
||
// calculate the current bandwidth | ||
let bw = (bytes as f64) / (self.rtt * 1.5); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can somebody explain why 1.5? |
||
trace!("current bandwidth = {:.1}B/s", bw); | ||
|
||
if bw < self.max_bandwidth { | ||
// not a faster bandwidth, so don't update | ||
return None; | ||
} else { | ||
self.max_bandwidth = bw; | ||
} | ||
|
||
// if the current `bytes` sample is at least 2/3 the previous | ||
// bdp, increase to double the current sample. | ||
if (bytes as f64) >= (self.bdp as f64) * 0.66 { | ||
self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize; | ||
trace!("BDP increased to {}", self.bdp); | ||
Some(self.bdp) | ||
} else { | ||
None | ||
} | ||
} | ||
} | ||
|
||
fn seconds(dur: Duration) -> f64 { | ||
const NANOS_PER_SEC: f64 = 1_000_000_000.0; | ||
let secs = dur.as_secs() as f64; | ||
secs + (dur.subsec_nanos() as f64) / NANOS_PER_SEC | ||
} |
Uh oh!
There was an error while loading. Please reload this page.