Skip to content

Commit 48102d6

Browse files
authored
feat(http2): add adaptive window size support using BDP (#2138)
This adds support for calculating the Bandwidth-delay product when using HTTP2. When a DATA frame is received, a PING is sent to the remote. While the PING acknoledgement is outstanding, the amount of bytes of all received DATA frames is accumulated. Once we receive the PING acknowledgement, we calculate the BDP based on the number of received bytes and the round-trip-time of the PING. If we are near the current maximum window size, the size is doubled. It's disabled by default until tested more extensively.
1 parent 22dc6fe commit 48102d6

File tree

9 files changed

+451
-71
lines changed

9 files changed

+451
-71
lines changed

src/body/body.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use http::HeaderMap;
1212
use http_body::{Body as HttpBody, SizeHint};
1313

1414
use crate::common::{task, watch, Future, Never, Pin, Poll};
15+
use crate::proto::h2::bdp;
1516
use crate::proto::DecodedLength;
1617
use crate::upgrade::OnUpgrade;
1718

@@ -61,6 +62,11 @@ struct Extra {
6162
/// connection yet.
6263
delayed_eof: Option<DelayEof>,
6364
on_upgrade: OnUpgrade,
65+
66+
/// Records bytes read to compute the BDP.
67+
///
68+
/// Only used with `H2` variant.
69+
h2_bdp: bdp::Sampler,
6470
}
6571

6672
type DelayEofUntil = oneshot::Receiver<Never>;
@@ -175,11 +181,21 @@ impl Body {
175181
Body { kind, extra: None }
176182
}
177183

178-
pub(crate) fn h2(recv: h2::RecvStream, content_length: DecodedLength) -> Self {
179-
Body::new(Kind::H2 {
184+
pub(crate) fn h2(
185+
recv: h2::RecvStream,
186+
content_length: DecodedLength,
187+
bdp: bdp::Sampler,
188+
) -> Self {
189+
let mut body = Body::new(Kind::H2 {
180190
content_length,
181191
recv,
182-
})
192+
});
193+
194+
if bdp.is_enabled() {
195+
body.extra_mut().h2_bdp = bdp;
196+
}
197+
198+
body
183199
}
184200

185201
pub(crate) fn set_on_upgrade(&mut self, upgrade: OnUpgrade) {
@@ -204,6 +220,7 @@ impl Body {
204220
Box::new(Extra {
205221
delayed_eof: None,
206222
on_upgrade: OnUpgrade::none(),
223+
h2_bdp: bdp::disabled(),
207224
})
208225
})
209226
}
@@ -262,6 +279,9 @@ impl Body {
262279
Some(Ok(bytes)) => {
263280
let _ = h2.flow_control().release_capacity(bytes.len());
264281
len.sub_if(bytes.len() as u64);
282+
if let Some(ref extra) = self.extra {
283+
extra.h2_bdp.sample(bytes.len());
284+
}
265285
Poll::Ready(Some(Ok(bytes)))
266286
}
267287
Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),

src/client/conn.rs

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,6 @@ where
3535
H2(#[pin] proto::h2::ClientTask<B>),
3636
}
3737

38-
// Our defaults are chosen for the "majority" case, which usually are not
39-
// resource constrained, and so the spec default of 64kb can be too limiting
40-
// for performance.
41-
const DEFAULT_HTTP2_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb
42-
const DEFAULT_HTTP2_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb
43-
4438
/// Returns a handshake future over some IO.
4539
///
4640
/// This is a shortcut for `Builder::new().handshake(io)`.
@@ -82,7 +76,7 @@ pub struct Builder {
8276
h1_read_buf_exact_size: Option<usize>,
8377
h1_max_buf_size: Option<usize>,
8478
http2: bool,
85-
h2_builder: h2::client::Builder,
79+
h2_builder: proto::h2::client::Config,
8680
}
8781

8882
/// A future returned by `SendRequest::send_request`.
@@ -420,20 +414,14 @@ impl Builder {
420414
/// Creates a new connection builder.
421415
#[inline]
422416
pub fn new() -> Builder {
423-
let mut h2_builder = h2::client::Builder::default();
424-
h2_builder
425-
.initial_window_size(DEFAULT_HTTP2_STREAM_WINDOW)
426-
.initial_connection_window_size(DEFAULT_HTTP2_CONN_WINDOW)
427-
.enable_push(false);
428-
429417
Builder {
430418
exec: Exec::Default,
431419
h1_writev: true,
432420
h1_read_buf_exact_size: None,
433421
h1_title_case_headers: false,
434422
h1_max_buf_size: None,
435423
http2: false,
436-
h2_builder,
424+
h2_builder: Default::default(),
437425
}
438426
}
439427

@@ -491,7 +479,8 @@ impl Builder {
491479
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
492480
pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
493481
if let Some(sz) = sz.into() {
494-
self.h2_builder.initial_window_size(sz);
482+
self.h2_builder.adaptive_window = false;
483+
self.h2_builder.initial_stream_window_size = sz;
495484
}
496485
self
497486
}
@@ -506,7 +495,24 @@ impl Builder {
506495
sz: impl Into<Option<u32>>,
507496
) -> &mut Self {
508497
if let Some(sz) = sz.into() {
509-
self.h2_builder.initial_connection_window_size(sz);
498+
self.h2_builder.adaptive_window = false;
499+
self.h2_builder.initial_conn_window_size = sz;
500+
}
501+
self
502+
}
503+
504+
/// Sets whether to use an adaptive flow control.
505+
///
506+
/// Enabling this will override the limits set in
507+
/// `http2_initial_stream_window_size` and
508+
/// `http2_initial_connection_window_size`.
509+
pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
510+
use proto::h2::SPEC_WINDOW_SIZE;
511+
512+
self.h2_builder.adaptive_window = enabled;
513+
if enabled {
514+
self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
515+
self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
510516
}
511517
self
512518
}

src/client/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -996,6 +996,16 @@ impl Builder {
996996
self
997997
}
998998

999+
/// Sets whether to use an adaptive flow control.
1000+
///
1001+
/// Enabling this will override the limits set in
1002+
/// `http2_initial_stream_window_size` and
1003+
/// `http2_initial_connection_window_size`.
1004+
pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
1005+
self.conn_builder.http2_adaptive_window(enabled);
1006+
self
1007+
}
1008+
9991009
/// Sets the maximum idle connection per host allowed in the pool.
10001010
///
10011011
/// Default is `usize::MAX` (no limit).

