@@ -115,6 +115,8 @@ struct Peer {
115
115
pending_read_is_header : bool ,
116
116
117
117
sync_status : InitSyncTracker ,
118
+
119
+ awaiting_pong : bool ,
118
120
}
119
121
120
122
impl Peer {
@@ -286,6 +288,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
286
288
pending_read_is_header : false ,
287
289
288
290
sync_status : InitSyncTracker :: NoSyncRequested ,
291
+
292
+ awaiting_pong : false ,
289
293
} ) . is_some ( ) {
290
294
panic ! ( "PeerManager driver duplicated descriptors!" ) ;
291
295
} ;
@@ -322,6 +326,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
322
326
pending_read_is_header : false ,
323
327
324
328
sync_status : InitSyncTracker :: NoSyncRequested ,
329
+
330
+ awaiting_pong : false ,
325
331
} ) . is_some ( ) {
326
332
panic ! ( "PeerManager driver duplicated descriptors!" ) ;
327
333
} ;
@@ -680,9 +686,9 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
680
686
}
681
687
} ,
682
688
19 => {
689
+ peer. awaiting_pong = false ;
683
690
try_potential_decodeerror ! ( msgs:: Pong :: read( & mut reader) ) ;
684
691
} ,
685
-
686
692
// Channel control:
687
693
32 => {
688
694
let msg = try_potential_decodeerror ! ( msgs:: OpenChannel :: read( & mut reader) ) ;
@@ -1088,6 +1094,48 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
1088
1094
}
1089
1095
} ;
1090
1096
}
1097
+
1098
+ /// This function should be called roughly once every 30 seconds.
1099
+ /// It will send pings to each peer and disconnect those which did not respond to the last round of pings.
1100
+
1101
+ /// Will most likely call send_data on all of the registered descriptors, thus, be very careful with reentrancy issues!
1102
+ pub fn timer_tick_occured ( & self ) {
1103
+ let mut peers_lock = self . peers . lock ( ) . unwrap ( ) ;
1104
+ {
1105
+ let peers = peers_lock. borrow_parts ( ) ;
1106
+ let peers_needing_send = peers. peers_needing_send ;
1107
+ let node_id_to_descriptor = peers. node_id_to_descriptor ;
1108
+ let peers = peers. peers ;
1109
+
1110
+ peers. retain ( |descriptor, peer| {
1111
+ if peer. awaiting_pong == true {
1112
+ peers_needing_send. remove ( descriptor) ;
1113
+ match peer. their_node_id {
1114
+ Some ( node_id) => {
1115
+ node_id_to_descriptor. remove ( & node_id) ;
1116
+ self . message_handler . chan_handler . peer_disconnected ( & node_id, true ) ;
1117
+ } ,
1118
+ None => { }
1119
+ }
1120
+ }
1121
+
1122
+ let ping = msgs:: Ping {
1123
+ ponglen : 0 ,
1124
+ byteslen : 64 ,
1125
+ } ;
1126
+ peer. pending_outbound_buffer . push_back ( encode_msg ! ( ping, 18 ) ) ;
1127
+ let mut descriptor_clone = descriptor. clone ( ) ;
1128
+ self . do_attempt_write_data ( & mut descriptor_clone, peer) ;
1129
+
1130
+ if peer. awaiting_pong {
1131
+ false // Drop the peer
1132
+ } else {
1133
+ peer. awaiting_pong = true ;
1134
+ true
1135
+ }
1136
+ } ) ;
1137
+ }
1138
+ }
1091
1139
}
1092
1140
1093
1141
#[ cfg( test) ]
@@ -1171,4 +1219,19 @@ mod tests {
1171
1219
peers[ 0 ] . process_events ( ) ;
1172
1220
assert_eq ! ( peers[ 0 ] . peers. lock( ) . unwrap( ) . peers. len( ) , 0 ) ;
1173
1221
}
1222
+ #[ test]
1223
+ fn test_timer_tick_occured ( ) {
1224
+ // Create peers, a vector of two peer managers, perform initial set up and check that peers[0] has one Peer.
1225
+ let peers = create_network ( 2 ) ;
1226
+ establish_connection ( & peers[ 0 ] , & peers[ 1 ] ) ;
1227
+ assert_eq ! ( peers[ 0 ] . peers. lock( ) . unwrap( ) . peers. len( ) , 1 ) ;
1228
+
1229
+ // peers[0] awaiting_pong is set to true, but the Peer is still connected
1230
+ peers[ 0 ] . timer_tick_occured ( ) ;
1231
+ assert_eq ! ( peers[ 0 ] . peers. lock( ) . unwrap( ) . peers. len( ) , 1 ) ;
1232
+
1233
+ // Since timer_tick_occured() is called again when awaiting_pong is true, all Peers are disconnected
1234
+ peers[ 0 ] . timer_tick_occured ( ) ;
1235
+ assert_eq ! ( peers[ 0 ] . peers. lock( ) . unwrap( ) . peers. len( ) , 0 ) ;
1236
+ }
1174
1237
}
0 commit comments