Skip to content

Commit 5c6e367

Browse files
authored
Merge pull request #2060 from TheBlueMatt/2023-02-peers-disconnect-consistency
Remove peers from the node_id_to_descriptor even without init
2 parents 6ddf69c + 48946d9 commit 5c6e367

File tree

1 file changed

+178
-68
lines changed

1 file changed

+178
-68
lines changed

lightning/src/ln/peer_handler.rs

+178-68
Original file line numberDiff line numberDiff line change
@@ -815,34 +815,40 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
815815
let pending_read_buffer = [0; 50].to_vec(); // Noise act two is 50 bytes
816816

817817
let mut peers = self.peers.write().unwrap();
818-
if peers.insert(descriptor, Mutex::new(Peer {
819-
channel_encryptor: peer_encryptor,
820-
their_node_id: None,
821-
their_features: None,
822-
their_net_address: remote_network_address,
823-
824-
pending_outbound_buffer: LinkedList::new(),
825-
pending_outbound_buffer_first_msg_offset: 0,
826-
gossip_broadcast_buffer: LinkedList::new(),
827-
awaiting_write_event: false,
828-
829-
pending_read_buffer,
830-
pending_read_buffer_pos: 0,
831-
pending_read_is_header: false,
832-
833-
sync_status: InitSyncTracker::NoSyncRequested,
834-
835-
msgs_sent_since_pong: 0,
836-
awaiting_pong_timer_tick_intervals: 0,
837-
received_message_since_timer_tick: false,
838-
sent_gossip_timestamp_filter: false,
839-
840-
received_channel_announce_since_backlogged: false,
841-
inbound_connection: false,
842-
})).is_some() {
843-
panic!("PeerManager driver duplicated descriptors!");
844-
};
845-
Ok(res)
818+
match peers.entry(descriptor) {
819+
hash_map::Entry::Occupied(_) => {
820+
debug_assert!(false, "PeerManager driver duplicated descriptors!");
821+
Err(PeerHandleError {})
822+
},
823+
hash_map::Entry::Vacant(e) => {
824+
e.insert(Mutex::new(Peer {
825+
channel_encryptor: peer_encryptor,
826+
their_node_id: None,
827+
their_features: None,
828+
their_net_address: remote_network_address,
829+
830+
pending_outbound_buffer: LinkedList::new(),
831+
pending_outbound_buffer_first_msg_offset: 0,
832+
gossip_broadcast_buffer: LinkedList::new(),
833+
awaiting_write_event: false,
834+
835+
pending_read_buffer,
836+
pending_read_buffer_pos: 0,
837+
pending_read_is_header: false,
838+
839+
sync_status: InitSyncTracker::NoSyncRequested,
840+
841+
msgs_sent_since_pong: 0,
842+
awaiting_pong_timer_tick_intervals: 0,
843+
received_message_since_timer_tick: false,
844+
sent_gossip_timestamp_filter: false,
845+
846+
received_channel_announce_since_backlogged: false,
847+
inbound_connection: false,
848+
}));
849+
Ok(res)
850+
}
851+
}
846852
}
847853

848854
/// Indicates a new inbound connection has been established to a node with an optional remote
@@ -865,34 +871,40 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
865871
let pending_read_buffer = [0; 50].to_vec(); // Noise act one is 50 bytes
866872

867873
let mut peers = self.peers.write().unwrap();
868-
if peers.insert(descriptor, Mutex::new(Peer {
869-
channel_encryptor: peer_encryptor,
870-
their_node_id: None,
871-
their_features: None,
872-
their_net_address: remote_network_address,
873-
874-
pending_outbound_buffer: LinkedList::new(),
875-
pending_outbound_buffer_first_msg_offset: 0,
876-
gossip_broadcast_buffer: LinkedList::new(),
877-
awaiting_write_event: false,
878-
879-
pending_read_buffer,
880-
pending_read_buffer_pos: 0,
881-
pending_read_is_header: false,
882-
883-
sync_status: InitSyncTracker::NoSyncRequested,
884-
885-
msgs_sent_since_pong: 0,
886-
awaiting_pong_timer_tick_intervals: 0,
887-
received_message_since_timer_tick: false,
888-
sent_gossip_timestamp_filter: false,
889-
890-
received_channel_announce_since_backlogged: false,
891-
inbound_connection: true,
892-
})).is_some() {
893-
panic!("PeerManager driver duplicated descriptors!");
894-
};
895-
Ok(())
874+
match peers.entry(descriptor) {
875+
hash_map::Entry::Occupied(_) => {
876+
debug_assert!(false, "PeerManager driver duplicated descriptors!");
877+
Err(PeerHandleError {})
878+
},
879+
hash_map::Entry::Vacant(e) => {
880+
e.insert(Mutex::new(Peer {
881+
channel_encryptor: peer_encryptor,
882+
their_node_id: None,
883+
their_features: None,
884+
their_net_address: remote_network_address,
885+
886+
pending_outbound_buffer: LinkedList::new(),
887+
pending_outbound_buffer_first_msg_offset: 0,
888+
gossip_broadcast_buffer: LinkedList::new(),
889+
awaiting_write_event: false,
890+
891+
pending_read_buffer,
892+
pending_read_buffer_pos: 0,
893+
pending_read_is_header: false,
894+
895+
sync_status: InitSyncTracker::NoSyncRequested,
896+
897+
msgs_sent_since_pong: 0,
898+
awaiting_pong_timer_tick_intervals: 0,
899+
received_message_since_timer_tick: false,
900+
sent_gossip_timestamp_filter: false,
901+
902+
received_channel_announce_since_backlogged: false,
903+
inbound_connection: true,
904+
}));
905+
Ok(())
906+
}
907+
}
896908
}
897909

