
Correct peer_handler::test_process_events_multithreaded #3254

Merged
56 changes: 26 additions & 30 deletions lightning/src/ln/peer_handler.rs
@@ -3452,48 +3452,44 @@ mod tests {
 	#[cfg(feature = "std")]
 	fn test_process_events_multithreaded() {
 		use std::time::{Duration, Instant};
-		// Test that `process_events` getting called on multiple threads doesn't generate too many
-		// loop iterations.
+		// `process_events` shouldn't block on another thread processing events and instead should
+		// simply signal the currently processing thread to go around the loop again.
+		// Here we test that this happens by spawning a few threads and checking that we see one go
+		// around again at least once.
+		//
 		// Each time `process_events` goes around the loop we call
-		// `get_and_clear_pending_msg_events`, which we count using the `TestMessageHandler`.
-		// Because the loop should go around once more after a call which fails to take the
-		// single-threaded lock, if we write zero to the counter before calling `process_events` we
-		// should never observe there having been more than 2 loop iterations.
-		// Further, because the last thread to exit will call `process_events` before returning, we
-		// should always have at least one count at the end.
+		// `get_and_clear_pending_msg_events`, which we count using the `TestMessageHandler`. Thus,
+		// to test we simply write zero to the counter before calling `process_events` and make
+		// sure we observe a value greater than one at least once.
 		let cfg = Arc::new(create_peermgr_cfgs(1));
 		// Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }.
 		let peer = Arc::new(create_network(1, unsafe { &*(&*cfg as *const _) as &'static _ }).pop().unwrap());
 
-		let exit_flag = Arc::new(AtomicBool::new(false));
-		macro_rules! spawn_thread { () => { {
-			let thread_cfg = Arc::clone(&cfg);
+		let end_time = Instant::now() + Duration::from_millis(100);
+		let observed_loop = Arc::new(AtomicBool::new(false));
+		let thread_fn = || {
 			let thread_peer = Arc::clone(&peer);
-			let thread_exit = Arc::clone(&exit_flag);
-			std::thread::spawn(move || {
-				while !thread_exit.load(Ordering::Acquire) {
-					thread_cfg[0].chan_handler.message_fetch_counter.store(0, Ordering::Release);
+			let thread_observed_loop = Arc::clone(&observed_loop);
+			move || {
+				while Instant::now() < end_time || !thread_observed_loop.load(Ordering::Acquire) {
+					test_utils::TestChannelMessageHandler::MESSAGE_FETCH_COUNTER.with(|val| val.store(0, Ordering::Relaxed));
 					thread_peer.process_events();
+					if test_utils::TestChannelMessageHandler::MESSAGE_FETCH_COUNTER.with(|val| val.load(Ordering::Relaxed)) > 1 {
+						thread_observed_loop.store(true, Ordering::Release);
+						return;
+					}
					std::thread::sleep(Duration::from_micros(1));
 				}
-			})
-		} } }
-
-		let thread_a = spawn_thread!();
-		let thread_b = spawn_thread!();
-		let thread_c = spawn_thread!();
-
-		let start_time = Instant::now();
-		while start_time.elapsed() < Duration::from_millis(100) {
-			let val = cfg[0].chan_handler.message_fetch_counter.load(Ordering::Acquire);
-			assert!(val <= 2);
-			std::thread::yield_now(); // Winblowz seemingly doesn't ever interrupt threads?!

(Review comment on the removed std::thread::yield_now() line)
Contributor: Do we still need this fix?
Collaborator (author): No, none of the threads completely busy-loop now; they all sleep, which should yield and then some.

-		}
+			}
+		};
 
-		exit_flag.store(true, Ordering::Release);
+		let thread_a = std::thread::spawn(thread_fn());
+		let thread_b = std::thread::spawn(thread_fn());
+		let thread_c = std::thread::spawn(thread_fn());
+		thread_fn()();
 		thread_a.join().unwrap();
 		thread_b.join().unwrap();
 		thread_c.join().unwrap();
-		assert!(cfg[0].chan_handler.message_fetch_counter.load(Ordering::Acquire) >= 1);
+		assert!(observed_loop.load(Ordering::Acquire));
 	}
 }
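
For context, the behaviour the rewritten test comment describes (a `process_events` call that finds another thread already processing should not block, but should instead flag that thread to make one more pass) can be illustrated with a small standalone sketch. This is not the actual `PeerManager` implementation; the type and field names below are invented for illustration, and LDK's real code uses a more compact atomic state machine rather than a `Mutex<()>` plus `AtomicBool`.

// Sketch of the non-blocking "go around the loop again" pattern.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

struct EventProcessor {
	// Held by whichever thread is currently draining events.
	processing_lock: Mutex<()>,
	// Set by callers that found the lock taken, requesting one more pass.
	needs_another_pass: AtomicBool,
	// Stand-in for the real pending message-event queue.
	pending: Mutex<Vec<u32>>,
}

impl EventProcessor {
	fn process_events(&self) {
		// Ask whoever holds the lock (possibly nobody) to do another pass.
		self.needs_another_pass.store(true, Ordering::Release);
		loop {
			{
				let _guard = match self.processing_lock.try_lock() {
					Ok(guard) => guard,
					// Another thread is processing; it will see the flag and
					// loop again, so we return instead of blocking behind it.
					Err(_) => return,
				};
				loop {
					self.needs_another_pass.store(false, Ordering::Relaxed);
					let events = std::mem::take(&mut *self.pending.lock().unwrap());
					for _event in events { /* handle the event */ }
					// A concurrent process_events call arrived mid-drain:
					// go around the loop again rather than dropping its request.
					if !self.needs_another_pass.load(Ordering::Acquire) {
						break;
					}
				}
			}
			// Re-check after releasing the lock, in case a caller set the flag
			// just as we were finishing up.
			if !self.needs_another_pass.load(Ordering::Acquire) {
				return;
			}
		}
	}
}

fn main() {
	let processor = std::sync::Arc::new(EventProcessor {
		processing_lock: Mutex::new(()),
		needs_another_pass: AtomicBool::new(false),
		pending: Mutex::new(vec![1, 2, 3]),
	});
	let threads: Vec<_> = (0..3).map(|_| {
		let p = std::sync::Arc::clone(&processor);
		std::thread::spawn(move || p.process_events())
	}).collect();
	for t in threads { t.join().unwrap(); }
}

The test in this PR exercises exactly that property: when several threads call `process_events` concurrently, at least one of them should be observed making more than one pass through the loop.
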
10 changes: 7 additions & 3 deletions lightning/src/util/test_utils.rs
@@ -759,17 +759,21 @@ pub struct TestChannelMessageHandler {
 	pub pending_events: Mutex<Vec<events::MessageSendEvent>>,
 	expected_recv_msgs: Mutex<Option<Vec<wire::Message<()>>>>,
 	connected_peers: Mutex<HashSet<PublicKey>>,
-	pub message_fetch_counter: AtomicUsize,
 	chain_hash: ChainHash,
 }
 
 impl TestChannelMessageHandler {
+	thread_local! {
+		pub static MESSAGE_FETCH_COUNTER: AtomicUsize = AtomicUsize::new(0);
+	}
+}
+
+impl TestChannelMessageHandler {
 	pub fn new(chain_hash: ChainHash) -> Self {
 		TestChannelMessageHandler {
 			pending_events: Mutex::new(Vec::new()),
 			expected_recv_msgs: Mutex::new(None),
 			connected_peers: Mutex::new(new_hash_set()),
-			message_fetch_counter: AtomicUsize::new(0),
 			chain_hash,
 		}
 	}
@@ -940,7 +944,7 @@ impl msgs::ChannelMessageHandler for TestChannelMessageHandler {
 
 impl events::MessageSendEventsProvider for TestChannelMessageHandler {
 	fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
-		self.message_fetch_counter.fetch_add(1, Ordering::AcqRel);
+		Self::MESSAGE_FETCH_COUNTER.with(|val| val.fetch_add(1, Ordering::AcqRel));
 		let mut pending_events = self.pending_events.lock().unwrap();
 		let mut ret = Vec::new();
 		mem::swap(&mut ret, &mut *pending_events);
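
The test_utils.rs half of the change swaps the per-handler `AtomicUsize` for a `thread_local!` counter declared in an `impl` block, so each test thread only counts the `get_and_clear_pending_msg_events` calls made from that thread. A minimal standalone illustration of the same pattern follows; the `Handler` type and `FETCH_COUNTER` name are placeholders, not LDK API.

// Sketch of a per-thread call counter declared as an associated thread_local.
use std::sync::atomic::{AtomicUsize, Ordering};

struct Handler;

impl Handler {
	// Same shape as the PR's MESSAGE_FETCH_COUNTER: the counter lives in
	// thread-local storage, so each thread sees only its own increments.
	thread_local! {
		pub static FETCH_COUNTER: AtomicUsize = AtomicUsize::new(0);
	}

	fn get_and_clear_pending_msg_events(&self) {
		Self::FETCH_COUNTER.with(|val| val.fetch_add(1, Ordering::Relaxed));
		// ... return and clear pending events here ...
	}
}

fn main() {
	let handles: Vec<_> = (0..3).map(|_| std::thread::spawn(|| {
		let h = Handler;
		h.get_and_clear_pending_msg_events();
		h.get_and_clear_pending_msg_events();
		// Each thread observes exactly its own two calls, regardless of the others.
		assert_eq!(Handler::FETCH_COUNTER.with(|val| val.load(Ordering::Relaxed)), 2);
	})).collect();
	for h in handles { h.join().unwrap(); }
}

Because the counter lives in thread-local storage, each worker's check in the rewritten test is unaffected by interleaving from the other threads, which is presumably what made the old shared-counter assertion (`assert!(val <= 2)` read from the main thread) racy.
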