Skip to content

Commit 6c76441

Browse files
committed
feat(client): add proxy::Tunnel legacy util
1 parent ab2c9cd commit 6c76441

File tree

6 files changed

+329
-0
lines changed

6 files changed

+329
-0
lines changed

src/client/legacy/connect/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ pub mod dns;
8080
#[cfg(feature = "tokio")]
8181
mod http;
8282

83+
pub mod proxy;
84+
8385
pub(crate) mod capture;
8486
pub use capture::{capture_connection, CaptureConnection};
8587

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
//! Proxy helpers
2+
3+
mod tunnel;
4+
5+
pub use self::tunnel::Tunnel;
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
use std::error::Error as StdError;
2+
use std::future::Future;
3+
use std::marker::{PhantomData, Unpin};
4+
use std::pin::Pin;
5+
use std::task::{self, Poll};
6+
7+
use http::{HeaderMap, HeaderValue, Uri};
8+
use hyper::rt::{Read, Write};
9+
use pin_project_lite::pin_project;
10+
use tower_service::Service;
11+
12+
/// Tunnel Proxy via HTTP CONNECT
13+
#[derive(Debug)]
14+
pub struct Tunnel<C> {
15+
headers: Headers,
16+
inner: C,
17+
proxy_dst: Uri,
18+
}
19+
20+
#[derive(Clone, Debug)]
21+
enum Headers {
22+
Empty,
23+
Auth(HeaderValue),
24+
Extra(HeaderMap),
25+
}
26+
27+
#[derive(Debug)]
28+
pub enum TunnelError {
29+
ConnectFailed(Box<dyn StdError + Send + Sync>),
30+
Io(std::io::Error),
31+
MissingHost,
32+
ProxyAuthRequired,
33+
ProxyHeadersTooLong,
34+
TunnelUnexpectedEof,
35+
TunnelUnsuccessful,
36+
}
37+
38+
pin_project! {
39+
// Not publicly exported (so missing_docs doesn't trigger).
40+
//
41+
// We return this `Future` instead of the `Pin<Box<dyn Future>>` directly
42+
// so that users don't rely on it fitting in a `Pin<Box<dyn Future>>` slot
43+
// (and thus we can change the type in the future).
44+
#[must_use = "futures do nothing unless polled"]
45+
#[allow(missing_debug_implementations)]
46+
pub struct Tunneling<F, T> {
47+
#[pin]
48+
fut: BoxTunneling<T>,
49+
_marker: PhantomData<F>,
50+
}
51+
}
52+
53+
type BoxTunneling<T> = Pin<Box<dyn Future<Output = Result<T, TunnelError>> + Send>>;
54+
55+
impl<C> Tunnel<C> {
56+
/// Create a new Tunnel service.
57+
pub fn new(proxy_dst: Uri, connector: C) -> Self {
58+
Self {
59+
headers: Headers::Empty,
60+
inner: connector,
61+
proxy_dst,
62+
}
63+
}
64+
65+
/// Add `proxy-authorization` header value to the CONNECT request.
66+
pub fn with_auth(mut self, mut auth: HeaderValue) -> Self {
67+
// just in case the user forgot
68+
auth.set_sensitive(true);
69+
match self.headers {
70+
Headers::Empty => {
71+
self.headers = Headers::Auth(auth);
72+
}
73+
Headers::Auth(ref mut existing) => {
74+
*existing = auth;
75+
}
76+
Headers::Extra(ref mut extra) => {
77+
extra.insert(http::header::PROXY_AUTHORIZATION, auth);
78+
}
79+
}
80+
81+
self
82+
}
83+
84+
/// Add extra headers to be sent with the CONNECT request.
85+
///
86+
/// If existing headers have been set, these will be merged.
87+
pub fn with_headers(mut self, mut headers: HeaderMap) -> Self {
88+
match self.headers {
89+
Headers::Empty => {
90+
self.headers = Headers::Extra(headers);
91+
}
92+
Headers::Auth(auth) => {
93+
headers
94+
.entry(http::header::PROXY_AUTHORIZATION)
95+
.or_insert(auth);
96+
self.headers = Headers::Extra(headers);
97+
}
98+
Headers::Extra(ref mut extra) => {
99+
extra.extend(headers);
100+
}
101+
}
102+
103+
self
104+
}
105+
}
106+
107+
impl<C> Service<Uri> for Tunnel<C>
108+
where
109+
C: Service<Uri>,
110+
C::Future: Send + 'static,
111+
C::Response: Read + Write + Unpin + Send + 'static,
112+
C::Error: Into<Box<dyn StdError + Send + Sync>>,
113+
{
114+
type Response = C::Response;
115+
type Error = TunnelError;
116+
type Future = Tunneling<C::Future, C::Response>;
117+
118+
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
119+
futures_util::ready!(self.inner.poll_ready(cx))
120+
.map_err(|e| TunnelError::ConnectFailed(e.into()))?;
121+
Poll::Ready(Ok(()))
122+
}
123+
124+
fn call(&mut self, dst: Uri) -> Self::Future {
125+
let connecting = self.inner.call(self.proxy_dst.clone());
126+
let headers = self.headers.clone();
127+
128+
Tunneling {
129+
fut: Box::pin(async move {
130+
let conn = connecting
131+
.await
132+
.map_err(|e| TunnelError::ConnectFailed(e.into()))?;
133+
tunnel(
134+
conn,
135+
dst.host().ok_or(TunnelError::MissingHost)?,
136+
dst.port().map(|p| p.as_u16()).unwrap_or(443),
137+
&headers,
138+
)
139+
.await
140+
}),
141+
_marker: PhantomData,
142+
}
143+
}
144+
}
145+
146+
impl<F, T, E> Future for Tunneling<F, T>
147+
where
148+
F: Future<Output = Result<T, E>>,
149+
{
150+
type Output = Result<T, TunnelError>;
151+
152+
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
153+
self.project().fut.poll(cx)
154+
}
155+
}
156+
157+
async fn tunnel<T>(mut conn: T, host: &str, port: u16, headers: &Headers) -> Result<T, TunnelError>
158+
where
159+
T: Read + Write + Unpin,
160+
{
161+
let mut buf = format!(
162+
"\
163+
CONNECT {host}:{port} HTTP/1.1\r\n\
164+
Host: {host}:{port}\r\n\
165+
"
166+
)
167+
.into_bytes();
168+
169+
match headers {
170+
Headers::Auth(auth) => {
171+
buf.extend_from_slice(b"Proxy-Authorization: ");
172+
buf.extend_from_slice(auth.as_bytes());
173+
buf.extend_from_slice(b"\r\n");
174+
}
175+
Headers::Extra(extra) => {
176+
for (name, value) in extra {
177+
buf.extend_from_slice(name.as_str().as_bytes());
178+
buf.extend_from_slice(b": ");
179+
buf.extend_from_slice(value.as_bytes());
180+
buf.extend_from_slice(b"\r\n");
181+
}
182+
}
183+
Headers::Empty => (),
184+
}
185+
186+
// headers end
187+
buf.extend_from_slice(b"\r\n");
188+
189+
crate::rt::write_all(&mut conn, &buf)
190+
.await
191+
.map_err(TunnelError::Io)?;
192+
193+
let mut buf = [0; 8192];
194+
let mut pos = 0;
195+
196+
loop {
197+
let n = crate::rt::read(&mut conn, &mut buf[pos..])
198+
.await
199+
.map_err(TunnelError::Io)?;
200+
201+
if n == 0 {
202+
return Err(TunnelError::TunnelUnexpectedEof);
203+
}
204+
pos += n;
205+
206+
let recvd = &buf[..pos];
207+
if recvd.starts_with(b"HTTP/1.1 200") || recvd.starts_with(b"HTTP/1.0 200") {
208+
if recvd.ends_with(b"\r\n\r\n") {
209+
return Ok(conn);
210+
}
211+
if pos == buf.len() {
212+
return Err(TunnelError::ProxyHeadersTooLong);
213+
}
214+
// else read more
215+
} else if recvd.starts_with(b"HTTP/1.1 407") {
216+
return Err(TunnelError::ProxyAuthRequired);
217+
} else {
218+
return Err(TunnelError::TunnelUnsuccessful);
219+
}
220+
}
221+
}
222+
223+
impl std::fmt::Display for TunnelError {
224+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
225+
f.write_str("tunnel error: ")?;
226+
227+
f.write_str(match self {
228+
TunnelError::MissingHost => "missing destination host",
229+
TunnelError::ProxyAuthRequired => "proxy authorization required",
230+
TunnelError::ProxyHeadersTooLong => "proxy response headers too long",
231+
TunnelError::TunnelUnexpectedEof => "unexpected end of file",
232+
TunnelError::TunnelUnsuccessful => "unsuccessful",
233+
TunnelError::ConnectFailed(_) => "failed to create underlying connection",
234+
TunnelError::Io(_) => "io error establishing tunnel",
235+
})
236+
}
237+
}
238+
239+
impl std::error::Error for TunnelError {
240+
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
241+
match self {
242+
TunnelError::Io(ref e) => Some(e),
243+
TunnelError::ConnectFailed(ref e) => Some(&**e),
244+
_ => None,
245+
}
246+
}
247+
}

