Skip to content

Commit 4ffaad5

Browse files
authored
feat(client): add SendRequest::try_send_request() method (#3691)
This method returns a `TrySendError` type, which allows for returning the request back to the caller if an error occured between queuing and trying to write the request. This method is added for both `http1` and `http2`.
1 parent 56c3cd5 commit 4ffaad5

File tree

7 files changed

+214
-74
lines changed

7 files changed

+214
-74
lines changed

src/client/conn/http1.rs

+28-23
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use futures_util::ready;
1212
use http::{Request, Response};
1313
use httparse::ParserConfig;
1414

15-
use super::super::dispatch;
15+
use super::super::dispatch::{self, TrySendError};
1616
use crate::body::{Body, Incoming as IncomingBody};
1717
use crate::proto;
1818

@@ -208,33 +208,38 @@ where
208208
}
209209
}
210210

211-
/*
212-
pub(super) fn send_request_retryable(
211+
/// Sends a `Request` on the associated connection.
212+
///
213+
/// Returns a future that if successful, yields the `Response`.
214+
///
215+
/// # Error
216+
///
217+
/// If there was an error before trying to serialize the request to the
218+
/// connection, the message will be returned as part of this error.
219+
pub fn try_send_request(
213220
&mut self,
214221
req: Request<B>,
215-
) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
216-
where
217-
B: Send,
218-
{
219-
match self.dispatch.try_send(req) {
220-
Ok(rx) => {
221-
Either::Left(rx.then(move |res| {
222-
match res {
223-
Ok(Ok(res)) => future::ok(res),
224-
Ok(Err(err)) => future::err(err),
225-
// this is definite bug if it happens, but it shouldn't happen!
226-
Err(_) => panic!("dispatch dropped without returning error"),
227-
}
228-
}))
229-
}
230-
Err(req) => {
231-
debug!("connection was not ready");
232-
let err = crate::Error::new_canceled().with("connection was not ready");
233-
Either::Right(future::err((err, Some(req))))
222+
) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> {
223+
let sent = self.dispatch.try_send(req);
224+
async move {
225+
match sent {
226+
Ok(rx) => match rx.await {
227+
Ok(Ok(res)) => Ok(res),
228+
Ok(Err(err)) => Err(err),
229+
// this is definite bug if it happens, but it shouldn't happen!
230+
Err(_) => panic!("dispatch dropped without returning error"),
231+
},
232+
Err(req) => {
233+
debug!("connection was not ready");
234+
let error = crate::Error::new_canceled().with("connection was not ready");
235+
Err(TrySendError {
236+
error,
237+
message: Some(req),
238+
})
239+
}
234240
}
235241
}
236242
}
237-
*/
238243
}
239244

240245
impl<B> fmt::Debug for SendRequest<B> {

src/client/conn/http2.rs

+28-23
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::rt::{Read, Write};
1313
use futures_util::ready;
1414
use http::{Request, Response};
1515

16-
use super::super::dispatch;
16+
use super::super::dispatch::{self, TrySendError};
1717
use crate::body::{Body, Incoming as IncomingBody};
1818
use crate::common::time::Time;
1919
use crate::proto;
@@ -152,33 +152,38 @@ where
152152
}
153153
}
154154

155-
/*
156-
pub(super) fn send_request_retryable(
155+
/// Sends a `Request` on the associated connection.
156+
///
157+
/// Returns a future that if successful, yields the `Response`.
158+
///
159+
/// # Error
160+
///
161+
/// If there was an error before trying to serialize the request to the
162+
/// connection, the message will be returned as part of this error.
163+
pub fn try_send_request(
157164
&mut self,
158165
req: Request<B>,
159-
) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
160-
where
161-
B: Send,
162-
{
163-
match self.dispatch.try_send(req) {
164-
Ok(rx) => {
165-
Either::Left(rx.then(move |res| {
166-
match res {
167-
Ok(Ok(res)) => future::ok(res),
168-
Ok(Err(err)) => future::err(err),
169-
// this is definite bug if it happens, but it shouldn't happen!
170-
Err(_) => panic!("dispatch dropped without returning error"),
171-
}
172-
}))
173-
}
174-
Err(req) => {
175-
debug!("connection was not ready");
176-
let err = crate::Error::new_canceled().with("connection was not ready");
177-
Either::Right(future::err((err, Some(req))))
166+
) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> {
167+
let sent = self.dispatch.try_send(req);
168+
async move {
169+
match sent {
170+
Ok(rx) => match rx.await {
171+
Ok(Ok(res)) => Ok(res),
172+
Ok(Err(err)) => Err(err),
173+
// this is definite bug if it happens, but it shouldn't happen!
174+
Err(_) => panic!("dispatch dropped without returning error"),
175+
},
176+
Err(req) => {
177+
debug!("connection was not ready");
178+
let error = crate::Error::new_canceled().with("connection was not ready");
179+
Err(TrySendError {
180+
error,
181+
message: Some(req),
182+
})
183+
}
178184
}
179185
}
180186
}
181-
*/
182187
}
183188