898910
fn peer_should_read(&self, peer: &mut Peer) -> bool {
@@ -1141,9 +1153,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
11411153
macro_rules! insert_node_id {
11421154
() => {
11431155
match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap().0) {
1144-
hash_map::Entry::Occupied(_) => {
1156+
hash_map::Entry::Occupied(e) => {
11451157
log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap().0));
11461158
peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event
1159+
// Check that the peers map is consistent with the
1160+
// node_id_to_descriptor map, as this has been broken
1161+
// before.
1162+
debug_assert!(peers.get(e.get()).is_some());
11471163
return Err(PeerHandleError { })
11481164
},
11491165
hash_map::Entry::Vacant(entry) => {
@@ -1913,7 +1929,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
19131929
self.do_attempt_write_data(&mut descriptor, &mut *peer, false);
19141930
}
19151931
self.do_disconnect(descriptor, &*peer, "DisconnectPeer HandleError");
1916-
}
1932+
} else { debug_assert!(false, "Missing connection for peer"); }
19171933
}
19181934
}
19191935
}
@@ -1951,11 +1967,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
19511967
},
19521968
Some(peer_lock) => {
19531969
let peer = peer_lock.lock().unwrap();
1954-
if !peer.handshake_complete() { return; }
1955-
debug_assert!(peer.their_node_id.is_some());
19561970
if let Some((node_id, _)) = peer.their_node_id {
19571971
log_trace!(self.logger, "Handling disconnection of peer {}", log_pubkey!(node_id));
1958-
self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
1972+
let removed = self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
1973+
debug_assert!(removed.is_some(), "descriptor maps should be consistent");
1974+
if !peer.handshake_complete() { return; }
19591975
self.message_handler.chan_handler.peer_disconnected(&node_id);
19601976
self.message_handler.onion_message_handler.peer_disconnected(&node_id);
19611977
}
@@ -2188,12 +2204,13 @@ mod tests {
21882204

21892205
use crate::prelude::*;
21902206
use crate::sync::{Arc, Mutex};
2191-
use core::sync::atomic::Ordering;
2207+
use core::sync::atomic::{AtomicBool, Ordering};
21922208

21932209
#[derive(Clone)]
21942210
struct FileDescriptor {
21952211
fd: u16,
21962212
outbound_data: Arc<Mutex<Vec<u8>>>,
2213+
disconnect: Arc<AtomicBool>,
21972214
}
21982215
impl PartialEq for FileDescriptor {
21992216
fn eq(&self, other: &Self) -> bool {
@@ -2213,7 +2230,7 @@ mod tests {
22132230
data.len()
22142231
}
22152232

2216-
fn disconnect_socket(&mut self) {}
2233+
fn disconnect_socket(&mut self) { self.disconnect.store(true, Ordering::Release); }
22172234
}
22182235

22192236
struct PeerManagerCfg {
@@ -2254,10 +2271,16 @@ mod tests {
22542271

22552272
fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler, &'a test_utils::TestNodeSigner>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler, &'a test_utils::TestNodeSigner>) -> (FileDescriptor, FileDescriptor) {
22562273
let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap();
2257-
let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
2274+
let mut fd_a = FileDescriptor {
2275+
fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
2276+
disconnect: Arc::new(AtomicBool::new(false)),
2277+
};
22582278
let addr_a = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1000};
22592279
let id_b = peer_b.node_signer.get_node_id(Recipient::Node).unwrap();
2260-
let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
2280+
let mut fd_b = FileDescriptor {
2281+
fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
2282+
disconnect: Arc::new(AtomicBool::new(false)),
2283+
};
22612284
let addr_b = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1001};
22622285
let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
22632286
peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
@@ -2281,6 +2304,84 @@ mod tests {
22812304
(fd_a.clone(), fd_b.clone())
22822305
}
22832306

2307+
#[test]
2308+
#[cfg(feature = "std")]
2309+
fn fuzz_threaded_connections() {
2310+
// Spawn two threads which repeatedly connect two peers together, leading to "got second
2311+
// connection with peer" disconnections and rapid reconnect. This previously found an issue
2312+
// with our internal map consistency, and is a generally good smoke test of disconnection.
2313+
let cfgs = Arc::new(create_peermgr_cfgs(2));
2314+
// Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }.
2315+
let peers = Arc::new(create_network(2, unsafe { &*(&*cfgs as *const _) as &'static _ }));
2316+
2317+
let start_time = std::time::Instant::now();
2318+
macro_rules! spawn_thread { ($id: expr) => { {
2319+
let peers = Arc::clone(&peers);
2320+
let cfgs = Arc::clone(&cfgs);
2321+
std::thread::spawn(move || {
2322+
let mut ctr = 0;
2323+
while start_time.elapsed() < std::time::Duration::from_secs(1) {
2324+
let id_a = peers[0].node_signer.get_node_id(Recipient::Node).unwrap();
2325+
let mut fd_a = FileDescriptor {
2326+
fd: $id + ctr * 3, outbound_data: Arc::new(Mutex::new(Vec::new())),
2327+
disconnect: Arc::new(AtomicBool::new(false)),
2328+
};
2329+
let addr_a = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1000};
2330+
let mut fd_b = FileDescriptor {
2331+
fd: $id + ctr * 3, outbound_data: Arc::new(Mutex::new(Vec::new())),
2332+
disconnect: Arc::new(AtomicBool::new(false)),
2333+
};
2334+
let addr_b = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1001};
2335+
let initial_data = peers[1].new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
2336+
peers[0].new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
2337+
assert_eq!(peers[0].read_event(&mut fd_a, &initial_data).unwrap(), false);
2338+
2339+
while start_time.elapsed() < std::time::Duration::from_secs(1) {
2340+
peers[0].process_events();
2341+
if fd_a.disconnect.load(Ordering::Acquire) { break; }
2342+
let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
2343+
if peers[1].read_event(&mut fd_b, &a_data).is_err() { break; }
2344+
2345+
peers[1].process_events();
2346+
if fd_b.disconnect.load(Ordering::Acquire) { break; }
2347+
let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
2348+
if peers[0].read_event(&mut fd_a, &b_data).is_err() { break; }
2349+
2350+
cfgs[0].chan_handler.pending_events.lock().unwrap()
2351+
.push(crate::util::events::MessageSendEvent::SendShutdown {
2352+
node_id: peers[1].node_signer.get_node_id(Recipient::Node).unwrap(),
2353+
msg: msgs::Shutdown {
2354+
channel_id: [0; 32],
2355+
scriptpubkey: bitcoin::Script::new(),
2356+
},
2357+
});
2358+
cfgs[1].chan_handler.pending_events.lock().unwrap()
2359+
.push(crate::util::events::MessageSendEvent::SendShutdown {
2360+
node_id: peers[0].node_signer.get_node_id(Recipient::Node).unwrap(),
2361+
msg: msgs::Shutdown {
2362+
channel_id: [0; 32],
2363+
scriptpubkey: bitcoin::Script::new(),
2364+
},
2365+
});
2366+
2367+
peers[0].timer_tick_occurred();
2368+
peers[1].timer_tick_occurred();
2369+
}
2370+
2371+
peers[0].socket_disconnected(&fd_a);
2372+
peers[1].socket_disconnected(&fd_b);
2373+
ctr += 1;
2374+
std::thread::sleep(std::time::Duration::from_micros(1));
2375+
}
2376+
})
2377+
} } }
2378+
let thrd_a = spawn_thread!(1);
2379+
let thrd_b = spawn_thread!(2);
2380+
2381+
thrd_a.join().unwrap();
2382+
thrd_b.join().unwrap();
2383+
}
2384+
22842385
#[test]
22852386
fn test_disconnect_peer() {
22862387
// Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
@@ -2337,7 +2438,10 @@ mod tests {
23372438
let cfgs = create_peermgr_cfgs(2);
23382439
let peers = create_network(2, &cfgs);
23392440

2340-
let mut fd_dup = FileDescriptor { fd: 3, outbound_data: Arc::new(Mutex::new(Vec::new())) };
2441+
let mut fd_dup = FileDescriptor {
2442+
fd: 3, outbound_data: Arc::new(Mutex::new(Vec::new())),
2443+
disconnect: Arc::new(AtomicBool::new(false)),
2444+
};
23412445
let addr_dup = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1003};
23422446
let id_a = cfgs[0].node_signer.get_node_id(Recipient::Node).unwrap();
23432447
peers[0].new_inbound_connection(fd_dup.clone(), Some(addr_dup.clone())).unwrap();
@@ -2441,8 +2545,14 @@ mod tests {
24412545
let peers = create_network(2, &cfgs);
24422546

24432547
let a_id = peers[0].node_signer.get_node_id(Recipient::Node).unwrap();
2444-
let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
2445-
let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
2548+
let mut fd_a = FileDescriptor {
2549+
fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
2550+
disconnect: Arc::new(AtomicBool::new(false)),
2551+
};
2552+
let mut fd_b = FileDescriptor {
2553+
fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
2554+
disconnect: Arc::new(AtomicBool::new(false)),
2555+
};
24462556
let initial_data = peers[1].new_outbound_connection(a_id, fd_b.clone(), None).unwrap();
24472557
peers[0].new_inbound_connection(fd_a.clone(), None).unwrap();
24482558

0 commit comments

Comments
 (0)