Skip to content

Commit aeacbeb

Browse files
committed
feat(http2): add adaptive window size support using BDP
This adds support for calculating the Bandwidth-delay product (BDP) when using HTTP2. When a DATA frame is received, a PING is sent to the remote. While the PING acknowledgement is outstanding, the number of bytes of all received DATA frames is accumulated. Once we receive the PING acknowledgement, we calculate the BDP based on the number of received bytes and the round-trip-time of the PING. If we are near the current maximum window size, the size is doubled. This currently only adds support to the `Client`. It's disabled by default until tested more extensively.
1 parent 22dc6fe commit aeacbeb

File tree

7 files changed

+311
-41
lines changed

7 files changed

+311
-41
lines changed

src/body/body.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use http_body::{Body as HttpBody, SizeHint};
1313

1414
use crate::common::{task, watch, Future, Never, Pin, Poll};
1515
use crate::proto::DecodedLength;
16+
use crate::proto::h2::bdp;
1617
use crate::upgrade::OnUpgrade;
1718

1819
type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
@@ -61,6 +62,11 @@ struct Extra {
6162
/// connection yet.
6263
delayed_eof: Option<DelayEof>,
6364
on_upgrade: OnUpgrade,
65+
66+
/// Records bytes read to compute the BDP.
67+
///
68+
/// Only used with `H2` variant.
69+
h2_bdp: bdp::Sampler,
6470
}
6571

6672
type DelayEofUntil = oneshot::Receiver<Never>;
@@ -175,11 +181,17 @@ impl Body {
175181
Body { kind, extra: None }
176182
}
177183

178-
pub(crate) fn h2(recv: h2::RecvStream, content_length: DecodedLength) -> Self {
179-
Body::new(Kind::H2 {
184+
pub(crate) fn h2(recv: h2::RecvStream, content_length: DecodedLength, bdp: bdp::Sampler) -> Self {
185+
let mut body = Body::new(Kind::H2 {
180186
content_length,
181187
recv,
182-
})
188+
});
189+
190+
if bdp.is_enabled() {
191+
body.extra_mut().h2_bdp = bdp;
192+
}
193+
194+
body
183195
}
184196

185197
pub(crate) fn set_on_upgrade(&mut self, upgrade: OnUpgrade) {
@@ -204,6 +216,7 @@ impl Body {
204216
Box::new(Extra {
205217
delayed_eof: None,
206218
on_upgrade: OnUpgrade::none(),
219+
h2_bdp: bdp::disabled(),
207220
})
208221
})
209222
}
@@ -262,6 +275,9 @@ impl Body {
262275
Some(Ok(bytes)) => {
263276
let _ = h2.flow_control().release_capacity(bytes.len());
264277
len.sub_if(bytes.len() as u64);
278+
if let Some(ref extra) = self.extra {
279+
extra.h2_bdp.sample(bytes.len());
280+
}
265281
Poll::Ready(Some(Ok(bytes)))
266282
}
267283
Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),

src/client/conn.rs

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,6 @@ where
3535
H2(#[pin] proto::h2::ClientTask<B>),
3636
}
3737

38-
// Our defaults are chosen for the "majority" case, which usually are not
39-
// resource constrained, and so the spec default of 64kb can be too limiting
40-
// for performance.
41-
const DEFAULT_HTTP2_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb
42-
const DEFAULT_HTTP2_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb
43-
4438
/// Returns a handshake future over some IO.
4539
///
4640
/// This is a shortcut for `Builder::new().handshake(io)`.
@@ -82,7 +76,7 @@ pub struct Builder {
8276
h1_read_buf_exact_size: Option<usize>,
8377
h1_max_buf_size: Option<usize>,
8478
http2: bool,
85-
h2_builder: h2::client::Builder,
79+
h2_builder: proto::h2::client::Config,
8680
}
8781

