Skip to content

Commit 95155c4

Browse files
committed
feat(client): support local bind for HttpConnector
Add `set_local_address` to the `HttpConnector`. This configures the client to bind the socket to a local address of the host before it connects to the destination. This is useful on hosts which have multiple network interfaces, to ensure the request is issued over a specific interface. Closes #1498
1 parent d19d95a commit 95155c4

File tree

3 files changed

+112
-30
lines changed

3 files changed

+112
-30
lines changed

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ runtime = [
5757
"tokio-tcp",
5858
]
5959

60+
[[example]]
61+
name = "bound_client"
62+
path = "examples/bound_client.rs"
63+
required-features = ["runtime"]
64+
6065
[[example]]
6166
name = "client"
6267
path = "examples/client.rs"

examples/bound_client.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#![deny(warnings)]
2+
extern crate hyper;
3+
extern crate pretty_env_logger;
4+
5+
use std::env;
6+
use std::net::IpAddr;
7+
use std::io::{self, Write};
8+
9+
use hyper::{Body, Client, Request};
10+
use hyper::rt::{self, Future, Stream};
11+
use hyper::client::connect::HttpConnector;
12+
13+
fn main() {
14+
pretty_env_logger::init();
15+
16+
let url = match env::args().nth(1) {
17+
Some(url) => url,
18+
None => {
19+
println!("Usage: client <url> [<bind_addr>]");
20+
return;
21+
}
22+
};
23+
24+
let url = url.parse::<hyper::Uri>().unwrap();
25+
if url.scheme_part().map(|s| s.as_ref()) != Some("http") {
26+
println!("This example only works with 'http' URLs.");
27+
return;
28+
}
29+
30+
let bind_addr = env::args().nth(2);
31+
32+
let bind_addr: Option<IpAddr> = bind_addr.map(|s| s.parse::<IpAddr>().unwrap());
33+
34+
rt::run(rt::lazy(move || {
35+
let mut connector = HttpConnector::new(4);
36+
connector.set_local_address(bind_addr);
37+
let client = Client::builder().build(connector);
38+
39+
let mut req = Request::new(Body::empty());
40+
*req.uri_mut() = url;
41+
42+
client.request(req).and_then(|res| {
43+
println!("Response: {}", res.status());
44+
println!("Headers: {:#?}", res.headers());
45+
46+
res.into_body().for_each(|chunk| {
47+
io::stdout().write_all(&chunk)
48+
.map_err(|e| panic!("example expects stdout is open, error={}", e))
49+
})
50+
}).map(|_| {
51+
println!("\n\nDone.");
52+
}).map_err(|err| {
53+
eprintln!("Error {}", err);
54+
})
55+
}));
56+
}

src/client/connect.rs

Lines changed: 51 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
//! establishes connections over TCP.
77
//! - The [`Connect`](Connect) trait and related types to build custom connectors.
88
use std::error::Error as StdError;
9+
use std::borrow::Cow;
910

