Skip to content

Commit ced949c

Browse files
committed
feat(server): allow !Send Servers
Until this commit, servers have required that `Service` and their `Future` to be `Send`, since the server needs to spawn some internal tasks to an executor, and by default, that is `tokio::spawn`, which could be spawning to a threadpool. This was true even if the user were certain there was no threadpool involved, and was instead using a different single-threaded runtime, like `tokio::runtime::current_thread`. This changes makes all the server pieces generic over an `E`, which is essentially `Executor<PrivateTypes<Server::Future>>`. There's a new set of internal traits, `H2Exec` and `NewSvcExec`, which allow for the type signature to only show the generics that the user is providing. The traits cannot be implemented explicitly, but there are blanket implementations for `E: Executor<SpecificType>`. If the user provides their own executor, it simply needs to have a generic `impl<F> Executor<F> for MyExec`. That impl can have bounds deciding whether to require `F: Send`. If the executor does require `Send`, and the `Service` futures are `!Send`, there will be compiler errors. To prevent a breaking change, all the types that gained the `E` generic have a default type set, which is the original `tokio::spawn` executor.
1 parent 00c96de commit ced949c

File tree

11 files changed

+426
-133
lines changed

11 files changed

+426
-133
lines changed

Cargo.toml

+5
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,11 @@ name = "send_file"
114114
path = "examples/send_file.rs"
115115
required-features = ["runtime"]
116116

117+
[[example]]
118+
name = "single_threaded"
119+
path = "examples/single_threaded.rs"
120+
required-features = ["runtime"]
121+
117122
[[example]]
118123
name = "state"
119124
path = "examples/state.rs"

examples/README.md

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ parses it with serde and outputs the result.
2121

2222
* [`send_file`](send_file.rs) - A server that sends back content of files using tokio_fs to read the files asynchronously.
2323

24+
* [`single_threaded`](single_threaded.rs) - A server only running on 1 thread, so it can make use of `!Send` app state (like an `Rc` counter).
25+
2426
* [`state`](state.rs) - A webserver showing basic state sharing among requests. A counter is shared, incremented for every request, and every response is sent the last count.
2527

2628
* [`upgrades`](upgrades.rs) - A server and client demonstrating how to do HTTP upgrades (such as WebSockets or `CONNECT` tunneling).

examples/single_threaded.rs

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#![deny(warnings)]
2+
extern crate futures;
3+
extern crate hyper;
4+
extern crate pretty_env_logger;
5+
extern crate tokio;
6+
7+
use std::cell::Cell;
8+
use std::rc::Rc;
9+
10+
use hyper::{Body, Response, Server};
11+
use hyper::service::service_fn_ok;
12+
use hyper::rt::Future;
13+
use tokio::runtime::current_thread;
14+
15+
fn main() {
16+
pretty_env_logger::init();
17+
18+
let addr = ([127, 0, 0, 1], 3000).into();
19+
20+
// Using a !Send request counter is fine on 1 thread...
21+
let counter = Rc::new(Cell::new(0));
22+
23+
let new_service = move || {
24+
// For each connection, clone the counter to use in our service...
25+
let cnt = counter.clone();
26+
27+
service_fn_ok(move |_| {
28+
let prev = cnt.get();
29+
cnt.set(prev + 1);
30+
Response::new(Body::from(format!("Request count: {}", prev + 1)))
31+
})
32+
};
33+
34+
// Since the Server needs to spawn some background tasks, we needed
35+
// to configure an Executor that can spawn !Send futures...
36+
let exec = current_thread::TaskExecutor::current();
37+
38+
let server = Server::bind(&addr)
39+
.executor(exec)
40+
.serve(new_service)
41+
.map_err(|e| eprintln!("server error: {}", e));
42+
43+
println!("Listening on http://{}", addr);
44+
45+
current_thread::Runtime::new()
46+
.expect("rt new")
47+
.spawn(server)
48+
.run()
49+
.expect("rt run");
50+
}
51+