8882
/// A future returned by `SendRequest::send_request`.
@@ -420,20 +414,14 @@ impl Builder {
420414
/// Creates a new connection builder.
421415
#[inline]
422416
pub fn new() -> Builder {
423-
let mut h2_builder = h2::client::Builder::default();
424-
h2_builder
425-
.initial_window_size(DEFAULT_HTTP2_STREAM_WINDOW)
426-
.initial_connection_window_size(DEFAULT_HTTP2_CONN_WINDOW)
427-
.enable_push(false);
428-
429417
Builder {
430418
exec: Exec::Default,
431419
h1_writev: true,
432420
h1_read_buf_exact_size: None,
433421
h1_title_case_headers: false,
434422
h1_max_buf_size: None,
435423
http2: false,
436-
h2_builder,
424+
h2_builder: Default::default(),
437425
}
438426
}
439427

@@ -491,7 +479,8 @@ impl Builder {
491479
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
492480
pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
493481
if let Some(sz) = sz.into() {
494-
self.h2_builder.initial_window_size(sz);
482+
self.h2_builder.adaptive_window = false;
483+
self.h2_builder.initial_stream_window_size = sz;
495484
}
496485
self
497486
}
@@ -506,11 +495,24 @@ impl Builder {
506495
sz: impl Into<Option<u32>>,
507496
) -> &mut Self {
508497
if let Some(sz) = sz.into() {
509-
self.h2_builder.initial_connection_window_size(sz);
498+
self.h2_builder.adaptive_window = false;
499+
self.h2_builder.initial_conn_window_size = sz;
510500
}
511501
self
512502
}
513503

504+
/// Sets whether to use an adaptive flow control.
505+
///
506+
/// Enabling this will override the limits set in
507+
/// `http2_initial_stream_window_size` and
508+
/// `http2_initial_connection_window_size`.
509+
pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
510+
self.h2_builder.adaptive_window = enabled;
511+
self.h2_builder.initial_conn_window_size = 65_535;
512+
self.h2_builder.initial_stream_window_size = 65_535;
513+
self
514+
}
515+
514516
/// Constructs a connection with the configured options and IO.
515517
pub fn handshake<T, B>(
516518
&self,

src/client/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -996,6 +996,16 @@ impl Builder {
996996
self
997997
}
998998

999+
/// Sets whether to use an adaptive flow control.
1000+
///
1001+
/// Enabling this will override the limits set in
1002+
/// `http2_initial_stream_window_size` and
1003+
/// `http2_initial_connection_window_size`.
1004+
pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
1005+
self.conn_builder.http2_adaptive_window(enabled);
1006+
self
1007+
}
1008+
9991009
/// Sets the maximum idle connection per host allowed in the pool.
10001010
///
10011011
/// Default is `usize::MAX` (no limit).