src/proto/h2/bdp.rs

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
// What should it do?
2+
//
3+
// # BDP Algorithm
4+
//
5+
// 1. When receiving a DATA frame, if a BDP ping isn't outstanding:
6+
// 1a. Record current time.
7+
// 1b. Send a BDP ping.
8+
// 2. Increment the number of received bytes.
9+
// 3. When the BDP ping ack is received:
10+
// 3a. Record duration from sent time.
11+
// 3b. Merge RTT with a running average.
12+
// 3c. Calculate bdp as bytes/rtt.
13+
// 3d. If bdp is over 2/3 max, set new max to bdp and update windows.
14+
//
15+
//
16+
// # Implementation
17+
//
18+
// - `hyper::Body::h2` variant includes a "bdp channel"
19+
// - When the body's `poll_data` yields bytes, call `bdp.sample(bytes.len())`
20+
//
21+
22+
use std::sync::{Arc, Mutex, Weak};
23+
use std::task::{self, Poll};
24+
use std::time::{Duration, Instant};
25+
26+
use h2::{Ping, PingPong};
27+
28+
type WindowSize = u32;
29+
30+
/// Any higher than this likely will be hitting the TCP flow control.
31+
const BDP_LIMIT: usize = 1024 * 1024 * 16;
32+
33+
pub(crate) fn disabled() -> Sampler {
34+
Sampler {
35+
shared: Weak::new(),
36+
}
37+
}
38+
39+
pub(super) fn channel(ping_pong: PingPong, initial_window: WindowSize) -> (Sampler, Estimator) {
40+
let shared = Arc::new(Mutex::new(Shared {
41+
bytes: 0,
42+
ping_pong,
43+
ping_sent: false,
44+
sent_at: Instant::now(),
45+
}));
46+
47+
(
48+
Sampler {
49+
shared: Arc::downgrade(&shared),
50+
},
51+
Estimator {
52+
bdp: initial_window,
53+
max_bandwidth: 0.0,
54+
shared,
55+
samples: 0,
56+
rtt: 0.0,
57+
},
58+
)
59+
}
60+
61+
#[derive(Clone)]
62+
pub(crate) struct Sampler {
63+
shared: Weak<Mutex<Shared>>,
64+
}
65+
66+
pub(super) struct Estimator {
67+
shared: Arc<Mutex<Shared>>,
68+
69+
/// Current BDP in bytes
70+
bdp: u32,
71+
/// Largest bandwidth we've seen so far.
72+
max_bandwidth: f64,
73+
/// Count of samples made (ping sent and received)
74+
samples: usize,
75+
/// Round trip time in seconds
76+
rtt: f64,
77+
}
78+
79+
struct Shared {
80+
bytes: usize,
81+
ping_pong: PingPong,
82+
ping_sent: bool,
83+
sent_at: Instant,
84+
}
85+
86+
impl Sampler {
87+
pub(crate) fn sample(&self, bytes: usize) {
88+
let shared = if let Some(shared) = self.shared.upgrade() {
89+
shared
90+
} else {
91+
return;
92+
};
93+
94+
let mut inner = shared.lock().unwrap();
95+
96+
if !inner.ping_sent {
97+
if let Ok(()) = inner.ping_pong.send_ping(Ping::opaque()) {
98+
inner.ping_sent = true;
99+
inner.sent_at = Instant::now();
100+
trace!("sending BDP ping");
101+
} else {
102+
return;
103+
}
104+
}
105+
106+
inner.bytes += bytes;
107+
}
108+
109+
pub(crate) fn is_enabled(&self) -> bool {
110+
self.shared.strong_count() > 0
111+
}
112+
}
113+
114+
impl Estimator {
115+
pub(super) fn poll_estimate(&mut self, cx: &mut task::Context<'_>) -> Poll<WindowSize> {
116+
let mut inner = self.shared.lock().unwrap();
117+
if !inner.ping_sent {
118+
// XXX: this doesn't register a waker...?
119+
return Poll::Pending;
120+
}
121+
122+
let (bytes, rtt) = match ready!(inner.ping_pong.poll_pong(cx)) {
123+
Ok(_pong) => {
124+
let rtt = inner.sent_at.elapsed();
125+
let bytes = inner.bytes;
126+
inner.bytes = 0;
127+
inner.ping_sent = false;
128+
self.samples += 1;
129+
trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt);
130+
(bytes, rtt)
131+
}
132+
Err(e) => {
133+
debug!("bdp pong error: {}", e);
134+
return Poll::Pending;
135+
}
136+
};
137+
138+
drop(inner);
139+
140+
if let Some(bdp) = self.calculate(bytes, rtt) {
141+
Poll::Ready(bdp)
142+
} else {
143+
// XXX: this doesn't register a waker...?
144+
Poll::Pending
145+
}
146+
}
147+
148+
fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
149+
// No need to do any math if we're at the limit.
150+
if self.bdp as usize == BDP_LIMIT {
151+
return None;
152+
}
153+
154+
// average the rtt
155+
let rtt = seconds(rtt);
156+
if self.samples < 10 {
157+
// Average the first 10 samples
158+
self.rtt += (rtt - self.rtt) / (self.samples as f64);
159+
} else {
160+
self.rtt += (rtt - self.rtt) / 0.9;
161+
}
162+
163+
// calculate the current bandwidth
164+
let bw = (bytes as f64) / (self.rtt * 1.5);
165+
trace!("current bandwidth = {:.1}B/s", bw);
166+
167+
if bw < self.max_bandwidth {
168+
// not a faster bandwidth, so don't update
169+
return None;
170+
} else {
171+
self.max_bandwidth = bw;
172+
}
173+
174+
// if the current `bytes` sample is at least 2/3 the previous
175+
// bdp, increase to double the current sample.
176+
if (bytes as f64) >= (self.bdp as f64) * 0.66 {
177+
self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
178+
trace!("BDP increased to {}", self.bdp);
179+
Some(self.bdp)
180+
} else {
181+
None
182+
}
183+
}
184+
}
185+
186+
fn seconds(dur: Duration) -> f64 {
187+
const NANOS_PER_SEC: f64 = 1_000_000_000.0;
188+
let secs = dur.as_secs() as f64;
189+
secs + (dur.subsec_nanos() as f64) / NANOS_PER_SEC
190+
}

0 commit comments

Comments
 (0)