Skip to content

Remove peers from the node_id_to_descriptor even without init #2060

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 178 additions & 68 deletions lightning/src/ln/peer_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -815,34 +815,40 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
let pending_read_buffer = [0; 50].to_vec(); // Noise act two is 50 bytes

let mut peers = self.peers.write().unwrap();
if peers.insert(descriptor, Mutex::new(Peer {
channel_encryptor: peer_encryptor,
their_node_id: None,
their_features: None,
their_net_address: remote_network_address,

pending_outbound_buffer: LinkedList::new(),
pending_outbound_buffer_first_msg_offset: 0,
gossip_broadcast_buffer: LinkedList::new(),
awaiting_write_event: false,

pending_read_buffer,
pending_read_buffer_pos: 0,
pending_read_is_header: false,

sync_status: InitSyncTracker::NoSyncRequested,

msgs_sent_since_pong: 0,
awaiting_pong_timer_tick_intervals: 0,
received_message_since_timer_tick: false,
sent_gossip_timestamp_filter: false,

received_channel_announce_since_backlogged: false,
inbound_connection: false,
})).is_some() {
panic!("PeerManager driver duplicated descriptors!");
};
Ok(res)
match peers.entry(descriptor) {
hash_map::Entry::Occupied(_) => {
debug_assert!(false, "PeerManager driver duplicated descriptors!");
Err(PeerHandleError {})
},
hash_map::Entry::Vacant(e) => {
e.insert(Mutex::new(Peer {
channel_encryptor: peer_encryptor,
their_node_id: None,
their_features: None,
their_net_address: remote_network_address,

pending_outbound_buffer: LinkedList::new(),
pending_outbound_buffer_first_msg_offset: 0,
gossip_broadcast_buffer: LinkedList::new(),
awaiting_write_event: false,

pending_read_buffer,
pending_read_buffer_pos: 0,
pending_read_is_header: false,

sync_status: InitSyncTracker::NoSyncRequested,

msgs_sent_since_pong: 0,
awaiting_pong_timer_tick_intervals: 0,
received_message_since_timer_tick: false,
sent_gossip_timestamp_filter: false,

received_channel_announce_since_backlogged: false,
inbound_connection: false,
}));
Ok(res)
}
}
}

/// Indicates a new inbound connection has been established to a node with an optional remote
Expand All @@ -865,34 +871,40 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
let pending_read_buffer = [0; 50].to_vec(); // Noise act one is 50 bytes

let mut peers = self.peers.write().unwrap();
if peers.insert(descriptor, Mutex::new(Peer {
channel_encryptor: peer_encryptor,
their_node_id: None,
their_features: None,
their_net_address: remote_network_address,

pending_outbound_buffer: LinkedList::new(),
pending_outbound_buffer_first_msg_offset: 0,
gossip_broadcast_buffer: LinkedList::new(),
awaiting_write_event: false,

pending_read_buffer,
pending_read_buffer_pos: 0,
pending_read_is_header: false,

sync_status: InitSyncTracker::NoSyncRequested,

msgs_sent_since_pong: 0,
awaiting_pong_timer_tick_intervals: 0,
received_message_since_timer_tick: false,
sent_gossip_timestamp_filter: false,

received_channel_announce_since_backlogged: false,
inbound_connection: true,
})).is_some() {
panic!("PeerManager driver duplicated descriptors!");
};
Ok(())
match peers.entry(descriptor) {
hash_map::Entry::Occupied(_) => {
debug_assert!(false, "PeerManager driver duplicated descriptors!");
Err(PeerHandleError {})
},
hash_map::Entry::Vacant(e) => {
e.insert(Mutex::new(Peer {
channel_encryptor: peer_encryptor,
their_node_id: None,
their_features: None,
their_net_address: remote_network_address,

pending_outbound_buffer: LinkedList::new(),
pending_outbound_buffer_first_msg_offset: 0,
gossip_broadcast_buffer: LinkedList::new(),
awaiting_write_event: false,

pending_read_buffer,
pending_read_buffer_pos: 0,
pending_read_is_header: false,

sync_status: InitSyncTracker::NoSyncRequested,

msgs_sent_since_pong: 0,
awaiting_pong_timer_tick_intervals: 0,
received_message_since_timer_tick: false,
sent_gossip_timestamp_filter: false,

received_channel_announce_since_backlogged: false,
inbound_connection: true,
}));
Ok(())
}
}
}

