Skip to content

Commit 2d0e53e

Browse files
committed
Remove peers_needing_send set
Motivated by #456, remove the peers_needing_send set in favor of just scanning the peers during process_events() and attempting to send data for those peers that have items in their outbound queue or pending sync items to be sent out.
1 parent cbfe9c3 commit 2d0e53e

File tree

1 file changed

+33
-43
lines changed

1 file changed

+33
-43
lines changed

lightning/src/ln/peers/handler.rs

Lines changed: 33 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use util::events::{MessageSendEvent, MessageSendEventsProvider};
2020
use util::logger::Logger;
2121
use routing::network_graph::NetGraphMsgHandler;
2222

23-
use std::collections::{HashMap,HashSet};
23+
use std::collections::HashMap;
2424
use std::sync::{Arc, Mutex};
2525
use std::sync::atomic::{AtomicUsize, Ordering};
2626
use std::{cmp,error,hash,fmt};
@@ -237,9 +237,6 @@ impl<TransportImpl: ITransport> Peer<TransportImpl> {
237237

238238
struct PeerHolder<Descriptor: SocketDescriptor, TransportImpl: ITransport> {
239239
peers: HashMap<Descriptor, Peer<TransportImpl>>,
240-
/// Added to by do_read_event for cases where we pushed a message onto the send buffer but
241-
/// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events()
242-
peers_needing_send: HashSet<Descriptor>,
243240
/// Peers in this map have completed the NOISE handshake and received an Init message
244241
node_id_to_descriptor: HashMap<PublicKey, Descriptor>,
245242
}
@@ -281,9 +278,6 @@ impl<Descriptor: SocketDescriptor, TransportImpl: ITransport> PeerHolder<Descrip
281278

282279
// Removes all associated metadata for descriptor and returns the Peer object associated with it
283280
fn remove_peer_by_descriptor(&mut self, descriptor: &Descriptor) -> Peer<TransportImpl> {
284-
// may or may not be in this set depending on in-flight messages
285-
self.peers_needing_send.remove(descriptor);
286-
287281
let peer_option = self.peers.remove(descriptor);
288282
match peer_option {
289283
None => panic!("Descriptor for disconnect_event is not already known to PeerManager"),
@@ -300,6 +294,23 @@ impl<Descriptor: SocketDescriptor, TransportImpl: ITransport> PeerHolder<Descrip
300294
}
301295
}
302296
}
297+
298+
// Returns the collection of peers that have data to send. Could be due to items in their outbound
299+
// queue or sync messages that need to be sent out.
300+
fn peers_needing_send<'a>(&'a mut self) -> Filter<IterMut<'a, Descriptor, Peer<TransportImpl>>, fn(&(&'a Descriptor, &'a mut Peer<TransportImpl>)) -> bool> {
301+
self.peers.iter_mut().filter(|(_, peer)| {
302+
let has_outbound_sync = match &peer.post_init_state {
303+
None => false,
304+
Some(post_init_state) => match &post_init_state.sync_status {
305+
InitSyncTracker::NoSyncRequested => false,
306+
InitSyncTracker::ChannelsSyncing(_) => true,
307+
InitSyncTracker::NodesSyncing(_) => true,
308+
}
309+
};
310+
311+
has_outbound_sync || !peer.outbound_queue.is_empty()
312+
})
313+
}
303314
}
304315

305316
#[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))]
@@ -499,7 +510,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
499510
message_handler,
500511
peers: Mutex::new(PeerHolder {
501512
peers: HashMap::new(),
502-
peers_needing_send: HashSet::new(),
503513
node_id_to_descriptor: HashMap::new()
504514
}),
505515
our_node_secret,
@@ -654,7 +664,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
654664
None => panic!("Descriptor for read_event is not already known to PeerManager"),
655665
Some(peer) => peer
656666
};
657-
self.do_read_event(peer_descriptor, peer, &mut peers.peers_needing_send, &mut peers.node_id_to_descriptor, data)
667+
self.do_read_event(peer_descriptor, peer, &mut peers.node_id_to_descriptor, data)
658668
};
659669

