Skip to content

Commit d13f336

Browse files
committed
Use a MessageSendEvent-handling fn rather than a single lopp
Rather than building a single `Vec` of `MessageSendEvent`s to handle then iterating over them, we move the body of the loop into a lambda and run the loop twice. In some cases, this may save a single allocation, but more importantly it sets us up for the next commit, which needs to know from which handler the `MessageSendEvent` it is processing came from.
1 parent 4147de2 commit d13f336

File tree

1 file changed

+65
-56
lines changed

1 file changed

+65
-56
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 65 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -2064,64 +2064,66 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
20642064
{
20652065
let peers_lock = self.peers.read().unwrap();
20662066

2067-
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
2068-
events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
2067+
let chan_events = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
2068+
let route_events = self.message_handler.route_handler.get_and_clear_pending_msg_events();
20692069

20702070
let peers = &*peers_lock;
20712071
macro_rules! get_peer_for_forwarding {
20722072
($node_id: expr) => {
20732073
{
20742074
if peers_to_disconnect.get($node_id).is_some() {
20752075
// If we've "disconnected" this peer, do not send to it.
2076-
continue;
2077-
}
2078-
let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned();
2079-
match descriptor_opt {
2080-
Some(descriptor) => match peers.get(&descriptor) {
2081-
Some(peer_mutex) => {
2082-
let peer_lock = peer_mutex.lock().unwrap();
2083-
if !peer_lock.handshake_complete() {
2084-
continue;
2076+
None
2077+
} else {
2078+
let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned();
2079+
match descriptor_opt {
2080+
Some(descriptor) => match peers.get(&descriptor) {
2081+
Some(peer_mutex) => {
2082+
let peer_lock = peer_mutex.lock().unwrap();
2083+
if !peer_lock.handshake_complete() {
2084+
None
2085+
} else {
2086+
Some(peer_lock)
2087+
}
2088+
},
2089+
None => {
2090+
debug_assert!(false, "Inconsistent peers set state!");
2091+
None
20852092
}
2086-
peer_lock
20872093
},
20882094
None => {
2089-
debug_assert!(false, "Inconsistent peers set state!");
2090-
continue;
2091-
}
2092-
},
2093-
None => {
2094-
continue;
2095-
},
2095+
None
2096+
},
2097+
}
20962098
}
20972099
}
20982100
}
20992101
}
2100-
for event in events_generated.drain(..) {
2102+
let mut handle_event = |event, from_chan_handler| {
21012103
match event {
21022104
MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
21032105
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
21042106
log_pubkey!(node_id),
21052107
&msg.common_fields.temporary_channel_id);
2106-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2108+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21072109
},
21082110
MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => {
21092111
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}",
21102112
log_pubkey!(node_id),
21112113
&msg.common_fields.temporary_channel_id);
2112-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2114+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21132115
},
21142116
MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
21152117
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
21162118
log_pubkey!(node_id),
21172119
&msg.common_fields.temporary_channel_id);
2118-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2120+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21192121
},
21202122
MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => {
21212123
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}",
21222124
log_pubkey!(node_id),
21232125
&msg.common_fields.temporary_channel_id);
2124-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2126+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21252127
},
21262128
MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
21272129
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id), None), "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
@@ -2130,107 +2132,107 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
21302132
ChannelId::v1_from_funding_txid(msg.funding_txid.as_byte_array(), msg.funding_output_index));
21312133
// TODO: If the peer is gone we should generate a DiscardFunding event
21322134
// indicating to the wallet that they should just throw away this funding transaction
2133-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2135+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21342136
},
21352137
MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
21362138
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
21372139
log_pubkey!(node_id),
21382140
&msg.channel_id);
2139-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2141+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21402142
},
21412143
MessageSendEvent::SendChannelReady { ref node_id, ref msg } => {
21422144
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReady event in peer_handler for node {} for channel {}",
21432145
log_pubkey!(node_id),
21442146
&msg.channel_id);
2145-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2147+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21462148
},
21472149
MessageSendEvent::SendStfu { ref node_id, ref msg} => {
21482150
let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
21492151
log_debug!(logger, "Handling SendStfu event in peer_handler for node {} for channel {}",
21502152
log_pubkey!(node_id),
21512153
&msg.channel_id);
2152-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2154+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21532155
}
21542156
MessageSendEvent::SendSpliceInit { ref node_id, ref msg} => {
21552157
let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
21562158
log_debug!(logger, "Handling SendSpliceInit event in peer_handler for node {} for channel {}",
21572159
log_pubkey!(node_id),
21582160
&msg.channel_id);
2159-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2161+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21602162
}
21612163
MessageSendEvent::SendSpliceAck { ref node_id, ref msg} => {
21622164
let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
21632165
log_debug!(logger, "Handling SendSpliceAck event in peer_handler for node {} for channel {}",
21642166
log_pubkey!(node_id),
21652167
&msg.channel_id);
2166-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2168+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21672169
}
21682170
MessageSendEvent::SendSpliceLocked { ref node_id, ref msg} => {
21692171
let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
21702172
log_debug!(logger, "Handling SendSpliceLocked event in peer_handler for node {} for channel {}",
21712173
log_pubkey!(node_id),
21722174
&msg.channel_id);
2173-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2175+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21742176
}
21752177
MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => {
21762178
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
21772179
log_pubkey!(node_id),
21782180
&msg.channel_id);
2179-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2181+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21802182
},
21812183
MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => {
21822184
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddOutput event in peer_handler for node {} for channel {}",
21832185
log_pubkey!(node_id),
21842186
&msg.channel_id);
2185-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2187+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21862188
},
21872189
MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => {
21882190
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}",
21892191
log_pubkey!(node_id),
21902192
&msg.channel_id);
2191-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2193+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21922194
},
21932195
MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => {
21942196
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}",
21952197
log_pubkey!(node_id),
21962198
&msg.channel_id);
2197-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2199+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21982200
},
21992201
MessageSendEvent::SendTxComplete { ref node_id, ref msg } => {
22002202
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxComplete event in peer_handler for node {} for channel {}",
22012203
log_pubkey!(node_id),
22022204
&msg.channel_id);
2203-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2205+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
22042206
},
22052207
MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => {
22062208
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxSignatures event in peer_handler for node {} for channel {}",
22072209
log_pubkey!(node_id),
22082210
&msg.channel_id);
2209-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2211+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
22102212
},
22112213
MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => {
22122214
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxInitRbf event in peer_handler for node {} for channel {}",
22132215
log_pubkey!(node_id),
22142216
&msg.channel_id);
2215-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2217+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
22162218
},
22172219
MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => {
22182220
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAckRbf event in peer_handler for node {} for channel {}",
22192221
log_pubkey!(node_id),
22202222
&msg.channel_id);
2221-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2223+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
22222224
},
22232225
MessageSendEvent::SendTxAbort { ref node_id, ref msg } => {
22242226
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAbort event in peer_handler for node {} for channel {}",
22252227
log_pubkey!(node_id),
22262228
&msg.channel_id);
2227-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2229+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
22282230
},
22292231
MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
22302232
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
22312233
log_pubkey!(node_id),
22322234
&msg.channel_id);
2233-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2235+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
22342236
},
22352237
MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
22362238
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(commitment_signed.channel_id), None), "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
@@ -2239,7 +2241,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
22392241
update_fulfill_htlcs.len(),
22402242
update_fail_htlcs.len(),
22412243
&commitment_signed.channel_id);
2242-
let mut peer = get_peer_for_forwarding!(node_id);
2244+
let mut peer = get_peer_for_forwarding!(node_id)?;
22432245
for msg in update_add_htlcs {
22442246
self.enqueue_message(&mut *peer, msg);
22452247
}
@@ -2261,32 +2263,32 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
22612263
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
22622264
log_pubkey!(node_id),
22632265
&msg.channel_id);
2264-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2266+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
22652267
},
22662268
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
22672269
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
22682270
log_pubkey!(node_id),
22692271
&msg.channel_id);
2270-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2272+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
22712273
},
22722274
MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
22732275
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling Shutdown event in peer_handler for node {} for channel {}",
22742276
log_pubkey!(node_id),
22752277
&msg.channel_id);
2276-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2278+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
22772279
},
22782280
MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
22792281
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
22802282
log_pubkey!(node_id),
22812283
&msg.channel_id);
2282-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2284+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
22832285
},
22842286
MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => {
22852287
log_debug!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}",
22862288
log_pubkey!(node_id),
22872289
msg.contents.short_channel_id);
2288-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2289-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), update_msg);
2290+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2291+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, update_msg);
22902292
},
22912293
MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
22922294
log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
@@ -2322,7 +2324,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
23222324
MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
23232325
log_trace!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
23242326
log_pubkey!(node_id), msg.contents.short_channel_id);
2325-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2327+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
23262328
},
23272329
MessageSendEvent::HandleError { node_id, action } => {
23282330
let logger = WithContext::from(&self.logger, Some(node_id), None, None);
@@ -2360,21 +2362,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
23602362
log_trace!(logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
23612363
log_pubkey!(node_id),
23622364
msg.data);
2363-
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg);
2365+
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id)?, msg);
23642366
},
23652367
msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => {
23662368
log_given_level!(logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}",
23672369
log_pubkey!(node_id),
23682370
msg.data);
2369-
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg);
2371+
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id)?, msg);
23702372
},
23712373
}
23722374
},
23732375
MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
2374-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2376+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
23752377
},
23762378
MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
2377-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2379+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
23782380
}
23792381
MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
23802382
log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
@@ -2383,17 +2385,24 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
23832385
msg.first_blocknum,
23842386
msg.number_of_blocks,
23852387
msg.sync_complete);
2386-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2388+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
23872389
}
23882390
MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => {
2389-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2391+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
23902392
}
23912393
}
2394+
Some(())
2395+
};
2396+
for event in chan_events {
2397+
handle_event(event, true);
2398+
}
2399+
for event in route_events {
2400+
handle_event(event, false);
23922401
}
23932402

23942403
for (node_id, msg) in self.message_handler.custom_message_handler.get_and_clear_pending_msg() {
23952404
if peers_to_disconnect.get(&node_id).is_some() { continue; }
2396-
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg);
2405+
self.enqueue_message(&mut *if let Some(peer) = get_peer_for_forwarding!(&node_id) { peer } else { continue; }, &msg);
23972406
}
23982407

23992408
for (descriptor, peer_mutex) in peers.iter() {

0 commit comments

Comments
 (0)