fn peer_should_read(&self, peer: &mut Peer) -> bool {
Expand Down Expand Up @@ -1141,9 +1153,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
macro_rules! insert_node_id {
() => {
match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap().0) {
hash_map::Entry::Occupied(_) => {
hash_map::Entry::Occupied(e) => {
log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap().0));
peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event
// Check that the peers map is consistent with the
// node_id_to_descriptor map, as this has been broken
// before.
debug_assert!(peers.get(e.get()).is_some());
return Err(PeerHandleError { })
},
hash_map::Entry::Vacant(entry) => {
Expand Down Expand Up @@ -1913,7 +1929,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
self.do_attempt_write_data(&mut descriptor, &mut *peer, false);
}
self.do_disconnect(descriptor, &*peer, "DisconnectPeer HandleError");
}
} else { debug_assert!(false, "Missing connection for peer"); }
}
}
}
Expand Down Expand Up @@ -1951,11 +1967,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
},
Some(peer_lock) => {
let peer = peer_lock.lock().unwrap();
if !peer.handshake_complete() { return; }
debug_assert!(peer.their_node_id.is_some());
if let Some((node_id, _)) = peer.their_node_id {
log_trace!(self.logger, "Handling disconnection of peer {}", log_pubkey!(node_id));
self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
let removed = self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
debug_assert!(removed.is_some(), "descriptor maps should be consistent");
if !peer.handshake_complete() { return; }
self.message_handler.chan_handler.peer_disconnected(&node_id);
self.message_handler.onion_message_handler.peer_disconnected(&node_id);
}
Expand Down Expand Up @@ -2188,12 +2204,13 @@ mod tests {

use crate::prelude::*;
use crate::sync::{Arc, Mutex};
use core::sync::atomic::Ordering;
use core::sync::atomic::{AtomicBool, Ordering};

#[derive(Clone)]
struct FileDescriptor {
fd: u16,
outbound_data: Arc<Mutex<Vec<u8>>>,
disconnect: Arc<AtomicBool>,
}
impl PartialEq for FileDescriptor {
fn eq(&self, other: &Self) -> bool {
Expand All @@ -2213,7 +2230,7 @@ mod tests {
data.len()
}

fn disconnect_socket(&mut self) {}
fn disconnect_socket(&mut self) { self.disconnect.store(true, Ordering::Release); }
}

struct PeerManagerCfg {
Expand Down Expand Up @@ -2254,10 +2271,16 @@ mod tests {

fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler, &'a test_utils::TestNodeSigner>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler, &'a test_utils::TestNodeSigner>) -> (FileDescriptor, FileDescriptor) {
let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap();
let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
let mut fd_a = FileDescriptor {
fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
disconnect: Arc::new(AtomicBool::new(false)),
};
let addr_a = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1000};
let id_b = peer_b.node_signer.get_node_id(Recipient::Node).unwrap();
let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
let mut fd_b = FileDescriptor {
fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
disconnect: Arc::new(AtomicBool::new(false)),
};
let addr_b = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1001};
let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
Expand All @@ -2281,6 +2304,84 @@ mod tests {
(fd_a.clone(), fd_b.clone())
}

#[test]
#[cfg(feature = "std")]
fn fuzz_threaded_connections() {
// Spawn two threads which repeatedly connect two peers together, leading to "got second
// connection with peer" disconnections and rapid reconnect. This previously found an issue
// with our internal map consistency, and is a generally good smoke test of disconnection.
let cfgs = Arc::new(create_peermgr_cfgs(2));
// Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }.
let peers = Arc::new(create_network(2, unsafe { &*(&*cfgs as *const _) as &'static _ }));

let start_time = std::time::Instant::now();
macro_rules! spawn_thread { ($id: expr) => { {
let peers = Arc::clone(&peers);
let cfgs = Arc::clone(&cfgs);
std::thread::spawn(move || {
let mut ctr = 0;
while start_time.elapsed() < std::time::Duration::from_secs(1) {
let id_a = peers[0].node_signer.get_node_id(Recipient::Node).unwrap();
let mut fd_a = FileDescriptor {
fd: $id + ctr * 3, outbound_data: Arc::new(Mutex::new(Vec::new())),
disconnect: Arc::new(AtomicBool::new(false)),
};
let addr_a = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1000};
let mut fd_b = FileDescriptor {
fd: $id + ctr * 3, outbound_data: Arc::new(Mutex::new(Vec::new())),
disconnect: Arc::new(AtomicBool::new(false)),
};
let addr_b = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1001};
let initial_data = peers[1].new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
peers[0].new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
assert_eq!(peers[0].read_event(&mut fd_a, &initial_data).unwrap(), false);

while start_time.elapsed() < std::time::Duration::from_secs(1) {
peers[0].process_events();
if fd_a.disconnect.load(Ordering::Acquire) { break; }
let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
if peers[1].read_event(&mut fd_b, &a_data).is_err() { break; }

peers[1].process_events();
if fd_b.disconnect.load(Ordering::Acquire) { break; }
let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
if peers[0].read_event(&mut fd_a, &b_data).is_err() { break; }

cfgs[0].chan_handler.pending_events.lock().unwrap()
.push(crate::util::events::MessageSendEvent::SendShutdown {
node_id: peers[1].node_signer.get_node_id(Recipient::Node).unwrap(),
msg: msgs::Shutdown {
channel_id: [0; 32],
scriptpubkey: bitcoin::Script::new(),
},
});
cfgs[1].chan_handler.pending_events.lock().unwrap()
.push(crate::util::events::MessageSendEvent::SendShutdown {
node_id: peers[0].node_signer.get_node_id(Recipient::Node).unwrap(),
msg: msgs::Shutdown {
channel_id: [0; 32],
scriptpubkey: bitcoin::Script::new(),
},
});

peers[0].timer_tick_occurred();
peers[1].timer_tick_occurred();
}

peers[0].socket_disconnected(&fd_a);
peers[1].socket_disconnected(&fd_b);
ctr += 1;
std::thread::sleep(std::time::Duration::from_micros(1));
}
})
} } }
let thrd_a = spawn_thread!(1);
let thrd_b = spawn_thread!(2);

thrd_a.join().unwrap();
thrd_b.join().unwrap();
}

#[test]
fn test_disconnect_peer() {
// Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
Expand Down Expand Up @@ -2337,7 +2438,10 @@ mod tests {
let cfgs = create_peermgr_cfgs(2);
let peers = create_network(2, &cfgs);

let mut fd_dup = FileDescriptor { fd: 3, outbound_data: Arc::new(Mutex::new(Vec::new())) };
let mut fd_dup = FileDescriptor {
fd: 3, outbound_data: Arc::new(Mutex::new(Vec::new())),
disconnect: Arc::new(AtomicBool::new(false)),
};
let addr_dup = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1003};
let id_a = cfgs[0].node_signer.get_node_id(Recipient::Node).unwrap();
peers[0].new_inbound_connection(fd_dup.clone(), Some(addr_dup.clone())).unwrap();
Expand Down Expand Up @@ -2441,8 +2545,14 @@ mod tests {
let peers = create_network(2, &cfgs);

let a_id = peers[0].node_signer.get_node_id(Recipient::Node).unwrap();
let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
let mut fd_a = FileDescriptor {
fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
disconnect: Arc::new(AtomicBool::new(false)),
};
let mut fd_b = FileDescriptor {
fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
disconnect: Arc::new(AtomicBool::new(false)),
};
let initial_data = peers[1].new_outbound_connection(a_id, fd_b.clone(), None).unwrap();
peers[0].new_inbound_connection(fd_a.clone(), None).unwrap();

Expand Down