Skip to content

Commit 36b4b7e

Browse files
committed
Correct peer_handler::test_process_events_multithreaded
This test was added some time ago in 0c034e9, but never made any sense. `PeerManager::process_events` will go around its loop as many times is required to ensure we've always processed all events which were pending prior to a `process_events` call, so having a test that checks that we never go around more than twice is obviously broken. And, indeed, in CI this tests fails with some regularity. Instead, the test here is changed to ensure that we detectably go around the loop again at least once. Fixes #2385
1 parent 05ed0db commit 36b4b7e

File tree

2 files changed

+33
-32
lines changed

2 files changed

+33
-32
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 26 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3452,48 +3452,45 @@ mod tests {
34523452
#[cfg(feature = "std")]
34533453
fn test_process_events_multithreaded() {
34543454
use std::time::{Duration, Instant};
3455-
// Test that `process_events` getting called on multiple threads doesn't generate too many
3456-
// loop iterations.
3455+
// `process_events` shouldn't block on another thread processing events and instead should
3456+
// simply signal the currently processing thread to go around the loop again.
3457+
// Here we test that this happens by spwaning a few threads and checking that we see one go
3458+
// around again at least once.
3459+
//
34573460
// Each time `process_events` goes around the loop we call
3458-
// `get_and_clear_pending_msg_events`, which we count using the `TestMessageHandler`.
3459-
// Because the loop should go around once more after a call which fails to take the
3460-
// single-threaded lock, if we write zero to the counter before calling `process_events` we
3461-
// should never observe there having been more than 2 loop iterations.
3462-
// Further, because the last thread to exit will call `process_events` before returning, we
3463-
// should always have at least one count at the end.
3461+
// `get_and_clear_pending_msg_events`, which we count using the `TestMessageHandler`. Thus,
3462+
// to test we simply write zero to the counter before calling `process_events` and make
3463+
// sure we observe a value greater than one at least once.
34643464
let cfg = Arc::new(create_peermgr_cfgs(1));
34653465
// Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }.
34663466
let peer = Arc::new(create_network(1, unsafe { &*(&*cfg as *const _) as &'static _ }).pop().unwrap());
34673467

3468-
let exit_flag = Arc::new(AtomicBool::new(false));
3469-
macro_rules! spawn_thread { () => { {
3468+
let end_time = Instant::now() + Duration::from_millis(100);
3469+
let observed_loop = Arc::new(AtomicBool::new(false));
3470+
let thread_fn = || {
34703471
let thread_cfg = Arc::clone(&cfg);
34713472
let thread_peer = Arc::clone(&peer);
3472-
let thread_exit = Arc::clone(&exit_flag);
3473-
std::thread::spawn(move || {
3474-
while !thread_exit.load(Ordering::Acquire) {
3475-
thread_cfg[0].chan_handler.message_fetch_counter.store(0, Ordering::Release);
3473+
let thread_observed_loop = Arc::clone(&observed_loop);
3474+
move || {
3475+
while Instant::now() < end_time || !thread_observed_loop.load(Ordering::Acquire) {
3476+
test_utils::TestChannelMessageHandler::message_fetch_counter.with(|val| val.store(0, Ordering::Relaxed));
34763477
thread_peer.process_events();
3478+
if test_utils::TestChannelMessageHandler::message_fetch_counter.with(|val| val.load(Ordering::Relaxed)) > 1 {
3479+
thread_observed_loop.store(true, Ordering::Release);
3480+
return;
3481+
}
34773482
std::thread::sleep(Duration::from_micros(1));
34783483
}
3479-
})
3480-
} } }
3481-
3482-
let thread_a = spawn_thread!();
3483-
let thread_b = spawn_thread!();
3484-
let thread_c = spawn_thread!();
3485-
3486-
let start_time = Instant::now();
3487-
while start_time.elapsed() < Duration::from_millis(100) {
3488-
let val = cfg[0].chan_handler.message_fetch_counter.load(Ordering::Acquire);
3489-
assert!(val <= 2);
3490-
std::thread::yield_now(); // Winblowz seemingly doesn't ever interrupt threads?!
3491-
}
3484+
}
3485+
};
34923486

3493-
exit_flag.store(true, Ordering::Release);
3487+
let thread_a = std::thread::spawn(thread_fn());
3488+
let thread_b = std::thread::spawn(thread_fn());
3489+
let thread_c = std::thread::spawn(thread_fn());
3490+
thread_fn()();
34943491
thread_a.join().unwrap();
34953492
thread_b.join().unwrap();
34963493
thread_c.join().unwrap();
3497-
assert!(cfg[0].chan_handler.message_fetch_counter.load(Ordering::Acquire) >= 1);
3494+
assert!(observed_loop.load(Ordering::Acquire));
34983495
}
34993496
}

lightning/src/util/test_utils.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -759,17 +759,21 @@ pub struct TestChannelMessageHandler {
759759
pub pending_events: Mutex<Vec<events::MessageSendEvent>>,
760760
expected_recv_msgs: Mutex<Option<Vec<wire::Message<()>>>>,
761761
connected_peers: Mutex<HashSet<PublicKey>>,
762-
pub message_fetch_counter: AtomicUsize,
763762
chain_hash: ChainHash,
764763
}
765764

765+
impl TestChannelMessageHandler {
766+
thread_local! {
767+
pub static message_fetch_counter: AtomicUsize = AtomicUsize::new(0);
768+
}
769+
}
770+
766771
impl TestChannelMessageHandler {
767772
pub fn new(chain_hash: ChainHash) -> Self {
768773
TestChannelMessageHandler {
769774
pending_events: Mutex::new(Vec::new()),
770775
expected_recv_msgs: Mutex::new(None),
771776
connected_peers: Mutex::new(new_hash_set()),
772-
message_fetch_counter: AtomicUsize::new(0),
773777
chain_hash,
774778
}
775779
}
@@ -940,7 +944,7 @@ impl msgs::ChannelMessageHandler for TestChannelMessageHandler {
940944

941945
impl events::MessageSendEventsProvider for TestChannelMessageHandler {
942946
fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
943-
self.message_fetch_counter.fetch_add(1, Ordering::AcqRel);
947+
Self::message_fetch_counter.with(|val| val.fetch_add(1, Ordering::AcqRel));
944948
let mut pending_events = self.pending_events.lock().unwrap();
945949
let mut ret = Vec::new();
946950
mem::swap(&mut ret, &mut *pending_events);

0 commit comments

Comments
 (0)