184189
impl<B> fmt::Debug for SendRequest<B> {

src/client/conn/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,5 @@
1818
pub mod http1;
1919
#[cfg(feature = "http2")]
2020
pub mod http2;
21+
22+
pub use super::dispatch::TrySendError;

src/client/dispatch.rs

+45-16
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,21 @@ use tokio::sync::{mpsc, oneshot};
1313
#[cfg(feature = "http2")]
1414
use crate::{body::Incoming, proto::h2::client::ResponseFutMap};
1515

16-
#[cfg(test)]
17-
pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, (crate::Error, Option<T>)>>;
16+
pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, TrySendError<T>>>;
1817
pub(crate) type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>;
1918

19+
/// An error when calling `try_send_request`.
20+
///
21+
/// There is a possibility of an error occuring on a connection in-between the
22+
/// time that a request is queued and when it is actually written to the IO
23+
/// transport. If that happens, it is safe to return the request back to the
24+
/// caller, as it was never fully sent.
25+
#[derive(Debug)]
26+
pub struct TrySendError<T> {
27+
pub(crate) error: crate::Error,
28+
pub(crate) message: Option<T>,
29+
}
30+
2031
pub(crate) fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
2132
let (tx, rx) = mpsc::unbounded_channel();
2233
let (giver, taker) = want::new();
@@ -92,7 +103,7 @@ impl<T, U> Sender<T, U> {
92103
}
93104
}
94105

