@@ -2206,12 +2206,13 @@ mod tests {
2206
2206
2207
2207
use crate :: prelude:: * ;
2208
2208
use crate :: sync:: { Arc , Mutex } ;
2209
- use core:: sync:: atomic:: Ordering ;
2209
+ use core:: sync:: atomic:: { AtomicBool , Ordering } ;
2210
2210
2211
2211
#[ derive( Clone ) ]
2212
2212
struct FileDescriptor {
2213
2213
fd : u16 ,
2214
2214
outbound_data : Arc < Mutex < Vec < u8 > > > ,
2215
+ disconnect : Arc < AtomicBool > ,
2215
2216
}
2216
2217
impl PartialEq for FileDescriptor {
2217
2218
fn eq ( & self , other : & Self ) -> bool {
@@ -2231,7 +2232,7 @@ mod tests {
2231
2232
data. len ( )
2232
2233
}
2233
2234
2234
- fn disconnect_socket ( & mut self ) { }
2235
+ fn disconnect_socket ( & mut self ) { self . disconnect . store ( true , Ordering :: Release ) ; }
2235
2236
}
2236
2237
2237
2238
struct PeerManagerCfg {
@@ -2272,10 +2273,16 @@ mod tests {
2272
2273
2273
2274
fn establish_connection < ' a > ( peer_a : & PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler , & ' a test_utils:: TestRoutingMessageHandler , IgnoringMessageHandler , & ' a test_utils:: TestLogger , IgnoringMessageHandler , & ' a test_utils:: TestNodeSigner > , peer_b : & PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler , & ' a test_utils:: TestRoutingMessageHandler , IgnoringMessageHandler , & ' a test_utils:: TestLogger , IgnoringMessageHandler , & ' a test_utils:: TestNodeSigner > ) -> ( FileDescriptor , FileDescriptor ) {
2274
2275
let id_a = peer_a. node_signer . get_node_id ( Recipient :: Node ) . unwrap ( ) ;
2275
- let mut fd_a = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2276
+ let mut fd_a = FileDescriptor {
2277
+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2278
+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2279
+ } ;
2276
2280
let addr_a = NetAddress :: IPv4 { addr : [ 127 , 0 , 0 , 1 ] , port : 1000 } ;
2277
2281
let id_b = peer_b. node_signer . get_node_id ( Recipient :: Node ) . unwrap ( ) ;
2278
- let mut fd_b = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2282
+ let mut fd_b = FileDescriptor {
2283
+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2284
+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2285
+ } ;
2279
2286
let addr_b = NetAddress :: IPv4 { addr : [ 127 , 0 , 0 , 1 ] , port : 1001 } ;
2280
2287
let initial_data = peer_b. new_outbound_connection ( id_a, fd_b. clone ( ) , Some ( addr_a. clone ( ) ) ) . unwrap ( ) ;
2281
2288
peer_a. new_inbound_connection ( fd_a. clone ( ) , Some ( addr_b. clone ( ) ) ) . unwrap ( ) ;
@@ -2299,6 +2306,80 @@ mod tests {
2299
2306
( fd_a. clone ( ) , fd_b. clone ( ) )
2300
2307
}
2301
2308
2309
+ #[ test]
2310
+ fn fuzz_threaded_connections ( ) {
2311
+ let cfgs = Arc :: new ( create_peermgr_cfgs ( 2 ) ) ;
2312
+ // Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }.
2313
+ let peers = Arc :: new ( create_network ( 2 , unsafe { & * ( & * cfgs as * const _ ) as & ' static _ } ) ) ;
2314
+
2315
+ let start_time = std:: time:: Instant :: now ( ) ;
2316
+ macro_rules! spawn_thread { ( $id: expr) => { {
2317
+ let peers = Arc :: clone( & peers) ;
2318
+ let cfgs = Arc :: clone( & cfgs) ;
2319
+ std:: thread:: spawn( move || {
2320
+ let mut ctr = 0 ;
2321
+ while start_time. elapsed( ) < std:: time:: Duration :: from_secs( 1 ) {
2322
+ let id_a = peers[ 0 ] . node_signer. get_node_id( Recipient :: Node ) . unwrap( ) ;
2323
+ let mut fd_a = FileDescriptor {
2324
+ fd: $id + ctr * 3 , outbound_data: Arc :: new( Mutex :: new( Vec :: new( ) ) ) ,
2325
+ disconnect: Arc :: new( AtomicBool :: new( false ) ) ,
2326
+ } ;
2327
+ let addr_a = NetAddress :: IPv4 { addr: [ 127 , 0 , 0 , 1 ] , port: 1000 } ;
2328
+ let mut fd_b = FileDescriptor {
2329
+ fd: $id + ctr * 3 , outbound_data: Arc :: new( Mutex :: new( Vec :: new( ) ) ) ,
2330
+ disconnect: Arc :: new( AtomicBool :: new( false ) ) ,
2331
+ } ;
2332
+ let addr_b = NetAddress :: IPv4 { addr: [ 127 , 0 , 0 , 1 ] , port: 1001 } ;
2333
+ let initial_data = peers[ 1 ] . new_outbound_connection( id_a, fd_b. clone( ) , Some ( addr_a. clone( ) ) ) . unwrap( ) ;
2334
+ peers[ 0 ] . new_inbound_connection( fd_a. clone( ) , Some ( addr_b. clone( ) ) ) . unwrap( ) ;
2335
+ assert_eq!( peers[ 0 ] . read_event( & mut fd_a, & initial_data) . unwrap( ) , false ) ;
2336
+
2337
+ while start_time. elapsed( ) < std:: time:: Duration :: from_secs( 1 ) {
2338
+ peers[ 0 ] . process_events( ) ;
2339
+ if fd_a. disconnect. load( Ordering :: Acquire ) { break ; }
2340
+ let a_data = fd_a. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ;
2341
+ if peers[ 1 ] . read_event( & mut fd_b, & a_data) . is_err( ) { break ; }
2342
+
2343
+ peers[ 1 ] . process_events( ) ;
2344
+ if fd_b. disconnect. load( Ordering :: Acquire ) { break ; }
2345
+ let b_data = fd_b. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ;
2346
+ if peers[ 0 ] . read_event( & mut fd_a, & b_data) . is_err( ) { break ; }
2347
+
2348
+ cfgs[ 0 ] . chan_handler. pending_events. lock( ) . unwrap( )
2349
+ . push( crate :: util:: events:: MessageSendEvent :: SendShutdown {
2350
+ node_id: peers[ 1 ] . node_signer. get_node_id( Recipient :: Node ) . unwrap( ) ,
2351
+ msg: msgs:: Shutdown {
2352
+ channel_id: [ 0 ; 32 ] ,
2353
+ scriptpubkey: bitcoin:: Script :: new( ) ,
2354
+ } ,
2355
+ } ) ;
2356
+ cfgs[ 1 ] . chan_handler. pending_events. lock( ) . unwrap( )
2357
+ . push( crate :: util:: events:: MessageSendEvent :: SendShutdown {
2358
+ node_id: peers[ 0 ] . node_signer. get_node_id( Recipient :: Node ) . unwrap( ) ,
2359
+ msg: msgs:: Shutdown {
2360
+ channel_id: [ 0 ; 32 ] ,
2361
+ scriptpubkey: bitcoin:: Script :: new( ) ,
2362
+ } ,
2363
+ } ) ;
2364
+
2365
+ peers[ 0 ] . timer_tick_occurred( ) ;
2366
+ peers[ 1 ] . timer_tick_occurred( ) ;
2367
+ }
2368
+
2369
+ peers[ 0 ] . socket_disconnected( & fd_a) ;
2370
+ peers[ 1 ] . socket_disconnected( & fd_b) ;
2371
+ ctr += 1 ;
2372
+ std:: thread:: sleep( std:: time:: Duration :: from_micros( 1 ) ) ;
2373
+ }
2374
+ } )
2375
+ } } }
2376
+ let thrd_a = spawn_thread ! ( 1 ) ;
2377
+ let thrd_b = spawn_thread ! ( 2 ) ;
2378
+
2379
+ thrd_a. join ( ) . unwrap ( ) ;
2380
+ thrd_b. join ( ) . unwrap ( ) ;
2381
+ }
2382
+
2302
2383
#[ test]
2303
2384
fn test_disconnect_peer ( ) {
2304
2385
// Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
@@ -2355,7 +2436,10 @@ mod tests {
2355
2436
let cfgs = create_peermgr_cfgs ( 2 ) ;
2356
2437
let peers = create_network ( 2 , & cfgs) ;
2357
2438
2358
- let mut fd_dup = FileDescriptor { fd : 3 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2439
+ let mut fd_dup = FileDescriptor {
2440
+ fd : 3 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2441
+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2442
+ } ;
2359
2443
let addr_dup = NetAddress :: IPv4 { addr : [ 127 , 0 , 0 , 1 ] , port : 1003 } ;
2360
2444
let id_a = cfgs[ 0 ] . node_signer . get_node_id ( Recipient :: Node ) . unwrap ( ) ;
2361
2445
peers[ 0 ] . new_inbound_connection ( fd_dup. clone ( ) , Some ( addr_dup. clone ( ) ) ) . unwrap ( ) ;
@@ -2459,8 +2543,14 @@ mod tests {
2459
2543
let peers = create_network ( 2 , & cfgs) ;
2460
2544
2461
2545
let a_id = peers[ 0 ] . node_signer . get_node_id ( Recipient :: Node ) . unwrap ( ) ;
2462
- let mut fd_a = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2463
- let mut fd_b = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2546
+ let mut fd_a = FileDescriptor {
2547
+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2548
+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2549
+ } ;
2550
+ let mut fd_b = FileDescriptor {
2551
+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2552
+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2553
+ } ;
2464
2554
let initial_data = peers[ 1 ] . new_outbound_connection ( a_id, fd_b. clone ( ) , None ) . unwrap ( ) ;
2465
2555
peers[ 0 ] . new_inbound_connection ( fd_a. clone ( ) , None ) . unwrap ( ) ;
2466
2556
0 commit comments