@@ -32,7 +32,7 @@ use routing::network_graph::NetGraphMsgHandler;
32
32
33
33
use prelude:: * ;
34
34
use alloc:: collections:: LinkedList ;
35
- use std:: collections:: { HashMap , hash_map, HashSet } ;
35
+ use std:: collections:: { HashMap , hash_map} ;
36
36
use std:: sync:: { Arc , Mutex } ;
37
37
use core:: sync:: atomic:: { AtomicUsize , Ordering } ;
38
38
use core:: { cmp, hash, fmt, mem} ;
@@ -288,9 +288,6 @@ impl Peer {
288
288
289
289
struct PeerHolder < Descriptor : SocketDescriptor > {
290
290
peers : HashMap < Descriptor , Peer > ,
291
- /// Added to by do_read_event for cases where we pushed a message onto the send buffer but
292
- /// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events()
293
- peers_needing_send : HashSet < Descriptor > ,
294
291
/// Only add to this set when noise completes:
295
292
node_id_to_descriptor : HashMap < PublicKey , Descriptor > ,
296
293
}
@@ -421,7 +418,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
421
418
message_handler,
422
419
peers : Mutex :: new ( PeerHolder {
423
420
peers : HashMap :: new ( ) ,
424
- peers_needing_send : HashSet :: new ( ) ,
425
421
node_id_to_descriptor : HashMap :: new ( )
426
422
} ) ,
427
423
our_node_secret,
@@ -653,14 +649,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
653
649
}
654
650
655
651
/// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly.
656
- fn enqueue_message < M : Encode + Writeable > ( & self , peers_needing_send : & mut HashSet < Descriptor > , peer : & mut Peer , descriptor : Descriptor , message : & M ) {
652
+ fn enqueue_message < M : Encode + Writeable > ( & self , peer : & mut Peer , message : & M ) {
657
653
let mut buffer = VecWriter ( Vec :: new ( ) ) ;
658
654
wire:: write ( message, & mut buffer) . unwrap ( ) ; // crash if the write failed
659
655
let encoded_message = buffer. 0 ;
660
656
661
657
log_trace ! ( self . logger, "Enqueueing message of type {} to {}" , message. type_id( ) , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
662
658
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_message[ ..] ) ) ;
663
- peers_needing_send. insert ( descriptor) ;
664
659
}
665
660
666
661
fn do_read_event ( & self , peer_descriptor : & mut Descriptor , data : & [ u8 ] ) -> Result < bool , PeerHandleError > {
@@ -704,7 +699,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
704
699
} ,
705
700
msgs:: ErrorAction :: SendErrorMessage { msg } => {
706
701
log_trace!( self . logger, "Got Err handling message, sending Error message because {}" , e. err) ;
707
- self . enqueue_message( & mut peers . peers_needing_send , peer, peer_descriptor . clone ( ) , & msg) ;
702
+ self . enqueue_message( peer, & msg) ;
708
703
continue ;
709
704
} ,
710
705
}
@@ -746,7 +741,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
746
741
insert_node_id ! ( ) ;
747
742
let features = InitFeatures :: known ( ) ;
748
743
let resp = msgs:: Init { features } ;
749
- self . enqueue_message ( & mut peers . peers_needing_send , peer, peer_descriptor . clone ( ) , & resp) ;
744
+ self . enqueue_message ( peer, & resp) ;
750
745
} ,
751
746
NextNoiseStep :: ActThree => {
752
747
let their_node_id = try_potential_handleerror ! ( peer. channel_encryptor. process_act_three( & peer. pending_read_buffer[ ..] ) ) ;
@@ -756,7 +751,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
756
751
insert_node_id ! ( ) ;
757
752
let features = InitFeatures :: known ( ) ;
758
753
let resp = msgs:: Init { features } ;
759
- self . enqueue_message ( & mut peers . peers_needing_send , peer, peer_descriptor . clone ( ) , & resp) ;
754
+ self . enqueue_message ( peer, & resp) ;
760
755
} ,
761
756
NextNoiseStep :: NoiseComplete => {
762
757
if peer. pending_read_is_header {
@@ -804,7 +799,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
804
799
}
805
800
} ;
806
801
807
- match self . handle_message ( & mut peers . peers_needing_send , peer, peer_descriptor . clone ( ) , message) {
802
+ match self . handle_message ( peer, message) {
808
803
Err ( handling_error) => match handling_error {
809
804
MessageHandlingError :: PeerHandleError ( e) => { return Err ( e) } ,
810
805
MessageHandlingError :: LightningError ( e) => {
@@ -839,7 +834,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
839
834
840
835
/// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
841
836
/// Returns the message back if it needs to be broadcasted to all other peers.
842
- fn handle_message ( & self , peers_needing_send : & mut HashSet < Descriptor > , peer : & mut Peer , peer_descriptor : Descriptor , message : wire:: Message ) -> Result < Option < wire:: Message > , MessageHandlingError > {
837
+ fn handle_message ( & self , peer : & mut Peer , message : wire:: Message ) -> Result < Option < wire:: Message > , MessageHandlingError > {
843
838
log_trace ! ( self . logger, "Received message of type {} from {}" , message. type_id( ) , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
844
839
845
840
// Need an Init as first message
@@ -874,7 +869,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
874
869
875
870
if msg. features . initial_routing_sync ( ) {
876
871
peer. sync_status = InitSyncTracker :: ChannelsSyncing ( 0 ) ;
877
- peers_needing_send. insert ( peer_descriptor. clone ( ) ) ;
878
872
}
879
873
if !msg. features . supports_static_remote_key ( ) {
880
874
log_debug ! ( self . logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible" , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
@@ -909,7 +903,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
909
903
wire:: Message :: Ping ( msg) => {
910
904
if msg. ponglen < 65532 {
911
905
let resp = msgs:: Pong { byteslen : msg. ponglen } ;
912
- self . enqueue_message ( peers_needing_send , peer, peer_descriptor . clone ( ) , & resp) ;
906
+ self . enqueue_message ( peer, & resp) ;
913
907
}
914
908
} ,
915
909
wire:: Message :: Pong ( _msg) => {
@@ -1031,7 +1025,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1031
1025
wire:: Message :: ChannelAnnouncement ( ref msg) => {
1032
1026
let encoded_msg = encode_msg ! ( msg) ;
1033
1027
1034
- for ( ref descriptor , ref mut peer) in peers. peers . iter_mut ( ) {
1028
+ for ( _ , ref mut peer) in peers. peers . iter_mut ( ) {
1035
1029
if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_features . is_none ( ) ||
1036
1030
!peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
1037
1031
continue
@@ -1047,13 +1041,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1047
1041
continue ;
1048
1042
}
1049
1043
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
1050
- peers. peers_needing_send . insert ( ( * descriptor) . clone ( ) ) ;
1051
1044
}
1052
1045
} ,
1053
1046
wire:: Message :: NodeAnnouncement ( ref msg) => {
1054
1047
let encoded_msg = encode_msg ! ( msg) ;
1055
1048
1056
- for ( ref descriptor , ref mut peer) in peers. peers . iter_mut ( ) {
1049
+ for ( _ , ref mut peer) in peers. peers . iter_mut ( ) {
1057
1050
if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_features . is_none ( ) ||
1058
1051
!peer. should_forward_node_announcement ( msg. contents . node_id ) {
1059
1052
continue
@@ -1068,13 +1061,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1068
1061
continue ;
1069
1062
}
1070
1063
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
1071
- peers. peers_needing_send . insert ( ( * descriptor) . clone ( ) ) ;
1072
1064
}
1073
1065
} ,
1074
1066
wire:: Message :: ChannelUpdate ( ref msg) => {
1075
1067
let encoded_msg = encode_msg ! ( msg) ;
1076
1068
1077
- for ( ref descriptor , ref mut peer) in peers. peers . iter_mut ( ) {
1069
+ for ( _ , ref mut peer) in peers. peers . iter_mut ( ) {
1078
1070
if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_features . is_none ( ) ||
1079
1071
!peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
1080
1072
continue
@@ -1086,7 +1078,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1086
1078
continue ;
1087
1079
}
1088
1080
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
1089
- peers. peers_needing_send . insert ( ( * descriptor) . clone ( ) ) ;
1090
1081
}
1091
1082
} ,
1092
1083
_ => debug_assert ! ( false , "We shouldn't attempt to forward anything but gossip messages" ) ,
@@ -1133,17 +1124,15 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1133
1124
log_trace ! ( self . logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}" ,
1134
1125
log_pubkey!( node_id) ,
1135
1126
log_bytes!( msg. temporary_channel_id) ) ;
1136
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1127
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1137
1128
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1138
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1139
1129
} ,
1140
1130
MessageSendEvent :: SendOpenChannel { ref node_id, ref msg } => {
1141
1131
log_trace ! ( self . logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}" ,
1142
1132
log_pubkey!( node_id) ,
1143
1133
log_bytes!( msg. temporary_channel_id) ) ;
1144
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1134
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1145
1135
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1146
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1147
1136
} ,
1148
1137
MessageSendEvent :: SendFundingCreated { ref node_id, ref msg } => {
1149
1138
log_trace ! ( self . logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})" ,
@@ -1152,33 +1141,29 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1152
1141
log_funding_channel_id!( msg. funding_txid, msg. funding_output_index) ) ;
1153
1142
// TODO: If the peer is gone we should generate a DiscardFunding event
1154
1143
// indicating to the wallet that they should just throw away this funding transaction
1155
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1144
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1156
1145
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1157
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1158
1146
} ,
1159
1147
MessageSendEvent :: SendFundingSigned { ref node_id, ref msg } => {
1160
1148
log_trace ! ( self . logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}" ,
1161
1149
log_pubkey!( node_id) ,
1162
1150
log_bytes!( msg. channel_id) ) ;
1163
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1151
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1164
1152
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1165
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1166
1153
} ,
1167
1154
MessageSendEvent :: SendFundingLocked { ref node_id, ref msg } => {
1168
1155
log_trace ! ( self . logger, "Handling SendFundingLocked event in peer_handler for node {} for channel {}" ,
1169
1156
log_pubkey!( node_id) ,
1170
1157
log_bytes!( msg. channel_id) ) ;
1171
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1158
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1172
1159
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1173
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1174
1160
} ,
1175
1161
MessageSendEvent :: SendAnnouncementSignatures { ref node_id, ref msg } => {
1176
1162
log_trace ! ( self . logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})" ,
1177
1163
log_pubkey!( node_id) ,
1178
1164
log_bytes!( msg. channel_id) ) ;
1179
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1165
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1180
1166
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1181
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1182
1167
} ,
1183
1168
MessageSendEvent :: UpdateHTLCs { ref node_id, updates : msgs:: CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
1184
1169
log_trace ! ( self . logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}" ,
@@ -1187,7 +1172,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1187
1172
update_fulfill_htlcs. len( ) ,
1188
1173
update_fail_htlcs. len( ) ,
1189
1174
log_bytes!( commitment_signed. channel_id) ) ;
1190
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1175
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1191
1176
for msg in update_add_htlcs {
1192
1177
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1193
1178
}
@@ -1204,39 +1189,34 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1204
1189
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1205
1190
}
1206
1191
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( commitment_signed) ) ) ;
1207
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1208
1192
} ,
1209
1193
MessageSendEvent :: SendRevokeAndACK { ref node_id, ref msg } => {
1210
1194
log_trace ! ( self . logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}" ,
1211
1195
log_pubkey!( node_id) ,
1212
1196
log_bytes!( msg. channel_id) ) ;
1213
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1197
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1214
1198
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1215
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1216
1199
} ,
1217
1200
MessageSendEvent :: SendClosingSigned { ref node_id, ref msg } => {
1218
1201
log_trace ! ( self . logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}" ,
1219
1202
log_pubkey!( node_id) ,
1220
1203
log_bytes!( msg. channel_id) ) ;
1221
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1204
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1222
1205
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1223
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1224
1206
} ,
1225
1207
MessageSendEvent :: SendShutdown { ref node_id, ref msg } => {
1226
1208
log_trace ! ( self . logger, "Handling Shutdown event in peer_handler for node {} for channel {}" ,
1227
1209
log_pubkey!( node_id) ,
1228
1210
log_bytes!( msg. channel_id) ) ;
1229
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1211
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1230
1212
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1231
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1232
1213
} ,
1233
1214
MessageSendEvent :: SendChannelReestablish { ref node_id, ref msg } => {
1234
1215
log_trace ! ( self . logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}" ,
1235
1216
log_pubkey!( node_id) ,
1236
1217
log_bytes!( msg. channel_id) ) ;
1237
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1218
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1238
1219
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1239
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1240
1220
} ,
1241
1221
MessageSendEvent :: BroadcastChannelAnnouncement { msg, update_msg } => {
1242
1222
log_trace ! ( self . logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}" , msg. contents. short_channel_id) ;
@@ -1264,7 +1244,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1264
1244
match * action {
1265
1245
msgs:: ErrorAction :: DisconnectPeer { ref msg } => {
1266
1246
if let Some ( mut descriptor) = peers. node_id_to_descriptor . remove ( node_id) {
1267
- peers. peers_needing_send . remove ( & descriptor) ;
1268
1247
if let Some ( mut peer) = peers. peers . remove ( & descriptor) {
1269
1248
if let Some ( ref msg) = * msg {
1270
1249
log_trace ! ( self . logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}" ,
@@ -1287,21 +1266,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1287
1266
log_trace ! ( self . logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}" ,
1288
1267
log_pubkey!( node_id) ,
1289
1268
msg. data) ;
1290
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1269
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1291
1270
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1292
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1293
1271
} ,
1294
1272
}
1295
1273
} ,
1296
1274
MessageSendEvent :: SendChannelRangeQuery { ref node_id, ref msg } => {
1297
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1275
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1298
1276
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1299
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1300
1277
} ,
1301
1278
MessageSendEvent :: SendShortIdsQuery { ref node_id, ref msg } => {
1302
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1279
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1303
1280
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1304
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1305
1281
}
1306
1282
MessageSendEvent :: SendReplyChannelRange { ref node_id, ref msg } => {
1307
1283
log_trace ! ( self . logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}" ,
@@ -1310,18 +1286,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1310
1286
msg. first_blocknum,
1311
1287
msg. number_of_blocks,
1312
1288
msg. sync_complete) ;
1313
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1289
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1314
1290
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1315
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1316
1291
}
1317
1292
}
1318
1293
}
1319
1294
1320
- for mut descriptor in peers. peers_needing_send . drain ( ) {
1321
- match peers. peers . get_mut ( & descriptor) {
1322
- Some ( peer) => self . do_attempt_write_data ( & mut descriptor, peer) ,
1323
- None => panic ! ( "Inconsistent peers set state!" ) ,
1324
- }
1295
+ for ( descriptor, ref mut peer) in peers. peers . iter_mut ( ) {
1296
+ self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
1325
1297
}
1326
1298
}
1327
1299
}
@@ -1340,7 +1312,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1340
1312
1341
1313
fn disconnect_event_internal ( & self , descriptor : & Descriptor , no_connection_possible : bool ) {
1342
1314
let mut peers = self . peers . lock ( ) . unwrap ( ) ;
1343
- peers. peers_needing_send . remove ( descriptor) ;
1344
1315
let peer_option = peers. peers . remove ( descriptor) ;
1345
1316
match peer_option {
1346
1317
None => panic ! ( "Descriptor for disconnect_event is not already known to PeerManager" ) ,
@@ -1368,7 +1339,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1368
1339
if let Some ( mut descriptor) = peers_lock. node_id_to_descriptor . remove ( & node_id) {
1369
1340
log_trace ! ( self . logger, "Disconnecting peer with id {} due to client request" , node_id) ;
1370
1341
peers_lock. peers . remove ( & descriptor) ;
1371
- peers_lock. peers_needing_send . remove ( & descriptor) ;
1372
1342
self . message_handler . chan_handler . peer_disconnected ( & node_id, no_connection_possible) ;
1373
1343
descriptor. disconnect_socket ( ) ;
1374
1344
}
@@ -1382,14 +1352,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1382
1352
let mut peers_lock = self . peers . lock ( ) . unwrap ( ) ;
1383
1353
{
1384
1354
let peers = & mut * peers_lock;
1385
- let peers_needing_send = & mut peers. peers_needing_send ;
1386
1355
let node_id_to_descriptor = & mut peers. node_id_to_descriptor ;
1387
1356
let peers = & mut peers. peers ;
1388
1357
let mut descriptors_needing_disconnect = Vec :: new ( ) ;
1389
1358
1390
1359
peers. retain ( |descriptor, peer| {
1391
1360
if peer. awaiting_pong {
1392
- peers_needing_send. remove ( descriptor) ;
1393
1361
descriptors_needing_disconnect. push ( descriptor. clone ( ) ) ;
1394
1362
match peer. their_node_id {
1395
1363
Some ( node_id) => {
0 commit comments