1011
use futures::Future;
1112
use http::Uri;
@@ -128,7 +129,7 @@ mod http {
128129
use std::fmt;
129130
use std::io;
130131
use std::mem;
131-
use std::net::SocketAddr;
132+
use std::net::{IpAddr, SocketAddr};
132133
use std::sync::Arc;
133134
use std::time::Duration;
134135

@@ -146,30 +147,35 @@ mod http {
146147
use self::http_connector::HttpConnectorBlockingTask;
147148

148149

149-
fn connect(addr: &SocketAddr, handle: &Option<Handle>) -> io::Result<ConnectFuture> {
150-
if let Some(ref handle) = *handle {
151-
let builder = match addr {
152-
&SocketAddr::V4(_) => TcpBuilder::new_v4()?,
153-
&SocketAddr::V6(_) => TcpBuilder::new_v6()?,
150+
fn connect(addr: &SocketAddr, local_addr: &Option<IpAddr>, handle: &Option<Handle>) -> io::Result<ConnectFuture> {
151+
let builder = match addr {
152+
&SocketAddr::V4(_) => TcpBuilder::new_v4()?,
153+
&SocketAddr::V6(_) => TcpBuilder::new_v6()?,
154+
};
155+
156+
if let Some(ref local_addr) = *local_addr {
157+
// Caller has requested this socket be bound before calling connect
158+
builder.bind(SocketAddr::new(local_addr.clone(), 0))?;
159+
}
160+
else if cfg!(windows) {
161+
// Windows requires a socket be bound before calling connect
162+
let any: SocketAddr = match addr {
163+
&SocketAddr::V4(_) => {
164+
([0, 0, 0, 0], 0).into()
165+
},
166+
&SocketAddr::V6(_) => {
167+
([0, 0, 0, 0, 0, 0, 0, 0], 0).into()
168+
}
154169
};
170+
builder.bind(any)?;
171+
}
155172

156-
if cfg!(windows) {
157-
// Windows requires a socket be bound before calling connect
158-
let any: SocketAddr = match addr {
159-
&SocketAddr::V4(_) => {
160-
([0, 0, 0, 0], 0).into()
161-
},
162-
&SocketAddr::V6(_) => {
163-
([0, 0, 0, 0, 0, 0, 0, 0], 0).into()
164-
}
165-
};
166-
builder.bind(any)?;
167-
}
173+
let handle = match *handle {
174+
Some(ref handle) => Cow::Borrowed(handle),
175+
None => Cow::Owned(Handle::current()),
176+
};
168177

169-
Ok(TcpStream::connect_std(builder.to_tcp_stream()?, addr, handle))
170-
} else {
171-
Ok(TcpStream::connect(addr))
172-
}
178+
Ok(TcpStream::connect_std(builder.to_tcp_stream()?, addr, &handle))
173179
}
174180

175181
/// A connector for the `http` scheme.
@@ -182,6 +188,7 @@ mod http {
182188
handle: Option<Handle>,
183189
keep_alive_timeout: Option<Duration>,
184190
nodelay: bool,
191+
local_address: Option<IpAddr>,
185192
}
186193

187194
impl HttpConnector {
@@ -218,6 +225,7 @@ mod http {
218225
handle,
219226
keep_alive_timeout: None,
220227
nodelay: false,
228+
local_address: None,
221229
}
222230
}
223231

@@ -246,6 +254,16 @@ mod http {
246254
pub fn set_nodelay(&mut self, nodelay: bool) {
247255
self.nodelay = nodelay;
248256
}
257+
258+
/// Set that all sockets are bound to the configured address before connection.
259+
///
260+
/// If `None`, the sockets will not be bound.
261+
///
262+
/// Default is `None`.
263+
#[inline]
264+
pub fn set_local_address(&mut self, addr: Option<IpAddr>) {
265+
self.local_address = addr;
266+
}
249267
}
250268

251269
impl fmt::Debug for HttpConnector {
@@ -287,7 +305,7 @@ mod http {
287305
};
288306

289307
HttpConnecting {
290-
state: State::Lazy(self.executor.clone(), host.into(), port),
308+
state: State::Lazy(self.executor.clone(), host.into(), port, self.local_address),
291309
handle: self.handle.clone(),
292310
keep_alive_timeout: self.keep_alive_timeout,
293311
nodelay: self.nodelay,
@@ -337,8 +355,8 @@ mod http {
337355
}
338356

339357
enum State {
340-
Lazy(HttpConnectExecutor, String, u16),
341-
Resolving(oneshot::SpawnHandle<dns::IpAddrs, io::Error>),
358+
Lazy(HttpConnectExecutor, String, u16, Option<IpAddr>),
359+
Resolving(oneshot::SpawnHandle<dns::IpAddrs, io::Error>, Option<IpAddr>),
342360
Connecting(ConnectingTcp),
343361
Error(Option<io::Error>),
344362
}
@@ -351,26 +369,28 @@ mod http {
351369
loop {
352370
let state;
353371
match self.state {
354-
State::Lazy(ref executor, ref mut host, port) => {
372+
State::Lazy(ref executor, ref mut host, port, local_addr) => {
355373
// If the host is already an IP addr (v4 or v6),
356374
// skip resolving the dns and start connecting right away.
357375
if let Some(addrs) = dns::IpAddrs::try_parse(host, port) {
358376
state = State::Connecting(ConnectingTcp {
359377
addrs: addrs,
378+
local_addr: local_addr,
360379
current: None
361380
})
362381
} else {
363382
let host = mem::replace(host, String::new());
364383
let work = dns::Work::new(host, port);
365-
state = State::Resolving(oneshot::spawn(work, executor));
384+
state = State::Resolving(oneshot::spawn(work, executor), local_addr);
366385
}
367386
},
368-
State::Resolving(ref mut future) => {
387+
State::Resolving(ref mut future, local_addr) => {
369388
match try!(future.poll()) {
370389
Async::NotReady => return Ok(Async::NotReady),
371390
Async::Ready(addrs) => {
372391
state = State::Connecting(ConnectingTcp {
373392
addrs: addrs,
393+
local_addr: local_addr,
374394
current: None,
375395
})
376396
}
@@ -402,6 +422,7 @@ mod http {
402422

403423
struct ConnectingTcp {
404424
addrs: dns::IpAddrs,
425+
local_addr: Option<IpAddr>,
405426
current: Option<ConnectFuture>,
406427
}
407428

@@ -418,14 +439,14 @@ mod http {
418439
err = Some(e);
419440
if let Some(addr) = self.addrs.next() {
420441
debug!("connecting to {}", addr);
421-
*current = connect(&addr, handle)?;
442+
*current = connect(&addr, &self.local_addr, handle)?;
422443
continue;
423444
}
424445
}
425446
}
426447
} else if let Some(addr) = self.addrs.next() {
427448
debug!("connecting to {}", addr);
428-
self.current = Some(connect(&addr, handle)?);
449+
self.current = Some(connect(&addr, &self.local_addr, handle)?);
429450
continue;
430451
}
431452

0 commit comments

Comments
 (0)