Skip to content

Commit d58aa73

Browse files
authored
feat(server): add upgrade support to lower-level Connection API (#1459)
Closes #1323
1 parent eb15c66 commit d58aa73

File tree

7 files changed

+229
-104
lines changed

7 files changed

+229
-104
lines changed

src/proto/h1/conn.rs

+12-2
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,8 @@ where I: AsyncRead + AsyncWrite,
500500
Ok(encoder) => {
501501
if !encoder.is_eof() {
502502
Writing::Body(encoder)
503+
} else if encoder.is_last() {
504+
Writing::Closed
503505
} else {
504506
Writing::KeepAlive
505507
}
@@ -566,7 +568,11 @@ where I: AsyncRead + AsyncWrite,
566568
self.io.buffer(encoded);
567569

568570
if encoder.is_eof() {
569-
Writing::KeepAlive
571+
if encoder.is_last() {
572+
Writing::Closed
573+
} else {
574+
Writing::KeepAlive
575+
}
570576
} else {
571577
return Ok(AsyncSink::Ready);
572578
}
@@ -577,7 +583,11 @@ where I: AsyncRead + AsyncWrite,
577583
if let Some(end) = end {
578584
self.io.buffer(end);
579585
}
580-
Writing::KeepAlive
586+
if encoder.is_last() {
587+
Writing::Closed
588+
} else {
589+
Writing::KeepAlive
590+
}
581591
},
582592
Err(_not_eof) => Writing::Closed,
583593
}

src/proto/h1/encode.rs

+17-8
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use iovec::IoVec;
99
#[derive(Debug, Clone)]
1010
pub struct Encoder {
1111
kind: Kind,
12+
is_last: bool,
1213
}
1314

1415
#[derive(Debug)]
@@ -43,22 +44,22 @@ enum BufKind<B> {
4344
}
4445