src/proto/h2/bdp.rs

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
// What should it do?
2+
//
3+
// # BDP Algorithm
4+
//
5+
// 1. When receiving a DATA frame, if a BDP ping isn't outstanding:
6+
// 1a. Record current time.
7+
// 1b. Send a BDP ping.
8+
// 2. Increment the number of received bytes.
9+
// 3. When the BDP ping ack is received:
10+
// 3a. Record duration from sent time.
11+
// 3b. Merge RTT with a running average.
12+
// 3c. Calculate bdp as bytes/rtt.
13+
// 3d. If bdp is over 2/3 max, set new max to bdp and update windows.
14+
//
15+
//
16+
// # Implementation
17+
//
18+
// - `hyper::Body::h2` variant includes a "bdp channel"
19+
// - When the body's `poll_data` yields bytes, call `bdp.sample(bytes.len())`
20+
//
21+
22+
use std::sync::{Arc, Mutex, Weak};
23+
use std::task::{self, Poll};
24+
use std::time::{Duration, Instant};
25+
26+
use h2::{PingPong, Ping};
27+
28+
type WindowSize = u32;
29+
30+
/// Any higher than this likely will be hitting the TCP flow control.
31+
const BDP_LIMIT: usize = 1024 * 1024 * 16;
32+
33+
pub(crate) fn disabled() -> Sampler {
34+
Sampler { shared: Weak::new() }
35+
}
36+
37+
pub(super) fn channel(ping_pong: PingPong, initial_window: WindowSize) -> (Sampler, Estimator) {
38+
let shared = Arc::new(Mutex::new(Shared {
39+
bytes: 0,
40+
ping_pong,
41+
ping_sent: false,
42+
sent_at: Instant::now(),
43+
}));
44+
45+
(
46+
Sampler { shared: Arc::downgrade(&shared) },
47+
Estimator {
48+
bdp: initial_window,
49+
max_bandwidth: 0.0,
50+
shared,
51+
samples: 0,
52+
rtt: 0.0,
53+
},
54+
)
55+
}
56+
57+
#[derive(Clone)]
58+
pub(crate) struct Sampler {
59+
shared: Weak<Mutex<Shared>>,
60+
}
61+
62+
pub(super) struct Estimator {
63+
shared: Arc<Mutex<Shared>>,
64+
65+
/// Current BDP in bytes
66+
bdp: u32,
67+
/// Largest bandwidth we've seen so far.
68+
max_bandwidth: f64,
69+
/// Count of samples made (ping sent and received)
70+
samples: usize,
71+
/// Round trip time in seconds
72+
rtt: f64,
73+
}
74+
75+
struct Shared {
76+
bytes: usize,
77+
ping_pong: PingPong,
78+
ping_sent: bool,
79+
sent_at: Instant,
80+
}
81+
82+
impl Sampler {
83+
pub(crate) fn sample(&self, bytes: usize) {
84+
let shared = if let Some(shared) = self.shared.upgrade() {
85+
shared
86+
} else {
87+
return;
88+
};
89+
90+
let mut inner = shared.lock().unwrap();
91+
92+
if !inner.ping_sent {
93+
if let Ok(()) = inner.ping_pong.send_ping(Ping::opaque()) {
94+
inner.ping_sent = true;
95+
inner.sent_at = Instant::now();
96+
trace!("sending BDP ping");
97+
} else {
98+
return;
99+
}
100+
}
101+
102+
inner.bytes += bytes;
103+
}
104+
105+
pub(crate) fn is_enabled(&self) -> bool {
106+
self.shared.strong_count() > 0
107+
}
108+
}
109+
110+
impl Estimator {
111+
pub(super) fn poll_estimate(&mut self, cx: &mut task::Context<'_>) -> Poll<WindowSize> {
112+
let mut inner = self.shared.lock().unwrap();
113+
if !inner.ping_sent {
114+
// XXX: this doesn't register a waker...?
115+
return Poll::Pending;
116+
}
117+
118+
let (bytes, rtt) = match ready!(inner.ping_pong.poll_pong(cx)) {
119+
Ok(_pong) => {
120+
let rtt = inner.sent_at.elapsed();
121+
let bytes = inner.bytes;
122+
inner.bytes = 0;
123+
inner.ping_sent = false;
124+
self.samples += 1;
125+
trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt);
126+
(bytes, rtt)
127+
},
128+
Err(e) => {
129+
debug!("bdp pong error: {}", e);
130+
return Poll::Pending;
131+
}
132+
};
133+
134+
drop(inner);
135+
136+
if let Some(bdp) = self.calculate(bytes, rtt) {
137+
Poll::Ready(bdp)
138+
} else {
139+
// XXX: this doesn't register a waker...?
140+
Poll::Pending
141+
}
142+
}
143+
144+
fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
145+
// No need to do any math if we're at the limit.
146+
if self.bdp as usize == BDP_LIMIT {
147+
return None;
148+
}
149+
150+
// average the rtt
151+
let rtt = seconds(rtt);
152+
if self.samples < 10 {
153+
// Average the first 10 samples
154+
self.rtt += (rtt - self.rtt) / (self.samples as f64);
155+
} else {
156+
self.rtt += (rtt - self.rtt) / 0.9;
157+
}
158+
159+
// calculate the current bandwidth
160+
let bw = (bytes as f64) / (self.rtt * 1.5);
161+
trace!("current bandwidth = {:.1}B/s", bw);
162+
163+
if bw < self.max_bandwidth {
164+
// not a faster bandwidth, so don't update
165+
return None;
166+
} else {
167+
self.max_bandwidth = bw;
168+
}
169+
170+
171+
// if the current `bytes` sample is at least 2/3 the previous
172+
// bdp, increase to double the current sample.
173+
if (bytes as f64) >= (self.bdp as f64) * 0.66 {
174+
self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
175+
trace!("BDP increased to {}", self.bdp);
176+
Some(self.bdp)
177+
} else {
178+
None
179+
}
180+
}
181+
}
182+
183+
fn seconds(dur: Duration) -> f64 {
184+
const NANOS_PER_SEC: f64 = 1_000_000_000.0;
185+
let secs = dur.as_secs() as f64;
186+
secs + (dur.subsec_nanos() as f64) / NANOS_PER_SEC
187+
}

0 commit comments

Comments
 (0)