@@ -37,6 +37,10 @@ struct ConnInner<K: Key, T: Transport, H: MessageHandler<T>> {
37
37
key : K ,
38
38
state : State < H , T > ,
39
39
transport : T ,
40
+ /// Records a WouldBlock error when trying to read
41
+ ///
42
+ /// This flag is used to prevent busy looping
43
+ read_would_block : bool ,
40
44
}
41
45
42
46
impl < K : Key , T : Transport , H : MessageHandler < T > > fmt:: Debug for ConnInner < K , T , H > {
@@ -153,7 +157,10 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
153
157
Ok ( head) => head,
154
158
Err ( :: Error :: Io ( e) ) => match e. kind ( ) {
155
159
io:: ErrorKind :: WouldBlock |
156
- io:: ErrorKind :: Interrupted => return state,
160
+ io:: ErrorKind :: Interrupted => {
161
+ self . read_would_block = true ;
162
+ return state;
163
+ } ,
157
164
_ => {
158
165
debug ! ( "io error trying to parse {:?}" , e) ;
159
166
return State :: Closed ;
@@ -250,6 +257,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
250
257
Err ( e) => match e. kind ( ) {
251
258
io:: ErrorKind :: WouldBlock => {
252
259
// This is the expected case reading in this state
260
+ self . read_would_block = true ;
253
261
state
254
262
} ,
255
263
_ => {
@@ -288,7 +296,10 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
288
296
} ,
289
297
Err ( :: Error :: Io ( e) ) => match e. kind ( ) {
290
298
io:: ErrorKind :: WouldBlock |
291
- io:: ErrorKind :: Interrupted => None ,
299
+ io:: ErrorKind :: Interrupted => {
300
+ self . read_would_block = true ;
301
+ None
302
+ } ,
292
303
_ => {
293
304
debug ! ( "io error trying to parse {:?}" , e) ;
294
305
return State :: Closed ;
@@ -482,10 +493,15 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
482
493
}
483
494
484
495
fn can_read_more ( & self , was_init : bool ) -> bool {
485
- match self . state {
496
+ let transport_blocked = self . transport . blocked ( ) . is_some ( ) ;
497
+ let read_would_block = self . read_would_block ;
498
+
499
+ let state_machine_ok = match self . state {
486
500
State :: Init { .. } => !was_init && !self . buf . is_empty ( ) ,
487
501
_ => !self . buf . is_empty ( )
488
- }
502
+ } ;
503
+
504
+ !transport_blocked && !read_would_block && state_machine_ok
489
505
}
490
506
491
507
fn on_error < F > ( & mut self , err : :: Error , factory : & F ) where F : MessageHandlerFactory < K , T > {
@@ -501,6 +517,8 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> ConnInner<K, T, H> {
501
517
502
518
fn on_readable < F > ( & mut self , scope : & mut Scope < F > )
503
519
where F : MessageHandlerFactory < K , T , Output =H > {
520
+ // Clear would_block flag so state is clear going into read
521
+ self . read_would_block = false ;
504
522
trace ! ( "on_readable -> {:?}" , self . state) ;
505
523
let state = mem:: replace ( & mut self . state , State :: Closed ) ;
506
524
self . state = self . read ( scope, state) ;
@@ -549,6 +567,7 @@ impl<K: Key, T: Transport, H: MessageHandler<T>> Conn<K, T, H> {
549
567
timeout_start : Some ( now) ,
550
568
} ,
551
569
transport : transport,
570
+ read_would_block : false ,
552
571
} ) )
553
572
}
554
573
0 commit comments