@@ -2204,12 +2204,13 @@ mod tests {
2204
2204
2205
2205
use crate :: prelude:: * ;
2206
2206
use crate :: sync:: { Arc , Mutex } ;
2207
- use core:: sync:: atomic:: Ordering ;
2207
+ use core:: sync:: atomic:: { AtomicBool , Ordering } ;
2208
2208
2209
2209
#[ derive( Clone ) ]
2210
2210
struct FileDescriptor {
2211
2211
fd : u16 ,
2212
2212
outbound_data : Arc < Mutex < Vec < u8 > > > ,
2213
+ disconnect : Arc < AtomicBool > ,
2213
2214
}
2214
2215
impl PartialEq for FileDescriptor {
2215
2216
fn eq ( & self , other : & Self ) -> bool {
@@ -2229,7 +2230,7 @@ mod tests {
2229
2230
data. len ( )
2230
2231
}
2231
2232
2232
- fn disconnect_socket ( & mut self ) { }
2233
+ fn disconnect_socket ( & mut self ) { self . disconnect . store ( true , Ordering :: Release ) ; }
2233
2234
}
2234
2235
2235
2236
struct PeerManagerCfg {
@@ -2270,10 +2271,16 @@ mod tests {
2270
2271
2271
2272
fn establish_connection < ' a > ( peer_a : & PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler , & ' a test_utils:: TestRoutingMessageHandler , IgnoringMessageHandler , & ' a test_utils:: TestLogger , IgnoringMessageHandler , & ' a test_utils:: TestNodeSigner > , peer_b : & PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler , & ' a test_utils:: TestRoutingMessageHandler , IgnoringMessageHandler , & ' a test_utils:: TestLogger , IgnoringMessageHandler , & ' a test_utils:: TestNodeSigner > ) -> ( FileDescriptor , FileDescriptor ) {
2272
2273
let id_a = peer_a. node_signer . get_node_id ( Recipient :: Node ) . unwrap ( ) ;
2273
- let mut fd_a = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2274
+ let mut fd_a = FileDescriptor {
2275
+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2276
+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2277
+ } ;
2274
2278
let addr_a = NetAddress :: IPv4 { addr : [ 127 , 0 , 0 , 1 ] , port : 1000 } ;
2275
2279
let id_b = peer_b. node_signer . get_node_id ( Recipient :: Node ) . unwrap ( ) ;
2276
- let mut fd_b = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2280
+ let mut fd_b = FileDescriptor {
2281
+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2282
+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2283
+ } ;
2277
2284
let addr_b = NetAddress :: IPv4 { addr : [ 127 , 0 , 0 , 1 ] , port : 1001 } ;
2278
2285
let initial_data = peer_b. new_outbound_connection ( id_a, fd_b. clone ( ) , Some ( addr_a. clone ( ) ) ) . unwrap ( ) ;
2279
2286
peer_a. new_inbound_connection ( fd_a. clone ( ) , Some ( addr_b. clone ( ) ) ) . unwrap ( ) ;
@@ -2297,6 +2304,84 @@ mod tests {
2297
2304
( fd_a. clone ( ) , fd_b. clone ( ) )
2298
2305
}
2299
2306
2307
+ #[ test]
2308
+ #[ cfg( feature = "std" ) ]
2309
+ fn fuzz_threaded_connections ( ) {
2310
+ // Spawn two threads which repeatedly connect two peers together, leading to "got second
2311
+ // connection with peer" disconnections and rapid reconnect. This previously found an issue
2312
+ // with our internal map consistency, and is a generally good smoke test of disconnection.
2313
+ let cfgs = Arc :: new ( create_peermgr_cfgs ( 2 ) ) ;
2314
+ // Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }.
2315
+ let peers = Arc :: new ( create_network ( 2 , unsafe { & * ( & * cfgs as * const _ ) as & ' static _ } ) ) ;
2316
+
2317
+ let start_time = std:: time:: Instant :: now ( ) ;
2318
+ macro_rules! spawn_thread { ( $id: expr) => { {
2319
+ let peers = Arc :: clone( & peers) ;
2320
+ let cfgs = Arc :: clone( & cfgs) ;
2321
+ std:: thread:: spawn( move || {
2322
+ let mut ctr = 0 ;
2323
+ while start_time. elapsed( ) < std:: time:: Duration :: from_secs( 1 ) {
2324
+ let id_a = peers[ 0 ] . node_signer. get_node_id( Recipient :: Node ) . unwrap( ) ;
2325
+ let mut fd_a = FileDescriptor {
2326
+ fd: $id + ctr * 3 , outbound_data: Arc :: new( Mutex :: new( Vec :: new( ) ) ) ,
2327
+ disconnect: Arc :: new( AtomicBool :: new( false ) ) ,
2328
+ } ;
2329
+ let addr_a = NetAddress :: IPv4 { addr: [ 127 , 0 , 0 , 1 ] , port: 1000 } ;
2330
+ let mut fd_b = FileDescriptor {
2331
+ fd: $id + ctr * 3 , outbound_data: Arc :: new( Mutex :: new( Vec :: new( ) ) ) ,
2332
+ disconnect: Arc :: new( AtomicBool :: new( false ) ) ,
2333
+ } ;
2334
+ let addr_b = NetAddress :: IPv4 { addr: [ 127 , 0 , 0 , 1 ] , port: 1001 } ;
2335
+ let initial_data = peers[ 1 ] . new_outbound_connection( id_a, fd_b. clone( ) , Some ( addr_a. clone( ) ) ) . unwrap( ) ;
2336
+ peers[ 0 ] . new_inbound_connection( fd_a. clone( ) , Some ( addr_b. clone( ) ) ) . unwrap( ) ;
2337
+ assert_eq!( peers[ 0 ] . read_event( & mut fd_a, & initial_data) . unwrap( ) , false ) ;
2338
+
2339
+ while start_time. elapsed( ) < std:: time:: Duration :: from_secs( 1 ) {
2340
+ peers[ 0 ] . process_events( ) ;
2341
+ if fd_a. disconnect. load( Ordering :: Acquire ) { break ; }
2342
+ let a_data = fd_a. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ;
2343
+ if peers[ 1 ] . read_event( & mut fd_b, & a_data) . is_err( ) { break ; }
2344
+
2345
+ peers[ 1 ] . process_events( ) ;
2346
+ if fd_b. disconnect. load( Ordering :: Acquire ) { break ; }
2347
+ let b_data = fd_b. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ;
2348
+ if peers[ 0 ] . read_event( & mut fd_a, & b_data) . is_err( ) { break ; }
2349
+
2350
+ cfgs[ 0 ] . chan_handler. pending_events. lock( ) . unwrap( )
2351
+ . push( crate :: util:: events:: MessageSendEvent :: SendShutdown {
2352
+ node_id: peers[ 1 ] . node_signer. get_node_id( Recipient :: Node ) . unwrap( ) ,
2353
+ msg: msgs:: Shutdown {
2354
+ channel_id: [ 0 ; 32 ] ,
2355
+ scriptpubkey: bitcoin:: Script :: new( ) ,
2356
+ } ,
2357
+ } ) ;
2358
+ cfgs[ 1 ] . chan_handler. pending_events. lock( ) . unwrap( )
2359
+ . push( crate :: util:: events:: MessageSendEvent :: SendShutdown {
2360
+ node_id: peers[ 0 ] . node_signer. get_node_id( Recipient :: Node ) . unwrap( ) ,
2361
+ msg: msgs:: Shutdown {
2362
+ channel_id: [ 0 ; 32 ] ,
2363
+ scriptpubkey: bitcoin:: Script :: new( ) ,
2364
+ } ,
2365
+ } ) ;
2366
+
2367
+ peers[ 0 ] . timer_tick_occurred( ) ;
2368
+ peers[ 1 ] . timer_tick_occurred( ) ;
2369
+ }
2370
+
2371
+ peers[ 0 ] . socket_disconnected( & fd_a) ;
2372
+ peers[ 1 ] . socket_disconnected( & fd_b) ;
2373
+ ctr += 1 ;
2374
+ std:: thread:: sleep( std:: time:: Duration :: from_micros( 1 ) ) ;
2375
+ }
2376
+ } )
2377
+ } } }
2378
+ let thrd_a = spawn_thread ! ( 1 ) ;
2379
+ let thrd_b = spawn_thread ! ( 2 ) ;
2380
+
2381
+ thrd_a. join ( ) . unwrap ( ) ;
2382
+ thrd_b. join ( ) . unwrap ( ) ;
2383
+ }
2384
+
2300
2385
#[ test]
2301
2386
fn test_disconnect_peer ( ) {
2302
2387
// Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
@@ -2353,7 +2438,10 @@ mod tests {
2353
2438
let cfgs = create_peermgr_cfgs ( 2 ) ;
2354
2439
let peers = create_network ( 2 , & cfgs) ;
2355
2440
2356
- let mut fd_dup = FileDescriptor { fd : 3 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2441
+ let mut fd_dup = FileDescriptor {
2442
+ fd : 3 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2443
+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2444
+ } ;
2357
2445
let addr_dup = NetAddress :: IPv4 { addr : [ 127 , 0 , 0 , 1 ] , port : 1003 } ;
2358
2446
let id_a = cfgs[ 0 ] . node_signer . get_node_id ( Recipient :: Node ) . unwrap ( ) ;
2359
2447
peers[ 0 ] . new_inbound_connection ( fd_dup. clone ( ) , Some ( addr_dup. clone ( ) ) ) . unwrap ( ) ;
@@ -2457,8 +2545,14 @@ mod tests {
2457
2545
let peers = create_network ( 2 , & cfgs) ;
2458
2546
2459
2547
let a_id = peers[ 0 ] . node_signer . get_node_id ( Recipient :: Node ) . unwrap ( ) ;
2460
- let mut fd_a = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2461
- let mut fd_b = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2548
+ let mut fd_a = FileDescriptor {
2549
+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2550
+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2551
+ } ;
2552
+ let mut fd_b = FileDescriptor {
2553
+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2554
+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2555
+ } ;
2462
2556
let initial_data = peers[ 1 ] . new_outbound_connection ( a_id, fd_b. clone ( ) , None ) . unwrap ( ) ;
2463
2557
peers[ 0 ] . new_inbound_connection ( fd_a. clone ( ) , None ) . unwrap ( ) ;
2464
2558
0 commit comments