src/common/drain.rs

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ pub struct Watch {
3636
rx: Shared<oneshot::Receiver<()>>,
3737
}
3838

39+
#[allow(missing_debug_implementations)]
3940
pub struct Watching<F, FN> {
4041
future: F,
4142
state: State<FN>,

src/common/exec.rs

+72-3
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,28 @@ use std::sync::Arc;
33

44
use futures::future::{Executor, Future};
55

6-
/// Either the user provides an executor for background tasks, or we use
7-
/// `tokio::spawn`.
6+
use body::Payload;
7+
use proto::h2::server::H2Stream;
8+
use server::conn::spawn_all::{NewSvcTask, Watcher};
9+
use service::Service;
10+
11+
pub trait H2Exec<F, B: Payload>: Clone {
12+
fn execute_h2stream(&self, fut: H2Stream<F, B>) -> ::Result<()>;
13+
}
14+
15+
pub trait NewSvcExec<I, N, S: Service, E, W: Watcher<I, S, E>>: Clone {
16+
fn execute_new_svc(&self, fut: NewSvcTask<I, N, S, E, W>) -> ::Result<()>;
17+
}
18+
19+
// Either the user provides an executor for background tasks, or we use
20+
// `tokio::spawn`.
821
#[derive(Clone)]
9-
pub(crate) enum Exec {
22+
pub enum Exec {
1023
Default,
1124
Executor(Arc<Executor<Box<Future<Item=(), Error=()> + Send>> + Send + Sync>),
1225
}
1326

27+
// ===== impl Exec =====
1428

1529
impl Exec {
1630
pub(crate) fn execute<F>(&self, fut: F) -> ::Result<()>
@@ -52,3 +66,58 @@ impl fmt::Debug for Exec {
5266
.finish()
5367
}
5468
}
69+
70+
71+
impl<F, B> H2Exec<F, B> for Exec
72+
where
73+
H2Stream<F, B>: Future<Item=(), Error=()> + Send + 'static,
74+
B: Payload,
75+
{
76+
fn execute_h2stream(&self, fut: H2Stream<F, B>) -> ::Result<()> {
77+
self.execute(fut)
78+
}
79+
}
80+
81+
impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for Exec
82+
where
83+
NewSvcTask<I, N, S, E, W>: Future<Item=(), Error=()> + Send + 'static,
84+
S: Service,
85+
W: Watcher<I, S, E>,
86+
{
87+
fn execute_new_svc(&self, fut: NewSvcTask<I, N, S, E, W>) -> ::Result<()> {
88+
self.execute(fut)
89+
}
90+
}
91+
92+
// ==== impl Executor =====
93+
94+
impl<E, F, B> H2Exec<F, B> for E
95+
where
96+
E: Executor<H2Stream<F, B>> + Clone,
97+
H2Stream<F, B>: Future<Item=(), Error=()>,
98+
B: Payload,
99+
{
100+
fn execute_h2stream(&self, fut: H2Stream<F, B>) -> ::Result<()> {
101+
self.execute(fut)
102+
.map_err(|err| {
103+
warn!("executor error: {:?}", err.kind());
104+
::Error::new_execute()
105+
})
106+
}
107+
}
108+
109+
impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for E
110+
where
111+
E: Executor<NewSvcTask<I, N, S, E, W>> + Clone,
112+
NewSvcTask<I, N, S, E, W>: Future<Item=(), Error=()>,
113+
S: Service,
114+
W: Watcher<I, S, E>,
115+
{
116+
fn execute_new_svc(&self, fut: NewSvcTask<I, N, S, E, W>) -> ::Result<()> {
117+
self.execute(fut)
118+
.map_err(|err| {
119+
warn!("executor error: {:?}", err.kind());
120+
::Error::new_execute()
121+
})
122+
}
123+
}

src/common/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
mod buf;
22
pub(crate) mod drain;
3-
mod exec;
3+
pub(crate) mod exec;
44
pub(crate) mod io;
55
mod lazy;
66
mod never;

src/proto/h2/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use http::HeaderMap;
1010
use body::Payload;
1111

1212
mod client;
13-
mod server;
13+
pub(crate) mod server;
1414

1515
pub(crate) use self::client::Client;
1616
pub(crate) use self::server::Server;

src/proto/h2/server.rs

+15-12
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,20 @@ use tokio_io::{AsyncRead, AsyncWrite};
55

66
use ::headers::content_length_parse_all;
77
use ::body::Payload;
8-
use ::common::Exec;
8+
use ::common::exec::H2Exec;
99
use ::headers;
1010
use ::service::Service;
1111
use ::proto::Dispatched;
1212
use super::{PipeToSendStream, SendBuf};
1313

1414
use ::{Body, Response};
1515

16-
pub(crate) struct Server<T, S, B>
16+
pub(crate) struct Server<T, S, B, E>
1717
where
1818
S: Service,
1919
B: Payload,
2020
{
21-
exec: Exec,
21+
exec: E,
2222
service: S,
2323
state: State<T, B>,
2424
}
@@ -40,15 +40,16 @@ where
4040
}
4141

4242

43-
impl<T, S, B> Server<T, S, B>
43+
impl<T, S, B, E> Server<T, S, B, E>
4444
where
4545
T: AsyncRead + AsyncWrite,
4646
S: Service<ReqBody=Body, ResBody=B>,
4747
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
48-
S::Future: Send + 'static,
48+
//S::Future: Send + 'static,
4949
B: Payload,
50+
E: H2Exec<S::Future, B>,
5051
{
51-
pub(crate) fn new(io: T, service: S, exec: Exec) -> Server<T, S, B> {
52+
pub(crate) fn new(io: T, service: S, exec: E) -> Server<T, S, B, E> {
5253
let handshake = Builder::new()
5354
.handshake(io);
5455
Server {
@@ -76,13 +77,14 @@ where
7677
}
7778
}
7879

79-
impl<T, S, B> Future for Server<T, S, B>
80+
impl<T, S, B, E> Future for Server<T, S, B, E>
8081
where
8182
T: AsyncRead + AsyncWrite,
8283
S: Service<ReqBody=Body, ResBody=B>,
8384
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
84-
S::Future: Send + 'static,
85+
//S::Future: Send + 'static,
8586
B: Payload,
87+
E: H2Exec<S::Future, B>,
8688
{
8789
type Item = Dispatched;
8890
type Error = ::Error;
@@ -116,14 +118,14 @@ where
116118
T: AsyncRead + AsyncWrite,
117119
B: Payload,
118120
{
119-
fn poll_server<S>(&mut self, service: &mut S, exec: &Exec) -> Poll<(), ::Error>
121+
fn poll_server<S, E>(&mut self, service: &mut S, exec: &E) -> Poll<(), ::Error>
120122
where
121123
S: Service<
122124
ReqBody=Body,
123125
ResBody=B,
124126
>,
125127
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
126-
S::Future: Send + 'static,
128+
E: H2Exec<S::Future, B>,
127129
{
128130
while let Some((req, respond)) = try_ready!(self.conn.poll().map_err(::Error::new_h2)) {
129131
trace!("incoming request");
@@ -132,7 +134,7 @@ where
132134
::Body::h2(stream, content_length)
133135
});
134136
let fut = H2Stream::new(service.call(req), respond);
135-
exec.execute(fut)?;
137+
exec.execute_h2stream(fut)?;
136138
}
137139

138140
// no more incoming streams...
@@ -141,7 +143,8 @@ where
141143
}
142144
}
143145

144-
struct H2Stream<F, B>
146+
#[allow(missing_debug_implementations)]
147+
pub struct H2Stream<F, B>
145148
where
146149
B: Payload,
147150
{

0 commit comments

Comments
 (0)