Skip to content

Commit 9de22ae

Browse files
committed
Refactor message broadcasting out into a utility method
This will allow us to broadcast messages received in the next commit.
1 parent a6463ec commit 9de22ae

File tree

1 file changed

+68
-46
lines changed

1 file changed

+68
-46
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 68 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,6 +1009,64 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
10091009
Ok(())
10101010
}
10111011

1012+
fn forward_broadcast_msg(&self, peers: &mut PeerHolder<Descriptor>, msg: &wire::Message, except_node: Option<&PublicKey>) {
1013+
match msg {
1014+
wire::Message::ChannelAnnouncement(ref msg) => {
1015+
let encoded_msg = encode_msg!(msg);
1016+
1017+
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
1018+
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
1019+
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
1020+
continue
1021+
}
1022+
if peer.their_node_id.as_ref() == Some(&msg.contents.node_id_1) ||
1023+
peer.their_node_id.as_ref() == Some(&msg.contents.node_id_2) {
1024+
continue;
1025+
}
1026+
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
1027+
continue;
1028+
}
1029+
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
1030+
peers.peers_needing_send.insert((*descriptor).clone());
1031+
}
1032+
},
1033+
wire::Message::NodeAnnouncement(ref msg) => {
1034+
let encoded_msg = encode_msg!(msg);
1035+
1036+
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
1037+
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
1038+
!peer.should_forward_node_announcement(msg.contents.node_id) {
1039+
continue
1040+
}
1041+
if peer.their_node_id.as_ref() == Some(&msg.contents.node_id) {
1042+
continue;
1043+
}
1044+
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
1045+
continue;
1046+
}
1047+
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
1048+
peers.peers_needing_send.insert((*descriptor).clone());
1049+
}
1050+
},
1051+
wire::Message::ChannelUpdate(ref msg) => {
1052+
let encoded_msg = encode_msg!(msg);
1053+
1054+
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
1055+
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
1056+
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
1057+
continue
1058+
}
1059+
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
1060+
continue;
1061+
}
1062+
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
1063+
peers.peers_needing_send.insert((*descriptor).clone());
1064+
}
1065+
},
1066+
_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
1067+
}
1068+
}
1069+
10121070
/// Checks for any events generated by our handlers and processes them. Includes sending most
10131071
/// response messages as well as messages generated by calls to handler functions directly (eg
10141072
/// functions like ChannelManager::process_pending_htlc_forward or send_payment).
@@ -1154,59 +1212,23 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
11541212
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
11551213
self.do_attempt_write_data(&mut descriptor, peer);
11561214
},
1157-
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
1215+
MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
11581216
log_trace!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
1159-
if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() {
1160-
let encoded_msg = encode_msg!(msg);
1161-
let encoded_update_msg = encode_msg!(update_msg);
1162-
1163-
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
1164-
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
1165-
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
1166-
continue
1167-
}
1168-
match peer.their_node_id {
1169-
None => continue,
1170-
Some(their_node_id) => {
1171-
if their_node_id == msg.contents.node_id_1 || their_node_id == msg.contents.node_id_2 {
1172-
continue
1173-
}
1174-
}
1175-
}
1176-
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
1177-
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_update_msg[..]));
1178-
self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
1179-
}
1217+
if self.message_handler.route_handler.handle_channel_announcement(&msg).is_ok() && self.message_handler.route_handler.handle_channel_update(&update_msg).is_ok() {
1218+
self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None);
1219+
self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(update_msg), None);
11801220
}
11811221
},
1182-
MessageSendEvent::BroadcastNodeAnnouncement { ref msg } => {
1222+
MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
11831223
log_trace!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler");
1184-
if self.message_handler.route_handler.handle_node_announcement(msg).is_ok() {
1185-
let encoded_msg = encode_msg!(msg);
1186-
1187-
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
1188-
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
1189-
!peer.should_forward_node_announcement(msg.contents.node_id) {
1190-
continue
1191-
}
1192-
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
1193-
self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
1194-
}
1224+
if self.message_handler.route_handler.handle_node_announcement(&msg).is_ok() {
1225+
self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None);
11951226
}
11961227
},
1197-
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
1228+
MessageSendEvent::BroadcastChannelUpdate { msg } => {
11981229
log_trace!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id);
1199-
if self.message_handler.route_handler.handle_channel_update(msg).is_ok() {
1200-
let encoded_msg = encode_msg!(msg);
1201-
1202-
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
1203-
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
1204-
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
1205-
continue
1206-
}
1207-
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
1208-
self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
1209-
}
1230+
if self.message_handler.route_handler.handle_channel_update(&msg).is_ok() {
1231+
self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None);
12101232
}
12111233
},
12121234
MessageSendEvent::PaymentFailureNetworkUpdate { ref update } => {

0 commit comments

Comments
 (0)