4546
impl Encoder {
46-
pub fn chunked() -> Encoder {
47+
fn new(kind: Kind) -> Encoder {
4748
Encoder {
48-
kind: Kind::Chunked,
49+
kind: kind,
50+
is_last: false,
4951
}
5052
}
53+
pub fn chunked() -> Encoder {
54+
Encoder::new(Kind::Chunked)
55+
}
5156

5257
pub fn length(len: u64) -> Encoder {
53-
Encoder {
54-
kind: Kind::Length(len),
55-
}
58+
Encoder::new(Kind::Length(len))
5659
}
5760

5861
pub fn eof() -> Encoder {
59-
Encoder {
60-
kind: Kind::Eof,
61-
}
62+
Encoder::new(Kind::Eof)
6263
}
6364

6465
pub fn is_eof(&self) -> bool {
@@ -68,6 +69,14 @@ impl Encoder {
6869
}
6970
}
7071

72+
pub fn set_last(&mut self) {
73+
self.is_last = true;
74+
}
75+
76+
pub fn is_last(&self) -> bool {
77+
self.is_last
78+
}
79+
7180
pub fn end<B>(&self) -> Result<Option<EncodedBuf<B>>, NotEof> {
7281
match self.kind {
7382
Kind::Length(0) => Ok(None),

src/proto/h1/role.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,11 @@ where
132132
// replying with the latter status code response.
133133
let ret = if ::StatusCode::SwitchingProtocols == head.subject {
134134
T::on_encode_upgrade(&mut head)
135-
.map(|_| Server::set_length(&mut head, has_body, method.as_ref()))
135+
.map(|_| {
136+
let mut enc = Server::set_length(&mut head, has_body, method.as_ref());
137+
enc.set_last();
138+
enc
139+
})
136140
} else if head.subject.is_informational() {
137141
error!("response with 1xx status code not supported");
138142
head = MessageHead::default();

src/proto/mod.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,9 @@ pub fn expecting_continue(version: HttpVersion, headers: &Headers) -> bool {
134134
ret
135135
}
136136

137-
pub type ServerTransaction = h1::role::Server<h1::role::NoUpgrades>;
137+
pub type ServerTransaction = h1::role::Server<h1::role::YesUpgrades>;
138+
//pub type ServerTransaction = h1::role::Server<h1::role::NoUpgrades>;
139+
//pub type ServerUpgradeTransaction = h1::role::Server<h1::role::YesUpgrades>;
138140

139141
pub type ClientTransaction = h1::role::Client<h1::role::NoUpgrades>;
140142
pub type ClientUpgradeTransaction = h1::role::Client<h1::role::YesUpgrades>;

src/server/conn.rs

+124
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
//! Lower-level Server connection API.
2+
//!
3+
//! The types in thie module are to provide a lower-level API based around a
4+
//! single connection. Accepting a connection and binding it with a service
5+
//! are not handled at this level. This module provides the building blocks to
6+
//! customize those things externally.
7+
//!
8+
//! If don't have need to manage connections yourself, consider using the
9+
//! higher-level [Server](super) API.
10+
11+
use std::fmt;
12+
13+
use bytes::Bytes;
14+
use futures::{Future, Poll, Stream};
15+
use tokio_io::{AsyncRead, AsyncWrite};
16+
17+
use proto;
18+
use super::{HyperService, Request, Response, Service};
19+
20+
/// A future binding a connection with a Service.
21+
///
22+
/// Polling this future will drive HTTP forward.
23+
#[must_use = "futures do nothing unless polled"]
24+
pub struct Connection<I, S>
25+
where
26+
S: HyperService,
27+
S::ResponseBody: Stream<Error=::Error>,
28+
<S::ResponseBody as Stream>::Item: AsRef<[u8]>,
29+
{
30+
pub(super) conn: proto::dispatch::Dispatcher<
31+
proto::dispatch::Server<S>,
32+
S::ResponseBody,
33+
I,
34+
<S::ResponseBody as Stream>::Item,
35+
proto::ServerTransaction,
36+
>,
37+
}
38+
39+
/// Deconstructed parts of a `Connection`.
40+
///
41+
/// This allows taking apart a `Connection` at a later time, in order to
42+
/// reclaim the IO object, and additional related pieces.
43+
#[derive(Debug)]
44+
pub struct Parts<T> {
45+
/// The original IO object used in the handshake.
46+
pub io: T,
47+
/// A buffer of bytes that have been read but not processed as HTTP.
48+
///
49+
/// If the client sent additional bytes after its last request, and
50+
/// this connection "ended" with an upgrade, the read buffer will contain
51+
/// those bytes.
52+
///
53+
/// You will want to check for any existing bytes if you plan to continue
54+
/// communicating on the IO object.
55+
pub read_buf: Bytes,
56+
_inner: (),
57+
}
58+
59+
// ===== impl Connection =====
60+
61+
impl<I, B, S> Connection<I, S>
62+
where S: Service<Request = Request, Response = Response<B>, Error = ::Error> + 'static,
63+
I: AsyncRead + AsyncWrite + 'static,
64+
B: Stream<Error=::Error> + 'static,
65+
B::Item: AsRef<[u8]>,
66+
{
67+
/// Disables keep-alive for this connection.
68+
pub fn disable_keep_alive(&mut self) {
69+
self.conn.disable_keep_alive()
70+
}
71+
72+
/// Return the inner IO object, and additional information.
73+
///
74+
/// This should only be called after `poll_without_shutdown` signals
75+
/// that the connection is "done". Otherwise, it may not have finished
76+
/// flushing all necessary HTTP bytes.
77+
pub fn into_parts(self) -> Parts<I> {
78+
let (io, read_buf) = self.conn.into_inner();
79+
Parts {
80+
io: io,
81+
read_buf: read_buf,
82+
_inner: (),
83+
}
84+
}
85+
86+
/// Poll the connection for completion, but without calling `shutdown`
87+
/// on the underlying IO.
88+
///
89+
/// This is useful to allow running a connection while doing an HTTP
90+
/// upgrade. Once the upgrade is completed, the connection would be "done",
91+
/// but it is not desired to actally shutdown the IO object. Instead you
92+
/// would take it back using `into_parts`.
93+
pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> {
94+
try_ready!(self.conn.poll_without_shutdown());
95+
Ok(().into())
96+
}
97+
}
98+
99+
impl<I, B, S> Future for Connection<I, S>
100+
where S: Service<Request = Request, Response = Response<B>, Error = ::Error> + 'static,
101+
I: AsyncRead + AsyncWrite + 'static,
102+
B: Stream<Error=::Error> + 'static,
103+
B::Item: AsRef<[u8]>,
104+
{
105+
type Item = ();
106+
type Error = ::Error;
107+
108+
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
109+
self.conn.poll()
110+
}
111+
}
112+
113+
impl<I, S> fmt::Debug for Connection<I, S>
114+
where
115+
S: HyperService,
116+
S::ResponseBody: Stream<Error=::Error>,
117+
<S::ResponseBody as Stream>::Item: AsRef<[u8]>,
118+
{
119+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
120+
f.debug_struct("Connection")
121+
.finish()
122+
}
123+
}
124+

src/server/mod.rs

+2-92
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
66
#[cfg(feature = "compat")]
77
pub mod compat;
8+
pub mod conn;
89
mod service;
910

1011
use std::cell::RefCell;
@@ -46,6 +47,7 @@ feat_server_proto! {
4647
};
4748
}
4849

50+
pub use self::conn::Connection;
4951
pub use self::service::{const_service, service_fn};
5052

5153
/// A configuration of the HTTP protocol.
@@ -108,34 +110,6 @@ pub struct AddrIncoming {
108110
timeout: Option<Timeout>,
109111
}
110112

111-
/// A future binding a connection with a Service.
112-
///
113-
/// Polling this future will drive HTTP forward.
114-
///
115-
/// # Note
116-
///
117-
/// This will currently yield an unnameable (`Opaque`) value
118-
/// on success. The purpose of this is that nothing can be assumed about
119-
/// the type, not even it's name. It's probable that in a later release,
120-
/// this future yields the underlying IO object, which could be done without
121-
/// a breaking change.
122-
///
123-
/// It is likely best to just map the value to `()`, for now.
124-
#[must_use = "futures do nothing unless polled"]
125-
pub struct Connection<I, S>
126-
where
127-
S: HyperService,
128-
S::ResponseBody: Stream<Error=::Error>,
129-
<S::ResponseBody as Stream>::Item: AsRef<[u8]>,
130-
{
131-
conn: proto::dispatch::Dispatcher<
132-
proto::dispatch::Server<S>,
133-
S::ResponseBody,
134-
I,
135-
<S::ResponseBody as Stream>::Item,
136-
proto::ServerTransaction,
137-
>,
138-
}
139113

140114
// ===== impl Http =====
141115

@@ -567,70 +541,6 @@ where
567541
}
568542
*/
569543

570-
// ===== impl Connection =====
571-
572-
impl<I, B, S> Future for Connection<I, S>
573-
where S: Service<Request = Request, Response = Response<B>, Error = ::Error> + 'static,
574-
I: AsyncRead + AsyncWrite + 'static,
575-
B: Stream<Error=::Error> + 'static,
576-
B::Item: AsRef<[u8]>,
577-
{
578-
type Item = self::unnameable::Opaque;
579-
type Error = ::Error;
580-
581-
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
582-
try_ready!(self.conn.poll());
583-
Ok(self::unnameable::opaque().into())
584-
}
585-
}
586-
587-
impl<I, S> fmt::Debug for Connection<I, S>
588-
where
589-
S: HyperService,
590-
S::ResponseBody: Stream<Error=::Error>,
591-
<S::ResponseBody as Stream>::Item: AsRef<[u8]>,
592-
{
593-
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
594-
f.debug_struct("Connection")
595-
.finish()
596-
}
597-
}
598-
599-
impl<I, B, S> Connection<I, S>
600-
where S: Service<Request = Request, Response = Response<B>, Error = ::Error> + 'static,
601-
I: AsyncRead + AsyncWrite + 'static,
602-
B: Stream<Error=::Error> + 'static,
603-
B::Item: AsRef<[u8]>,
604-
{
605-
/// Disables keep-alive for this connection.
606-
pub fn disable_keep_alive(&mut self) {
607-
self.conn.disable_keep_alive()
608-
}
609-
}
610-
611-
mod unnameable {
612-
// This type is specifically not exported outside the crate,
613-
// so no one can actually name the type. With no methods, we make no
614-
// promises about this type.
615-
//
616-
// All of that to say we can eventually replace the type returned
617-
// to something else, and it would not be a breaking change.
618-
//
619-
// We may want to eventually yield the `T: AsyncRead + AsyncWrite`, which
620-
// doesn't have a `Debug` bound. So, this type can't implement `Debug`
621-
// either, so the type change doesn't break people.
622-
#[allow(missing_debug_implementations)]
623-
pub struct Opaque {
624-
_inner: (),
625-
}
626-
627-
pub fn opaque() -> Opaque {
628-
Opaque {
629-
_inner: (),
630-
}
631-
}
632-
}
633-
634544
// ===== impl AddrIncoming =====
635545

636546
impl AddrIncoming {

0 commit comments

Comments
 (0)