src/rt/io.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use std::marker::Unpin;
2+
use std::pin::Pin;
3+
use std::task::Poll;
4+
5+
use futures_util::future;
6+
use futures_util::ready;
7+
use hyper::rt::{Read, ReadBuf, Write};
8+
9+
pub(crate) async fn read<T>(io: &mut T, buf: &mut [u8]) -> Result<usize, std::io::Error>
10+
where
11+
T: Read + Unpin,
12+
{
13+
future::poll_fn(move |cx| {
14+
let mut buf = ReadBuf::new(buf);
15+
ready!(Pin::new(&mut *io).poll_read(cx, buf.unfilled()))?;
16+
Poll::Ready(Ok(buf.filled().len()))
17+
})
18+
.await
19+
}
20+
21+
pub(crate) async fn write_all<T>(io: &mut T, buf: &[u8]) -> Result<(), std::io::Error>
22+
where
23+
T: Write + Unpin,
24+
{
25+
let mut n = 0;
26+
future::poll_fn(move |cx| {
27+
while n < buf.len() {
28+
n += ready!(Pin::new(&mut *io).poll_write(cx, &buf[n..])?);
29+
}
30+
Poll::Ready(Ok(()))
31+
})
32+
.await
33+
}

src/rt/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
//! Runtime utilities
22
3+
#[cfg(feature = "client-legacy")]
4+
mod io;
5+
#[cfg(feature = "client-legacy")]
6+
pub(crate) use self::io::{read, write_all};
7+
38
#[cfg(feature = "tokio")]
49
pub mod tokio;
510