95-
#[cfg(test)]
106+
#[cfg(feature = "http1")]
96107
pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
97108
if !self.can_send() {
98109
return Err(val);
@@ -135,7 +146,6 @@ impl<T, U> UnboundedSender<T, U> {
135146
self.giver.is_canceled()
136147
}
137148

138-
#[cfg(test)]
139149
pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
140150
let (tx, rx) = oneshot::channel();
141151
self.inner
@@ -210,17 +220,17 @@ struct Envelope<T, U>(Option<(T, Callback<T, U>)>);
210220
impl<T, U> Drop for Envelope<T, U> {
211221
fn drop(&mut self) {
212222
if let Some((val, cb)) = self.0.take() {
213-
cb.send(Err((
214-
crate::Error::new_canceled().with("connection closed"),
215-
Some(val),
216-
)));
223+
cb.send(Err(TrySendError {
224+
error: crate::Error::new_canceled().with("connection closed"),
225+
message: Some(val),
226+
}));
217227
}
218228
}
219229
}
220230

221231
pub(crate) enum Callback<T, U> {
222232
#[allow(unused)]
223-
Retry(Option<oneshot::Sender<Result<U, (crate::Error, Option<T>)>>>),
233+
Retry(Option<oneshot::Sender<Result<U, TrySendError<T>>>>),
224234
NoRetry(Option<oneshot::Sender<Result<U, crate::Error>>>),
225235
}
226236

@@ -229,7 +239,10 @@ impl<T, U> Drop for Callback<T, U> {
229239
match self {
230240
Callback::Retry(tx) => {
231241
if let Some(tx) = tx.take() {
232-
let _ = tx.send(Err((dispatch_gone(), None)));
242+
let _ = tx.send(Err(TrySendError {
243+
error: dispatch_gone(),
244+
message: None,
245+
}));
233246
}
234247
}
235248
Callback::NoRetry(tx) => {
@@ -269,18 +282,34 @@ impl<T, U> Callback<T, U> {
269282
}
270283
}
271284

272-
pub(crate) fn send(mut self, val: Result<U, (crate::Error, Option<T>)>) {
285+
pub(crate) fn send(mut self, val: Result<U, TrySendError<T>>) {
273286
match self {
274287
Callback::Retry(ref mut tx) => {
275288
let _ = tx.take().unwrap().send(val);
276289
}
277290
Callback::NoRetry(ref mut tx) => {
278-
let _ = tx.take().unwrap().send(val.map_err(|e| e.0));
291+
let _ = tx.take().unwrap().send(val.map_err(|e| e.error));
279292
}
280293
}
281294
}
282295
}
283296

297+
impl<T> TrySendError<T> {
298+
/// Take the message from this error.
299+
///
300+
/// The message will not always have been recovered. If an error occurs
301+
/// after the message has been serialized onto the connection, it will not
302+
/// be available here.
303+
pub fn take_message(&mut self) -> Option<T> {
304+
self.message.take()
305+
}
306+
307+
/// Consumes this to return the inner error.
308+
pub fn into_error(self) -> crate::Error {
309+
self.error
310+
}
311+
}
312+
284313
#[cfg(feature = "http2")]
285314
pin_project! {
286315
pub struct SendWhen<B>
@@ -325,8 +354,8 @@ where
325354
trace!("send_when canceled");
326355
Poll::Ready(())
327356
}
328-
Poll::Ready(Err(err)) => {
329-
call_back.send(Err(err));
357+
Poll::Ready(Err((error, message))) => {
358+
call_back.send(Err(TrySendError { error, message }));
330359
Poll::Ready(())
331360
}
332361
}
@@ -389,8 +418,8 @@ mod tests {
389418
let err = fulfilled
390419
.expect("fulfilled")
391420
.expect_err("promise should error");
392-
match (err.0.kind(), err.1) {
393-
(&crate::error::Kind::Canceled, Some(_)) => (),
421+
match (err.error.is_canceled(), err.message) {
422+
(true, Some(_)) => (),
394423
e => panic!("expected Error::Cancel(_), found {:?}", e),
395424
}
396425
}

src/proto/h1/dispatch.rs

+13-5
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ use http::Request;
1313

1414
use super::{Http1Transaction, Wants};
1515
use crate::body::{Body, DecodedLength, Incoming as IncomingBody};
16+
#[cfg(feature = "client")]
17+
use crate::client::dispatch::TrySendError;
1618
use crate::common::task;
1719
use crate::proto::{BodyLength, Conn, Dispatched, MessageHead, RequestHead};
1820
use crate::upgrade::OnUpgrade;
@@ -655,15 +657,21 @@ cfg_client! {
655657
}
656658
Err(err) => {
657659
if let Some(cb) = self.callback.take() {
658-
cb.send(Err((err, None)));
660+
cb.send(Err(TrySendError {
661+
error: err,
662+
message: None,
663+
}));
659664
Ok(())
660665
} else if !self.rx_closed {
661666
self.rx.close();
662667
if let Some((req, cb)) = self.rx.try_recv() {
663668
trace!("canceling queued request with connection error: {}", err);
664669
// in this case, the message was never even started, so it's safe to tell
665670
// the user that the request was completely canceled
666-
cb.send(Err((crate::Error::new_canceled().with(err), Some(req))));
671+
cb.send(Err(TrySendError {
672+
error: crate::Error::new_canceled().with(err),
673+
message: Some(req),
674+
}));
667675
Ok(())
668676
} else {
669677
Err(err)
@@ -729,9 +737,9 @@ mod tests {
729737
let err = tokio_test::assert_ready_ok!(Pin::new(&mut res_rx).poll(cx))
730738
.expect_err("callback should send error");
731739

732-
match (err.0.kind(), err.1) {
733-
(&crate::error::Kind::Canceled, Some(_)) => (),
734-
other => panic!("expected Canceled, got {:?}", other),
740+
match (err.error.is_canceled(), err.message.as_ref()) {
741+
(true, Some(_)) => (),
742+
_ => panic!("expected Canceled, got {:?}", err),
735743
}
736744
});
737745
}

src/proto/h2/client.rs

+13-7
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use pin_project_lite::pin_project;
2222
use super::ping::{Ponger, Recorder};
2323
use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
2424
use crate::body::{Body, Incoming as IncomingBody};
25-
use crate::client::dispatch::{Callback, SendWhen};
25+
use crate::client::dispatch::{Callback, SendWhen, TrySendError};
2626
use crate::common::io::Compat;
2727
use crate::common::time::Time;
2828
use crate::ext::Protocol;
@@ -662,10 +662,10 @@ where
662662
.map_or(false, |len| len != 0)
663663
{
664664
warn!("h2 connect request with non-zero body not supported");
665-
cb.send(Err((
666-
crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
667-
None,
668-
)));
665+
cb.send(Err(TrySendError {
666+
error: crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
667+
message: None,
668+
}));
669669
continue;
670670
}
671671

@@ -677,7 +677,10 @@ where
677677
Ok(ok) => ok,
678678
Err(err) => {
679679
debug!("client send request error: {}", err);
680-
cb.send(Err((crate::Error::new_h2(err), None)));
680+
cb.send(Err(TrySendError {
681+
error: crate::Error::new_h2(err),
682+
message: None,
683+
}));
681684
continue;
682685
}
683686
};
@@ -702,7 +705,10 @@ where
702705
}
703706
Poll::Ready(Ok(())) => (),
704707
Poll::Ready(Err(err)) => {
705-
f.cb.send(Err((crate::Error::new_h2(err), None)));
708+
f.cb.send(Err(TrySendError {
709+
error: crate::Error::new_h2(err),
710+
message: None,
711+
}));
706712
continue;
707713
}
708714
}

0 commit comments

Comments
 (0)