@@ -2206,12 +2206,13 @@ mod tests {
2206
2206
2207
2207
use crate :: prelude:: * ;
2208
2208
use crate :: sync:: { Arc , Mutex } ;
2209
- use core:: sync:: atomic:: Ordering ;
2209
+ use core:: sync:: atomic:: { AtomicBool , Ordering } ;
2210
2210
2211
2211
#[ derive( Clone ) ]
2212
2212
struct FileDescriptor {
2213
2213
fd : u16 ,
2214
2214
outbound_data : Arc < Mutex < Vec < u8 > > > ,
2215
+ disconnect : Arc < AtomicBool > ,
2215
2216
}
2216
2217
impl PartialEq for FileDescriptor {
2217
2218
fn eq ( & self , other : & Self ) -> bool {
@@ -2231,7 +2232,7 @@ mod tests {
2231
2232
data. len ( )
2232
2233
}
2233
2234
2234
- fn disconnect_socket ( & mut self ) { }
2235
+ fn disconnect_socket ( & mut self ) { self . disconnect . store ( true , Ordering :: Release ) ; }
2235
2236
}
2236
2237
2237
2238
struct PeerManagerCfg {
@@ -2272,10 +2273,16 @@ mod tests {
2272
2273
2273
2274
fn establish_connection < ' a > ( peer_a : & PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler , & ' a test_utils:: TestRoutingMessageHandler , IgnoringMessageHandler , & ' a test_utils:: TestLogger , IgnoringMessageHandler , & ' a test_utils:: TestNodeSigner > , peer_b : & PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler , & ' a test_utils:: TestRoutingMessageHandler , IgnoringMessageHandler , & ' a test_utils:: TestLogger , IgnoringMessageHandler , & ' a test_utils:: TestNodeSigner > ) -> ( FileDescriptor , FileDescriptor ) {
2274
2275
let id_a = peer_a. node_signer . get_node_id ( Recipient :: Node ) . unwrap ( ) ;
2275
- let mut fd_a = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2276
+ let mut fd_a = FileDescriptor {
2277
+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2278
+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2279
+ } ;
2276
2280
let addr_a = NetAddress :: IPv4 { addr : [ 127 , 0 , 0 , 1 ] , port : 1000 } ;
2277
2281
let id_b = peer_b. node_signer . get_node_id ( Recipient :: Node ) . unwrap ( ) ;
2278
- let mut fd_b = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2282
+ let mut fd_b = FileDescriptor {
2283
+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2284
+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2285
+ } ;
2279
2286
let addr_b = NetAddress :: IPv4 { addr : [ 127 , 0 , 0 , 1 ] , port : 1001 } ;
2280
2287
let initial_data = peer_b. new_outbound_connection ( id_a, fd_b. clone ( ) , Some ( addr_a. clone ( ) ) ) . unwrap ( ) ;
2281
2288
peer_a. new_inbound_connection ( fd_a. clone ( ) , Some ( addr_b. clone ( ) ) ) . unwrap ( ) ;
@@ -2299,6 +2306,81 @@ mod tests {
2299
2306
( fd_a. clone ( ) , fd_b. clone ( ) )
2300
2307
}
2301
2308
2309
+ #[ test]
2310
+ #[ cfg( feature = "std" ) ]
2311
+ fn fuzz_threaded_connections ( ) {
2312
+ let cfgs = Arc :: new ( create_peermgr_cfgs ( 2 ) ) ;
2313
+ // Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }.
2314
+ let peers = Arc :: new ( create_network ( 2 , unsafe { & * ( & * cfgs as * const _ ) as & ' static _ } ) ) ;
2315
+
2316
+ let start_time = std:: time:: Instant :: now ( ) ;
2317
+ macro_rules! spawn_thread { ( $id: expr) => { {
2318
+ let peers = Arc :: clone( & peers) ;
2319
+ let cfgs = Arc :: clone( & cfgs) ;
2320
+ std:: thread:: spawn( move || {
2321
+ let mut ctr = 0 ;
2322
+ while start_time. elapsed( ) < std:: time:: Duration :: from_secs( 1 ) {
2323
+ let id_a = peers[ 0 ] . node_signer. get_node_id( Recipient :: Node ) . unwrap( ) ;
2324
+ let mut fd_a = FileDescriptor {
2325
+ fd: $id + ctr * 3 , outbound_data: Arc :: new( Mutex :: new( Vec :: new( ) ) ) ,
2326
+ disconnect: Arc :: new( AtomicBool :: new( false ) ) ,
2327
+ } ;
2328
+ let addr_a = NetAddress :: IPv4 { addr: [ 127 , 0 , 0 , 1 ] , port: 1000 } ;
2329
+ let mut fd_b = FileDescriptor {
2330
+ fd: $id + ctr * 3 , outbound_data: Arc :: new( Mutex :: new( Vec :: new( ) ) ) ,
2331
+ disconnect: Arc :: new( AtomicBool :: new( false ) ) ,
2332
+ } ;
2333
+ let addr_b = NetAddress :: IPv4 { addr: [ 127 , 0 , 0 , 1 ] , port: 1001 } ;
2334
+ let initial_data = peers[ 1 ] . new_outbound_connection( id_a, fd_b. clone( ) , Some ( addr_a. clone( ) ) ) . unwrap( ) ;
2335
+ peers[ 0 ] . new_inbound_connection( fd_a. clone( ) , Some ( addr_b. clone( ) ) ) . unwrap( ) ;
2336
+ assert_eq!( peers[ 0 ] . read_event( & mut fd_a, & initial_data) . unwrap( ) , false ) ;
2337
+
2338
+ while start_time. elapsed( ) < std:: time:: Duration :: from_secs( 1 ) {
2339
+ peers[ 0 ] . process_events( ) ;
2340
+ if fd_a. disconnect. load( Ordering :: Acquire ) { break ; }
2341
+ let a_data = fd_a. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ;
2342
+ if peers[ 1 ] . read_event( & mut fd_b, & a_data) . is_err( ) { break ; }
2343
+
2344
+ peers[ 1 ] . process_events( ) ;
2345
+ if fd_b. disconnect. load( Ordering :: Acquire ) { break ; }
2346
+ let b_data = fd_b. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ;
2347
+ if peers[ 0 ] . read_event( & mut fd_a, & b_data) . is_err( ) { break ; }
2348
+
2349
+ cfgs[ 0 ] . chan_handler. pending_events. lock( ) . unwrap( )
2350
+ . push( crate :: util:: events:: MessageSendEvent :: SendShutdown {
2351
+ node_id: peers[ 1 ] . node_signer. get_node_id( Recipient :: Node ) . unwrap( ) ,
2352
+ msg: msgs:: Shutdown {
2353
+ channel_id: [ 0 ; 32 ] ,
2354
+ scriptpubkey: bitcoin:: Script :: new( ) ,
2355
+ } ,
2356
+ } ) ;
2357
+ cfgs[ 1 ] . chan_handler. pending_events. lock( ) . unwrap( )
2358
+ . push( crate :: util:: events:: MessageSendEvent :: SendShutdown {
2359
+ node_id: peers[ 0 ] . node_signer. get_node_id( Recipient :: Node ) . unwrap( ) ,
2360
+ msg: msgs:: Shutdown {
2361
+ channel_id: [ 0 ; 32 ] ,
2362
+ scriptpubkey: bitcoin:: Script :: new( ) ,
2363
+ } ,
2364
+ } ) ;
2365
+
2366
+ peers[ 0 ] . timer_tick_occurred( ) ;
2367
+ peers[ 1 ] . timer_tick_occurred( ) ;
2368
+ }
2369
+
2370
+ peers[ 0 ] . socket_disconnected( & fd_a) ;
2371
+ peers[ 1 ] . socket_disconnected( & fd_b) ;
2372
+ ctr += 1 ;
2373
+ std:: thread:: sleep( std:: time:: Duration :: from_micros( 1 ) ) ;
2374
+ }
2375
+ } )
2376
+ } } }
2377
+ let thrd_a = spawn_thread ! ( 1 ) ;
2378
+ let thrd_b = spawn_thread ! ( 2 ) ;
2379
+
2380
+ thrd_a. join ( ) . unwrap ( ) ;
2381
+ thrd_b. join ( ) . unwrap ( ) ;
2382
+ }
2383
+
2302
2384
#[ test]
2303
2385
fn test_disconnect_peer ( ) {
2304
2386
// Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
@@ -2355,7 +2437,10 @@ mod tests {
2355
2437
let cfgs = create_peermgr_cfgs ( 2 ) ;
2356
2438
let peers = create_network ( 2 , & cfgs) ;
2357
2439
2358
- let mut fd_dup = FileDescriptor { fd : 3 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2440
+ let mut fd_dup = FileDescriptor {
2441
+ fd : 3 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2442
+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2443
+ } ;
2359
2444
let addr_dup = NetAddress :: IPv4 { addr : [ 127 , 0 , 0 , 1 ] , port : 1003 } ;
2360
2445
let id_a = cfgs[ 0 ] . node_signer . get_node_id ( Recipient :: Node ) . unwrap ( ) ;
2361
2446
peers[ 0 ] . new_inbound_connection ( fd_dup. clone ( ) , Some ( addr_dup. clone ( ) ) ) . unwrap ( ) ;
@@ -2459,8 +2544,14 @@ mod tests {
2459
2544
let peers = create_network ( 2 , & cfgs) ;
2460
2545
2461
2546
let a_id = peers[ 0 ] . node_signer . get_node_id ( Recipient :: Node ) . unwrap ( ) ;
2462
- let mut fd_a = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2463
- let mut fd_b = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
2547
+ let mut fd_a = FileDescriptor {
2548
+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2549
+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2550
+ } ;
2551
+ let mut fd_b = FileDescriptor {
2552
+ fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
2553
+ disconnect : Arc :: new ( AtomicBool :: new ( false ) ) ,
2554
+ } ;
2464
2555
let initial_data = peers[ 1 ] . new_outbound_connection ( a_id, fd_b. clone ( ) , None ) . unwrap ( ) ;
2465
2556
peers[ 0 ] . new_inbound_connection ( fd_a. clone ( ) , None ) . unwrap ( ) ;
2466
2557
0 commit comments