tests/proxy.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
2+
use tokio::net::TcpListener;
3+
use tower_service::Service;
4+
5+
use hyper_util::client::legacy::connect::{proxy::Tunnel, HttpConnector};
6+
7+
#[cfg(not(miri))]
8+
#[tokio::test]
9+
async fn test_tunnel_works() {
10+
let tcp = TcpListener::bind("127.0.0.1:0").await.expect("bind");
11+
let addr = tcp.local_addr().expect("local_addr");
12+
13+
let proxy_dst = format!("http://{}", addr).parse().expect("uri");
14+
let mut connector = Tunnel::new(proxy_dst, HttpConnector::new());
15+
let t1 = tokio::spawn(async move {
16+
let _conn = connector
17+
.call("https://hyper.rs".parse().unwrap())
18+
.await
19+
.expect("tunnel");
20+
});
21+
22+
let t2 = tokio::spawn(async move {
23+
let (mut io, _) = tcp.accept().await.expect("accept");
24+
let mut buf = [0u8; 64];
25+
let n = io.read(&mut buf).await.expect("read 1");
26+
assert_eq!(
27+
&buf[..n],
28+
b"CONNECT hyper.rs:443 HTTP/1.1\r\nHost: hyper.rs:443\r\n\r\n"
29+
);
30+
io.write_all(b"HTTP/1.1 200 OK\r\n\r\n")
31+
.await
32+
.expect("write 1");
33+
});
34+
35+
t1.await.expect("task 1");
36+
t2.await.expect("task 2");
37+
}

0 commit comments

Comments
 (0)