@@ -14,7 +14,6 @@ use httparse::ParserConfig;
14
14
use super :: super :: dispatch;
15
15
use crate :: body:: { Body , Incoming as IncomingBody } ;
16
16
use crate :: proto;
17
- use crate :: upgrade:: Upgraded ;
18
17
19
18
type Dispatcher < T , B > =
20
19
proto:: dispatch:: Dispatcher < proto:: dispatch:: Client < B > , B , T , proto:: h1:: ClientTransaction > ;
@@ -51,23 +50,23 @@ pub struct Parts<T> {
51
50
#[ must_use = "futures do nothing unless polled" ]
52
51
pub struct Connection < T , B >
53
52
where
54
- T : Read + Write + Send + ' static ,
53
+ T : Read + Write + ' static ,
55
54
B : Body + ' static ,
56
55
{
57
- inner : Option < Dispatcher < T , B > > ,
56
+ inner : Dispatcher < T , B > ,
58
57
}
59
58
60
59
impl < T , B > Connection < T , B >
61
60
where
62
- T : Read + Write + Send + Unpin + ' static ,
61
+ T : Read + Write + Unpin + ' static ,
63
62
B : Body + ' static ,
64
63
B :: Error : Into < Box < dyn StdError + Send + Sync > > ,
65
64
{
66
65
/// Return the inner IO object, and additional information.
67
66
///
68
67
/// Only works for HTTP/1 connections. HTTP/2 connections will panic.
69
68
pub fn into_parts ( self ) -> Parts < T > {
70
- let ( io, read_buf, _) = self . inner . expect ( "already upgraded" ) . into_inner ( ) ;
69
+ let ( io, read_buf, _) = self . inner . into_inner ( ) ;
71
70
Parts {
72
71
io,
73
72
read_buf,
87
86
/// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
88
87
/// to work with this function; or use the `without_shutdown` wrapper.
89
88
pub fn poll_without_shutdown ( & mut self , cx : & mut Context < ' _ > ) -> Poll < crate :: Result < ( ) > > {
90
- self . inner
91
- . as_mut ( )
92
- . expect ( "already upgraded" )
93
- . poll_without_shutdown ( cx)
89
+ self . inner . poll_without_shutdown ( cx)
94
90
}
95
91
}
96
92
@@ -119,7 +115,7 @@ pub struct Builder {
119
115
/// See [`client::conn`](crate::client::conn) for more.
120
116
pub async fn handshake < T , B > ( io : T ) -> crate :: Result < ( SendRequest < B > , Connection < T , B > ) >
121
117
where
122
- T : Read + Write + Unpin + Send + ' static ,
118
+ T : Read + Write + Unpin + ' static ,
123
119
B : Body + ' static ,
124
120
B :: Data : Send ,
125
121
B :: Error : Into < Box < dyn StdError + Send + Sync > > ,
@@ -240,9 +236,23 @@ impl<B> fmt::Debug for SendRequest<B> {
240
236
241
237
// ===== impl Connection
242
238
239
+ impl < T , B > Connection < T , B >
240
+ where
241
+ T : Read + Write + Unpin + Send + ' static ,
242
+ B : Body + ' static ,
243
+ B :: Error : Into < Box < dyn StdError + Send + Sync > > ,
244
+ {
245
+ /// Enable this connection to support higher-level HTTP upgrades.
246
+ ///
247
+ /// See [the `upgrade` module](crate::upgrade) for more.
248
+ pub fn with_upgrades ( self ) -> upgrades:: UpgradeableConnection < T , B > {
249
+ upgrades:: UpgradeableConnection { inner : Some ( self ) }
250
+ }
251
+ }
252
+
243
253
impl < T , B > fmt:: Debug for Connection < T , B >
244
254
where
245
- T : Read + Write + fmt:: Debug + Send + ' static ,
255
+ T : Read + Write + fmt:: Debug + ' static ,
246
256
B : Body + ' static ,
247
257
{
248
258
fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
@@ -252,27 +262,24 @@ where
252
262
253
263
impl < T , B > Future for Connection < T , B >
254
264
where
255
- T : Read + Write + Unpin + Send + ' static ,
265
+ T : Read + Write + Unpin + ' static ,
256
266
B : Body + ' static ,
257
267
B :: Data : Send ,
258
268
B :: Error : Into < Box < dyn StdError + Send + Sync > > ,
259
269
{
260
270
type Output = crate :: Result < ( ) > ;
261
271
262
272
fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
263
- match ready ! ( Pin :: new( self . inner. as_mut ( ) . unwrap ( ) ) . poll( cx) ) ? {
273
+ match ready ! ( Pin :: new( & mut self . inner) . poll( cx) ) ? {
264
274
proto:: Dispatched :: Shutdown => Poll :: Ready ( Ok ( ( ) ) ) ,
265
- proto:: Dispatched :: Upgrade ( pending) => match self . inner . take ( ) {
266
- Some ( h1) => {
267
- let ( io, buf, _) = h1. into_inner ( ) ;
268
- pending. fulfill ( Upgraded :: new ( io, buf) ) ;
269
- Poll :: Ready ( Ok ( ( ) ) )
270
- }
271
- _ => {
272
- drop ( pending) ;
273
- unreachable ! ( "Upgraded twice" ) ;
274
- }
275
- } ,
275
+ proto:: Dispatched :: Upgrade ( pending) => {
276
+ // With no `Send` bound on `I`, we can't try to do
277
+ // upgrades here. In case a user was trying to use
278
+ // `upgrade` with this API, send a special
279
+ // error letting them know about that.
280
+ pending. manual ( ) ;
281
+ Poll :: Ready ( Ok ( ( ) ) )
282
+ }
276
283
}
277
284
}
278
285
}
@@ -474,7 +481,7 @@ impl Builder {
474
481
io : T ,
475
482
) -> impl Future < Output = crate :: Result < ( SendRequest < B > , Connection < T , B > ) > >
476
483
where
477
- T : Read + Write + Unpin + Send + ' static ,
484
+ T : Read + Write + Unpin + ' static ,
478
485
B : Body + ' static ,
479
486
B :: Data : Send ,
480
487
B :: Error : Into < Box < dyn StdError + Send + Sync > > ,
@@ -518,10 +525,53 @@ impl Builder {
518
525
let cd = proto:: h1:: dispatch:: Client :: new ( rx) ;
519
526
let proto = proto:: h1:: Dispatcher :: new ( cd, conn) ;
520
527
521
- Ok ( (
522
- SendRequest { dispatch : tx } ,
523
- Connection { inner : Some ( proto) } ,
524
- ) )
528
+ Ok ( ( SendRequest { dispatch : tx } , Connection { inner : proto } ) )
529
+ }
530
+ }
531
+ }
532
+
533
+ mod upgrades {
534
+ use crate :: upgrade:: Upgraded ;
535
+
536
+ use super :: * ;
537
+
538
+ // A future binding a connection with a Service with Upgrade support.
539
+ //
540
+ // This type is unnameable outside the crate.
541
+ #[ must_use = "futures do nothing unless polled" ]
542
+ #[ allow( missing_debug_implementations) ]
543
+ pub struct UpgradeableConnection < T , B >
544
+ where
545
+ T : Read + Write + Unpin + Send + ' static ,
546
+ B : Body + ' static ,
547
+ B :: Error : Into < Box < dyn StdError + Send + Sync > > ,
548
+ {
549
+ pub ( super ) inner : Option < Connection < T , B > > ,
550
+ }
551
+
552
+ impl < I , B > Future for UpgradeableConnection < I , B >
553
+ where
554
+ I : Read + Write + Unpin + Send + ' static ,
555
+ B : Body + ' static ,
556
+ B :: Data : Send ,
557
+ B :: Error : Into < Box < dyn StdError + Send + Sync > > ,
558
+ {
559
+ type Output = crate :: Result < ( ) > ;
560
+
561
+ fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
562
+ match ready ! ( Pin :: new( & mut self . inner. as_mut( ) . unwrap( ) . inner) . poll( cx) ) {
563
+ Ok ( proto:: Dispatched :: Shutdown ) => Poll :: Ready ( Ok ( ( ) ) ) ,
564
+ Ok ( proto:: Dispatched :: Upgrade ( pending) ) => {
565
+ let Parts {
566
+ io,
567
+ read_buf,
568
+ _inner,
569
+ } = self . inner . take ( ) . unwrap ( ) . into_parts ( ) ;
570
+ pending. fulfill ( Upgraded :: new ( io, read_buf) ) ;
571
+ Poll :: Ready ( Ok ( ( ) ) )
572
+ }
573
+ Err ( e) => Poll :: Ready ( Err ( e) ) ,
574
+ }
525
575
}
526
576
}
527
577
}
0 commit comments