@@ -32,7 +32,7 @@ use routing::network_graph::NetGraphMsgHandler;
32
32
33
33
use prelude:: * ;
34
34
use alloc:: collections:: LinkedList ;
35
- use std:: collections:: { HashMap , hash_map, HashSet } ;
35
+ use std:: collections:: { HashMap , hash_map} ;
36
36
use std:: sync:: { Arc , Mutex } ;
37
37
use core:: sync:: atomic:: { AtomicUsize , Ordering } ;
38
38
use core:: { cmp, hash, fmt, mem} ;
@@ -286,9 +286,6 @@ impl Peer {
286
286
287
287
struct PeerHolder < Descriptor : SocketDescriptor > {
288
288
peers : HashMap < Descriptor , Peer > ,
289
- /// Added to by do_read_event for cases where we pushed a message onto the send buffer but
290
- /// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events()
291
- peers_needing_send : HashSet < Descriptor > ,
292
289
/// Only add to this set when noise completes:
293
290
node_id_to_descriptor : HashMap < PublicKey , Descriptor > ,
294
291
}
@@ -419,7 +416,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
419
416
message_handler,
420
417
peers : Mutex :: new ( PeerHolder {
421
418
peers : HashMap :: new ( ) ,
422
- peers_needing_send : HashSet :: new ( ) ,
423
419
node_id_to_descriptor : HashMap :: new ( )
424
420
} ) ,
425
421
our_node_secret,
@@ -651,14 +647,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
651
647
}
652
648
653
649
/// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly.
654
- fn enqueue_message < M : Encode + Writeable > ( & self , peers_needing_send : & mut HashSet < Descriptor > , peer : & mut Peer , descriptor : Descriptor , message : & M ) {
650
+ fn enqueue_message < M : Encode + Writeable > ( & self , peer : & mut Peer , message : & M ) {
655
651
let mut buffer = VecWriter ( Vec :: new ( ) ) ;
656
652
wire:: write ( message, & mut buffer) . unwrap ( ) ; // crash if the write failed
657
653
let encoded_message = buffer. 0 ;
658
654
659
655
log_trace ! ( self . logger, "Enqueueing message of type {} to {}" , message. type_id( ) , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
660
656
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_message[ ..] ) ) ;
661
- peers_needing_send. insert ( descriptor) ;
662
657
}
663
658
664
659
fn do_read_event ( & self , peer_descriptor : & mut Descriptor , data : & [ u8 ] ) -> Result < bool , PeerHandleError > {
@@ -702,7 +697,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
702
697
} ,
703
698
msgs:: ErrorAction :: SendErrorMessage { msg } => {
704
699
log_trace!( self . logger, "Got Err handling message, sending Error message because {}" , e. err) ;
705
- self . enqueue_message( & mut peers . peers_needing_send , peer, peer_descriptor . clone ( ) , & msg) ;
700
+ self . enqueue_message( peer, & msg) ;
706
701
continue ;
707
702
} ,
708
703
}
@@ -744,7 +739,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
744
739
insert_node_id ! ( ) ;
745
740
let features = InitFeatures :: known ( ) ;
746
741
let resp = msgs:: Init { features } ;
747
- self . enqueue_message ( & mut peers . peers_needing_send , peer, peer_descriptor . clone ( ) , & resp) ;
742
+ self . enqueue_message ( peer, & resp) ;
748
743
} ,
749
744
NextNoiseStep :: ActThree => {
750
745
let their_node_id = try_potential_handleerror ! ( peer. channel_encryptor. process_act_three( & peer. pending_read_buffer[ ..] ) ) ;
@@ -754,7 +749,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
754
749
insert_node_id ! ( ) ;
755
750
let features = InitFeatures :: known ( ) ;
756
751
let resp = msgs:: Init { features } ;
757
- self . enqueue_message ( & mut peers . peers_needing_send , peer, peer_descriptor . clone ( ) , & resp) ;
752
+ self . enqueue_message ( peer, & resp) ;
758
753
} ,
759
754
NextNoiseStep :: NoiseComplete => {
760
755
if peer. pending_read_is_header {
@@ -802,7 +797,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
802
797
}
803
798
} ;
804
799
805
- match self . handle_message ( & mut peers . peers_needing_send , peer, peer_descriptor . clone ( ) , message) {
800
+ match self . handle_message ( peer, message) {
806
801
Err ( handling_error) => match handling_error {
807
802
MessageHandlingError :: PeerHandleError ( e) => { return Err ( e) } ,
808
803
MessageHandlingError :: LightningError ( e) => {
@@ -837,7 +832,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
837
832
838
833
/// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
839
834
/// Returns the message back if it needs to be broadcasted to all other peers.
840
- fn handle_message ( & self , peers_needing_send : & mut HashSet < Descriptor > , peer : & mut Peer , peer_descriptor : Descriptor , message : wire:: Message ) -> Result < Option < wire:: Message > , MessageHandlingError > {
835
+ fn handle_message ( & self , peer : & mut Peer , message : wire:: Message ) -> Result < Option < wire:: Message > , MessageHandlingError > {
841
836
log_trace ! ( self . logger, "Received message of type {} from {}" , message. type_id( ) , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
842
837
843
838
// Need an Init as first message
@@ -872,7 +867,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
872
867
873
868
if msg. features . initial_routing_sync ( ) {
874
869
peer. sync_status = InitSyncTracker :: ChannelsSyncing ( 0 ) ;
875
- peers_needing_send. insert ( peer_descriptor. clone ( ) ) ;
876
870
}
877
871
if !msg. features . supports_static_remote_key ( ) {
878
872
log_debug ! ( self . logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible" , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
@@ -907,7 +901,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
907
901
wire:: Message :: Ping ( msg) => {
908
902
if msg. ponglen < 65532 {
909
903
let resp = msgs:: Pong { byteslen : msg. ponglen } ;
910
- self . enqueue_message ( peers_needing_send , peer, peer_descriptor . clone ( ) , & resp) ;
904
+ self . enqueue_message ( peer, & resp) ;
911
905
}
912
906
} ,
913
907
wire:: Message :: Pong ( _msg) => {
@@ -1029,7 +1023,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1029
1023
wire:: Message :: ChannelAnnouncement ( ref msg) => {
1030
1024
let encoded_msg = encode_msg ! ( msg) ;
1031
1025
1032
- for ( ref descriptor , ref mut peer) in peers. peers . iter_mut ( ) {
1026
+ for ( _ , ref mut peer) in peers. peers . iter_mut ( ) {
1033
1027
if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_features . is_none ( ) ||
1034
1028
!peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
1035
1029
continue
@@ -1045,13 +1039,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1045
1039
continue ;
1046
1040
}
1047
1041
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
1048
- peers. peers_needing_send . insert ( ( * descriptor) . clone ( ) ) ;
1049
1042
}
1050
1043
} ,
1051
1044
wire:: Message :: NodeAnnouncement ( ref msg) => {
1052
1045
let encoded_msg = encode_msg ! ( msg) ;
1053
1046
1054
- for ( ref descriptor , ref mut peer) in peers. peers . iter_mut ( ) {
1047
+ for ( _ , ref mut peer) in peers. peers . iter_mut ( ) {
1055
1048
if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_features . is_none ( ) ||
1056
1049
!peer. should_forward_node_announcement ( msg. contents . node_id ) {
1057
1050
continue
@@ -1066,13 +1059,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1066
1059
continue ;
1067
1060
}
1068
1061
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
1069
- peers. peers_needing_send . insert ( ( * descriptor) . clone ( ) ) ;
1070
1062
}
1071
1063
} ,
1072
1064
wire:: Message :: ChannelUpdate ( ref msg) => {
1073
1065
let encoded_msg = encode_msg ! ( msg) ;
1074
1066
1075
- for ( ref descriptor , ref mut peer) in peers. peers . iter_mut ( ) {
1067
+ for ( _ , ref mut peer) in peers. peers . iter_mut ( ) {
1076
1068
if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_features . is_none ( ) ||
1077
1069
!peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
1078
1070
continue
@@ -1084,7 +1076,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1084
1076
continue ;
1085
1077
}
1086
1078
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
1087
- peers. peers_needing_send . insert ( ( * descriptor) . clone ( ) ) ;
1088
1079
}
1089
1080
} ,
1090
1081
_ => debug_assert ! ( false , "We shouldn't attempt to forward anything but gossip messages" ) ,
@@ -1131,17 +1122,15 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1131
1122
log_trace ! ( self . logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}" ,
1132
1123
log_pubkey!( node_id) ,
1133
1124
log_bytes!( msg. temporary_channel_id) ) ;
1134
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1125
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1135
1126
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1136
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1137
1127
} ,
1138
1128
MessageSendEvent :: SendOpenChannel { ref node_id, ref msg } => {
1139
1129
log_trace ! ( self . logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}" ,
1140
1130
log_pubkey!( node_id) ,
1141
1131
log_bytes!( msg. temporary_channel_id) ) ;
1142
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1132
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1143
1133
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1144
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1145
1134
} ,
1146
1135
MessageSendEvent :: SendFundingCreated { ref node_id, ref msg } => {
1147
1136
log_trace ! ( self . logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})" ,
@@ -1150,35 +1139,31 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1150
1139
log_funding_channel_id!( msg. funding_txid, msg. funding_output_index) ) ;
1151
1140
// TODO: If the peer is gone we should generate a DiscardFunding event
1152
1141
// indicating to the wallet that they should just throw away this funding transaction
1153
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1142
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1154
1143
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1155
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1156
1144
} ,
1157
1145
MessageSendEvent :: SendFundingSigned { ref node_id, ref msg } => {
1158
1146
log_trace ! ( self . logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}" ,
1159
1147
log_pubkey!( node_id) ,
1160
1148
log_bytes!( msg. channel_id) ) ;
1161
1149
// TODO: If the peer is gone we should generate a DiscardFunding event
1162
1150
// indicating to the wallet that they should just throw away this funding transaction
1163
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1151
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1164
1152
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1165
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1166
1153
} ,
1167
1154
MessageSendEvent :: SendFundingLocked { ref node_id, ref msg } => {
1168
1155
log_trace ! ( self . logger, "Handling SendFundingLocked event in peer_handler for node {} for channel {}" ,
1169
1156
log_pubkey!( node_id) ,
1170
1157
log_bytes!( msg. channel_id) ) ;
1171
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1158
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1172
1159
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1173
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1174
1160
} ,
1175
1161
MessageSendEvent :: SendAnnouncementSignatures { ref node_id, ref msg } => {
1176
1162
log_trace ! ( self . logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})" ,
1177
1163
log_pubkey!( node_id) ,
1178
1164
log_bytes!( msg. channel_id) ) ;
1179
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1165
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1180
1166
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1181
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1182
1167
} ,
1183
1168
MessageSendEvent :: UpdateHTLCs { ref node_id, updates : msgs:: CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
1184
1169
log_trace ! ( self . logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}" ,
@@ -1187,7 +1172,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1187
1172
update_fulfill_htlcs. len( ) ,
1188
1173
update_fail_htlcs. len( ) ,
1189
1174
log_bytes!( commitment_signed. channel_id) ) ;
1190
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1175
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1191
1176
for msg in update_add_htlcs {
1192
1177
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1193
1178
}
@@ -1204,39 +1189,34 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1204
1189
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1205
1190
}
1206
1191
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( commitment_signed) ) ) ;
1207
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1208
1192
} ,
1209
1193
MessageSendEvent :: SendRevokeAndACK { ref node_id, ref msg } => {
1210
1194
log_trace ! ( self . logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}" ,
1211
1195
log_pubkey!( node_id) ,
1212
1196
log_bytes!( msg. channel_id) ) ;
1213
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1197
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1214
1198
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1215
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1216
1199
} ,
1217
1200
MessageSendEvent :: SendClosingSigned { ref node_id, ref msg } => {
1218
1201
log_trace ! ( self . logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}" ,
1219
1202
log_pubkey!( node_id) ,
1220
1203
log_bytes!( msg. channel_id) ) ;
1221
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1204
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1222
1205
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1223
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1224
1206
} ,
1225
1207
MessageSendEvent :: SendShutdown { ref node_id, ref msg } => {
1226
1208
log_trace ! ( self . logger, "Handling Shutdown event in peer_handler for node {} for channel {}" ,
1227
1209
log_pubkey!( node_id) ,
1228
1210
log_bytes!( msg. channel_id) ) ;
1229
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1211
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1230
1212
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1231
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1232
1213
} ,
1233
1214
MessageSendEvent :: SendChannelReestablish { ref node_id, ref msg } => {
1234
1215
log_trace ! ( self . logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}" ,
1235
1216
log_pubkey!( node_id) ,
1236
1217
log_bytes!( msg. channel_id) ) ;
1237
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1218
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1238
1219
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1239
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1240
1220
} ,
1241
1221
MessageSendEvent :: BroadcastChannelAnnouncement { msg, update_msg } => {
1242
1222
log_trace ! ( self . logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}" , msg. contents. short_channel_id) ;
@@ -1264,7 +1244,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1264
1244
match * action {
1265
1245
msgs:: ErrorAction :: DisconnectPeer { ref msg } => {
1266
1246
if let Some ( mut descriptor) = peers. node_id_to_descriptor . remove ( node_id) {
1267
- peers. peers_needing_send . remove ( & descriptor) ;
1268
1247
if let Some ( mut peer) = peers. peers . remove ( & descriptor) {
1269
1248
if let Some ( ref msg) = * msg {
1270
1249
log_trace ! ( self . logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}" ,
@@ -1287,21 +1266,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1287
1266
log_trace ! ( self . logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}" ,
1288
1267
log_pubkey!( node_id) ,
1289
1268
msg. data) ;
1290
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1269
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1291
1270
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1292
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1293
1271
} ,
1294
1272
}
1295
1273
} ,
1296
1274
MessageSendEvent :: SendChannelRangeQuery { ref node_id, ref msg } => {
1297
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1275
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1298
1276
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1299
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1300
1277
} ,
1301
1278
MessageSendEvent :: SendShortIdsQuery { ref node_id, ref msg } => {
1302
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1279
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1303
1280
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1304
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1305
1281
}
1306
1282
MessageSendEvent :: SendReplyChannelRange { ref node_id, ref msg } => {
1307
1283
log_trace ! ( self . logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}" ,
@@ -1310,18 +1286,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1310
1286
msg. first_blocknum,
1311
1287
msg. number_of_blocks,
1312
1288
msg. sync_complete) ;
1313
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1289
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1314
1290
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1315
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1316
1291
}
1317
1292
}
1318
1293
}
1319
1294
1320
- for mut descriptor in peers. peers_needing_send . drain ( ) {
1321
- match peers. peers . get_mut ( & descriptor) {
1322
- Some ( peer) => self . do_attempt_write_data ( & mut descriptor, peer) ,
1323
- None => panic ! ( "Inconsistent peers set state!" ) ,
1324
- }
1295
+ for ( descriptor, ref mut peer) in peers. peers . iter_mut ( ) {
1296
+ self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
1325
1297
}
1326
1298
}
1327
1299
}
@@ -1340,7 +1312,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1340
1312
1341
1313
fn disconnect_event_internal ( & self , descriptor : & Descriptor , no_connection_possible : bool ) {
1342
1314
let mut peers = self . peers . lock ( ) . unwrap ( ) ;
1343
- peers. peers_needing_send . remove ( descriptor) ;
1344
1315
let peer_option = peers. peers . remove ( descriptor) ;
1345
1316
match peer_option {
1346
1317
None => panic ! ( "Descriptor for disconnect_event is not already known to PeerManager" ) ,
@@ -1368,7 +1339,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1368
1339
if let Some ( mut descriptor) = peers_lock. node_id_to_descriptor . remove ( & node_id) {
1369
1340
log_trace ! ( self . logger, "Disconnecting peer with id {} due to client request" , node_id) ;
1370
1341
peers_lock. peers . remove ( & descriptor) ;
1371
- peers_lock. peers_needing_send . remove ( & descriptor) ;
1372
1342
self . message_handler . chan_handler . peer_disconnected ( & node_id, no_connection_possible) ;
1373
1343
descriptor. disconnect_socket ( ) ;
1374
1344
}
@@ -1382,14 +1352,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1382
1352
let mut peers_lock = self . peers . lock ( ) . unwrap ( ) ;
1383
1353
{
1384
1354
let peers = & mut * peers_lock;
1385
- let peers_needing_send = & mut peers. peers_needing_send ;
1386
1355
let node_id_to_descriptor = & mut peers. node_id_to_descriptor ;
1387
1356
let peers = & mut peers. peers ;
1388
1357
let mut descriptors_needing_disconnect = Vec :: new ( ) ;
1389
1358
1390
1359
peers. retain ( |descriptor, peer| {
1391
1360
if peer. awaiting_pong {
1392
- peers_needing_send. remove ( descriptor) ;
1393
1361
descriptors_needing_disconnect. push ( descriptor. clone ( ) ) ;
1394
1362
match peer. their_node_id {
1395
1363
Some ( node_id) => {
0 commit comments