|
1 | 1 | use bytes::IntoBuf;
|
2 | 2 | use futures::{Async, Future, Poll, Stream};
|
| 3 | +use futures::future::{self, Either}; |
| 4 | +use futures::sync::mpsc; |
3 | 5 | use h2::client::{Builder, Handshake, SendRequest};
|
4 | 6 | use tokio_io::{AsyncRead, AsyncWrite};
|
5 | 7 |
|
6 | 8 | use headers::content_length_parse_all;
|
7 | 9 | use body::Payload;
|
8 |
| -use ::common::Exec; |
| 10 | +use ::common::{Exec, Never}; |
9 | 11 | use headers;
|
10 | 12 | use ::proto::Dispatched;
|
11 | 13 | use super::{PipeToSendStream, SendBuf};
|
12 | 14 | use ::{Body, Request, Response};
|
13 | 15 |
|
14 | 16 | type ClientRx<B> = ::client::dispatch::Receiver<Request<B>, Response<Body>>;
|
| 17 | +/// An mpsc channel is used to help notify the `Connection` task when *all* |
| 18 | +/// other handles to it have been dropped, so that it can shutdown. |
| 19 | +type ConnDropRef = mpsc::Sender<Never>; |
15 | 20 |
|
16 | 21 | pub(crate) struct Client<T, B>
|
17 | 22 | where
|
|
24 | 29 |
|
25 | 30 | enum State<T, B> where B: IntoBuf {
|
26 | 31 | Handshaking(Handshake<T, B>),
|
27 |
| - Ready(SendRequest<B>), |
| 32 | + Ready(SendRequest<B>, ConnDropRef), |
28 | 33 | }
|
29 | 34 |
|
30 | 35 | impl<T, B> Client<T, B>
|
@@ -59,13 +64,40 @@ where
|
59 | 64 | let next = match self.state {
|
60 | 65 | State::Handshaking(ref mut h) => {
|
61 | 66 | let (request_tx, conn) = try_ready!(h.poll().map_err(::Error::new_h2));
|
| 67 | + // An mpsc channel is used entirely to detect when the |
| 68 | + // 'Client' has been dropped. This is to get around a bug |
| 69 | + // in h2 where dropping all SendRequests won't notify a |
| 70 | + // parked Connection. |
| 71 | + let (tx, rx) = mpsc::channel(0); |
| 72 | + let rx = rx.into_future() |
| 73 | + .map(|(msg, _)| match msg { |
| 74 | + Some(never) => match never {}, |
| 75 | + None => (), |
| 76 | + }) |
| 77 | + .map_err(|_| -> Never { unreachable!("mpsc cannot error") }); |
62 | 78 | let fut = conn
|
63 | 79 | .inspect(|_| trace!("connection complete"))
|
64 |
| - .map_err(|e| debug!("connection error: {}", e)); |
| 80 | + .map_err(|e| debug!("connection error: {}", e)) |
| 81 | + .select2(rx) |
| 82 | + .then(|res| match res { |
| 83 | + Ok(Either::A(((), _))) | |
| 84 | + Err(Either::A(((), _))) => { |
| 85 | + // conn has finished either way |
| 86 | + Either::A(future::ok(())) |
| 87 | + }, |
| 88 | + Ok(Either::B(((), conn))) => { |
| 89 | + // mpsc has been dropped, hopefully polling |
| 90 | + // the connection some more should start shutdown |
| 91 | + // and then close |
| 92 | + trace!("send_request dropped, starting conn shutdown"); |
| 93 | + Either::B(conn) |
| 94 | + } |
| 95 | + Err(Either::B((never, _))) => match never {}, |
| 96 | + }); |
65 | 97 | self.executor.execute(fut)?;
|
66 |
| - State::Ready(request_tx) |
| 98 | + State::Ready(request_tx, tx) |
67 | 99 | },
|
68 |
| - State::Ready(ref mut tx) => { |
| 100 | + State::Ready(ref mut tx, ref conn_dropper) => { |
69 | 101 | try_ready!(tx.poll_ready().map_err(::Error::new_h2));
|
70 | 102 | match self.rx.poll() {
|
71 | 103 | Ok(Async::Ready(Some((req, mut cb)))) => {
|
@@ -98,6 +130,11 @@ where
|
98 | 130 | match pipe.poll() {
|
99 | 131 | Ok(Async::Ready(())) | Err(()) => (),
|
100 | 132 | Ok(Async::NotReady) => {
|
| 133 | + let conn_drop_ref = conn_dropper.clone(); |
| 134 | + let pipe = pipe.then(move |x| { |
| 135 | + drop(conn_drop_ref); |
| 136 | + x |
| 137 | + }); |
101 | 138 | self.executor.execute(pipe)?;
|
102 | 139 | }
|
103 | 140 | }
|
|
0 commit comments