660670
match result {
@@ -666,12 +676,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
666676
}
667677
}
668678

669-
/// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly.
670-
fn enqueue_message<M: Encode + Writeable>(&self, peers_needing_send: &mut HashSet<Descriptor>, message_queuer: &mut impl MessageQueuer, output_buffer: &mut impl PayloadQueuer, descriptor: &Descriptor, message: &M) {
671-
message_queuer.enqueue_message(message, output_buffer, &*self.logger);
672-
peers_needing_send.insert(descriptor.clone());
673-
}
674-
675679
// Returns a valid PostInitState given a Init message
676680
fn post_init_state_from_init_message(&self, init_message: &msgs::Init, their_node_id: &PublicKey) -> Result<PostInitState, PeerHandleError> {
677681
if init_message.features.requires_unknown_bits() {
@@ -707,18 +711,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
707711
}
708712

709713
// Add an Init message to the outbound queue
710-
fn enqueue_init_message(&self, descriptor: &Descriptor, peer: &mut Peer<TransportImpl>, peers_needing_send: &mut HashSet<Descriptor>) {
714+
fn enqueue_init_message(&self, peer: &mut Peer<TransportImpl>) {
711715
let mut features = InitFeatures::known();
712716
if !self.message_handler.route_handler.should_request_full_sync(&peer.transport.get_their_node_id()) {
713717
features.clear_initial_routing_sync();
714718
}
715719

716720
let resp = msgs::Init { features };
717-
self.enqueue_message(peers_needing_send, &mut peer.transport, &mut peer.outbound_queue, descriptor, &resp);
721+
peer.transport.enqueue_message(&resp, &mut peer.outbound_queue, &*self.logger);
718722
}
719723

720724
// Process an incoming Init message and set Peer and PeerManager state accordingly
721-
fn process_init_message(&self, message: Message, descriptor: &Descriptor, peer: &mut Peer<TransportImpl>, peers_needing_send: &mut HashSet<Descriptor>, node_id_to_descriptor: &mut HashMap<PublicKey, Descriptor>) -> Result<(), PeerHandleError> {
725+
fn process_init_message(&self, message: Message, descriptor: &Descriptor, peer: &mut Peer<TransportImpl>, node_id_to_descriptor: &mut HashMap<PublicKey, Descriptor>) -> Result<(), PeerHandleError> {
722726
let their_node_id = peer.transport.get_their_node_id();
723727

724728
match message {
@@ -731,13 +735,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
731735

732736
let new_post_init_state = self.post_init_state_from_init_message(init_message, &their_node_id)?;
733737

734-
if let InitSyncTracker::ChannelsSyncing(_) = new_post_init_state.sync_status {
735-
peers_needing_send.insert(descriptor.clone());
736-
}
737-
738738
if !peer.outbound {
739-
self.enqueue_init_message(descriptor, peer, peers_needing_send);
739+
self.enqueue_init_message(peer);
740740
}
741+
741742
node_id_to_descriptor.insert(their_node_id.clone(), descriptor.clone());
742743
self.message_handler.chan_handler.peer_connected(&their_node_id, init_message);
743744

@@ -753,7 +754,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
753754
Ok(())
754755
}
755756

756-
fn do_read_event(&self, peer_descriptor: &mut Descriptor, peer: &mut Peer<TransportImpl>, peers_needing_send: &mut HashSet<Descriptor>, node_id_to_descriptor: &mut HashMap<PublicKey, Descriptor>, data: &[u8]) -> Result<bool, PeerHandleError> {
757+
fn do_read_event(&self, peer_descriptor: &mut Descriptor, peer: &mut Peer<TransportImpl>, node_id_to_descriptor: &mut HashMap<PublicKey, Descriptor>, data: &[u8]) -> Result<bool, PeerHandleError> {
757758

758759
match peer.transport.process_input(data, &mut peer.outbound_queue) {
759760
Err(e) => {
@@ -766,13 +767,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
766767
}
767768

768769
if newly_connected && peer.outbound {
769-
self.enqueue_init_message(peer_descriptor, peer, peers_needing_send);
770-
}
771-
772-
// If the transport layer placed items in the outbound queue, we need
773-
// to schedule ourselves for flush during the next process_events()
774-
if !peer.outbound_queue.is_empty() {
775-
peers_needing_send.insert(peer_descriptor.clone());
770+
self.enqueue_init_message(peer);
776771
}
777772
}
778773
}
@@ -781,7 +776,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
781776

782777
if peer.transport.is_connected() && peer.post_init_state.is_none() && received_messages.len() > 0 {
783778
let init_message = received_messages.remove(0);
784-
self.process_init_message(init_message, peer_descriptor, peer, peers_needing_send, node_id_to_descriptor)?;
779+
self.process_init_message(init_message, peer_descriptor, peer, node_id_to_descriptor)?;
785780
}
786781

787782
for message in received_messages {
@@ -802,7 +797,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
802797
},
803798
msgs::ErrorAction::SendErrorMessage { msg } => {
804799
log_trace!(self.logger, "Got Err handling message, sending Error message because {}", e.err);
805-
self.enqueue_message(peers_needing_send, &mut peer.transport, &mut peer.outbound_queue, peer_descriptor, &msg);
800+
peer.transport.enqueue_message(&msg, &mut peer.outbound_queue, &*self.logger);
806801
continue;
807802
},
808803
}
@@ -811,7 +806,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
811806
}
812807
}
813808

814-
if let Err(handling_error) = self.handle_message(message, peer_descriptor, peer, peers_needing_send) {
809+
if let Err(handling_error) = self.handle_message(message, peer) {
815810
match handling_error {
816811
MessageHandlingError::PeerHandleError(e) => { return Err(e) },
817812
MessageHandlingError::LightningError(e) => {
@@ -827,9 +822,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
827822
/// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
828823
fn handle_message(&self,
829824
message: wire::Message,
830-
peer_descriptor: &mut Descriptor,
831-
peer: &mut Peer<TransportImpl>,
832-
peers_needing_send: &mut HashSet<Descriptor>) -> Result<(), MessageHandlingError> {
825+
peer: &mut Peer<TransportImpl>) -> Result<(), MessageHandlingError> {
833826

834827
let their_node_id = peer.transport.get_their_node_id();
835828
let post_init_state = peer.post_init_state.as_mut().unwrap();
@@ -864,7 +857,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
864857
wire::Message::Ping(msg) => {
865858
if msg.ponglen < 65532 {
866859
let resp = msgs::Pong { byteslen: msg.ponglen };
867-
self.enqueue_message(peers_needing_send, &mut peer.transport, &mut peer.outbound_queue, &peer_descriptor, &resp);
860+
peer.transport.enqueue_message(&resp, &mut peer.outbound_queue, &*self.logger);
868861
}
869862
},
870863
wire::Message::Pong(_msg) => {
@@ -1242,11 +1235,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
12421235
}
12431236
}
12441237

1245-
for mut descriptor in peers.peers_needing_send.drain() {
1246-
match peers.peers.get_mut(&descriptor) {
1247-
Some(peer) => self.do_attempt_write_data(&mut descriptor, &mut peer.post_init_state, &mut peer.transport, &mut peer.outbound_queue),
1248-
None => panic!("Inconsistent peers set state!"),
1249-
}
1238+
for (descriptor, peer) in peers.peers_needing_send() {
1239+
self.do_attempt_write_data(&mut descriptor.clone(), &mut peer.post_init_state, &mut peer.transport, &mut peer.outbound_queue);
12501240
}
12511241
}
12521242
}

0 commit comments

Comments
 (0)