Skip to content

Commit 68b3d2e

Browse files
committed
Move PersistenceNotifier to a new util module
It was always somewhat strange to have a bunch of notification logic in `channelmanager`, and with the next commit adding a bunch more, its moved here first.
1 parent d024251 commit 68b3d2e

File tree

3 files changed

+155
-128
lines changed

3 files changed

+155
-128
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 4 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ use chain::keysinterface::{Sign, KeysInterface, KeysManager, InMemorySigner, Rec
5454
use util::config::{UserConfig, ChannelConfig};
5555
use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
5656
use util::{byte_utils, events};
57+
use util::crypto::sign;
58+
use util::wakers::PersistenceNotifier;
5759
use util::scid_utils::fake_scid;
5860
use util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, MaybeReadable, Writeable, Writer, VecWriter};
5961
use util::logger::{Level, Logger};
@@ -64,15 +66,11 @@ use prelude::*;
6466
use core::{cmp, mem};
6567
use core::cell::RefCell;
6668
use io::Read;
67-
use sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard};
69+
use sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
6870
use core::sync::atomic::{AtomicUsize, Ordering};
6971
use core::time::Duration;
7072
use core::ops::Deref;
7173

72-
#[cfg(any(test, feature = "std"))]
73-
use std::time::Instant;
74-
use util::crypto::sign;
75-
7674
// We hold various information about HTLC relay in the HTLC objects in Channel itself:
7775
//
7876
// Upon receipt of an HTLC from a peer, we'll give it a PendingHTLCStatus indicating if it should
@@ -5992,10 +5990,7 @@ where
59925990

59935991
#[cfg(any(test, feature = "_test_utils"))]
59945992
pub fn get_persistence_condvar_value(&self) -> bool {
5995-
let mutcond = &self.persistence_notifier.persistence_lock;
5996-
let &(ref mtx, _) = mutcond;
5997-
let guard = mtx.lock().unwrap();
5998-
*guard
5993+
self.persistence_notifier.needs_persist()
59995994
}
60005995

60015996
/// Gets the latest best block which was connected either via the [`chain::Listen`] or
@@ -6237,77 +6232,6 @@ impl<Signer: Sign, M: Deref , T: Deref , K: Deref , F: Deref , L: Deref >
62376232
}
62386233
}
62396234

6240-
/// Used to signal to the ChannelManager persister that the manager needs to be re-persisted to
6241-
/// disk/backups, through `await_persistable_update_timeout` and `await_persistable_update`.
6242-
struct PersistenceNotifier {
6243-
/// Users won't access the persistence_lock directly, but rather wait on its bool using
6244-
/// `wait_timeout` and `wait`.
6245-
persistence_lock: (Mutex<bool>, Condvar),
6246-
}
6247-
6248-
impl PersistenceNotifier {
6249-
fn new() -> Self {
6250-
Self {
6251-
persistence_lock: (Mutex::new(false), Condvar::new()),
6252-
}
6253-
}
6254-
6255-
fn wait(&self) {
6256-
loop {
6257-
let &(ref mtx, ref cvar) = &self.persistence_lock;
6258-
let mut guard = mtx.lock().unwrap();
6259-
if *guard {
6260-
*guard = false;
6261-
return;
6262-
}
6263-
guard = cvar.wait(guard).unwrap();
6264-
let result = *guard;
6265-
if result {
6266-
*guard = false;
6267-
return
6268-
}
6269-
}
6270-
}
6271-
6272-
#[cfg(any(test, feature = "std"))]
6273-
fn wait_timeout(&self, max_wait: Duration) -> bool {
6274-
let current_time = Instant::now();
6275-
loop {
6276-
let &(ref mtx, ref cvar) = &self.persistence_lock;
6277-
let mut guard = mtx.lock().unwrap();
6278-
if *guard {
6279-
*guard = false;
6280-
return true;
6281-
}
6282-
guard = cvar.wait_timeout(guard, max_wait).unwrap().0;
6283-
// Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the
6284-
// desired wait time has actually passed, and if not then restart the loop with a reduced wait
6285-
// time. Note that this logic can be highly simplified through the use of
6286-
// `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to
6287-
// 1.42.0.
6288-
let elapsed = current_time.elapsed();
6289-
let result = *guard;
6290-
if result || elapsed >= max_wait {
6291-
*guard = false;
6292-
return result;
6293-
}
6294-
match max_wait.checked_sub(elapsed) {
6295-
None => return result,
6296-
Some(_) => continue
6297-
}
6298-
}
6299-
}
6300-
6301-
// Signal to the ChannelManager persister that there are updates necessitating persisting to disk.
6302-
fn notify(&self) {
6303-
let &(ref persist_mtx, ref cnd) = &self.persistence_lock;
6304-
let mut persistence_lock = persist_mtx.lock().unwrap();
6305-
*persistence_lock = true;
6306-
mem::drop(persistence_lock);
6307-
cnd.notify_all();
6308-
}
6309-
}
6310-
63116235
const SERIALIZATION_VERSION: u8 = 1;
63126236
const MIN_SERIALIZATION_VERSION: u8 = 1;
63136237

@@ -7355,54 +7279,6 @@ mod tests {
73557279
use util::test_utils;
73567280
use chain::keysinterface::KeysInterface;
73577281

7358-
#[cfg(feature = "std")]
7359-
#[test]
7360-
fn test_wait_timeout() {
7361-
use ln::channelmanager::PersistenceNotifier;
7362-
use sync::Arc;
7363-
use core::sync::atomic::AtomicBool;
7364-
use std::thread;
7365-
7366-
let persistence_notifier = Arc::new(PersistenceNotifier::new());
7367-
let thread_notifier = Arc::clone(&persistence_notifier);
7368-
7369-
let exit_thread = Arc::new(AtomicBool::new(false));
7370-
let exit_thread_clone = exit_thread.clone();
7371-
thread::spawn(move || {
7372-
loop {
7373-
let &(ref persist_mtx, ref cnd) = &thread_notifier.persistence_lock;
7374-
let mut persistence_lock = persist_mtx.lock().unwrap();
7375-
*persistence_lock = true;
7376-
cnd.notify_all();
7377-
7378-
if exit_thread_clone.load(Ordering::SeqCst) {
7379-
break
7380-
}
7381-
}
7382-
});
7383-
7384-
// Check that we can block indefinitely until updates are available.
7385-
let _ = persistence_notifier.wait();
7386-
7387-
// Check that the PersistenceNotifier will return after the given duration if updates are
7388-
// available.
7389-
loop {
7390-
if persistence_notifier.wait_timeout(Duration::from_millis(100)) {
7391-
break
7392-
}
7393-
}
7394-
7395-
exit_thread.store(true, Ordering::SeqCst);
7396-
7397-
// Check that the PersistenceNotifier will return after the given duration even if no updates
7398-
// are available.
7399-
loop {
7400-
if !persistence_notifier.wait_timeout(Duration::from_millis(100)) {
7401-
break
7402-
}
7403-
}
7404-
}
7405-
74067282
#[test]
74077283
fn test_notify_limits() {
74087284
// Check that a few cases which don't require the persistence of a new ChannelManager,

lightning/src/util/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub mod ser;
2121
pub mod message_signing;
2222
pub mod invoice;
2323
pub mod persist;
24+
pub mod wakers;
2425

2526
pub(crate) mod atomic_counter;
2627
pub(crate) mod byte_utils;

lightning/src/util/wakers.rs

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
// This file is Copyright its original authors, visible in version control
2+
// history.
3+
//
4+
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5+
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7+
// You may not use this file except in accordance with one or both of these
8+
// licenses.
9+
10+
//! Utilities which allow users to block on some future notification from LDK. These are
11+
//! specifically used by [`ChannelManager`] to allow waiting until the [`ChannelManager`] needs to
12+
//! be re-persisted.
13+
//!
14+
//! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
15+
16+
use core::mem;
17+
use core::time::Duration;
18+
use sync::{Condvar, Mutex};
19+
20+
#[cfg(any(test, feature = "std"))]
21+
use std::time::Instant;
22+
23+
/// Used to signal to the ChannelManager persister that the manager needs to be re-persisted to
24+
/// disk/backups, through `await_persistable_update_timeout` and `await_persistable_update`.
25+
pub(crate) struct PersistenceNotifier {
26+
/// Users won't access the persistence_lock directly, but rather wait on its bool using
27+
/// `wait_timeout` and `wait`.
28+
persistence_lock: (Mutex<bool>, Condvar),
29+
}
30+
31+
impl PersistenceNotifier {
32+
pub(crate) fn new() -> Self {
33+
Self {
34+
persistence_lock: (Mutex::new(false), Condvar::new()),
35+
}
36+
}
37+
38+
pub(crate) fn wait(&self) {
39+
loop {
40+
let &(ref mtx, ref cvar) = &self.persistence_lock;
41+
let mut guard = mtx.lock().unwrap();
42+
if *guard {
43+
*guard = false;
44+
return;
45+
}
46+
guard = cvar.wait(guard).unwrap();
47+
let result = *guard;
48+
if result {
49+
*guard = false;
50+
return
51+
}
52+
}
53+
}
54+
55+
#[cfg(any(test, feature = "std"))]
56+
pub(crate) fn wait_timeout(&self, max_wait: Duration) -> bool {
57+
let current_time = Instant::now();
58+
loop {
59+
let &(ref mtx, ref cvar) = &self.persistence_lock;
60+
let mut guard = mtx.lock().unwrap();
61+
if *guard {
62+
*guard = false;
63+
return true;
64+
}
65+
guard = cvar.wait_timeout(guard, max_wait).unwrap().0;
66+
// Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the
67+
// desired wait time has actually passed, and if not then restart the loop with a reduced wait
68+
// time. Note that this logic can be highly simplified through the use of
69+
// `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to
70+
// 1.42.0.
71+
let elapsed = current_time.elapsed();
72+
let result = *guard;
73+
if result || elapsed >= max_wait {
74+
*guard = false;
75+
return result;
76+
}
77+
match max_wait.checked_sub(elapsed) {
78+
None => return result,
79+
Some(_) => continue
80+
}
81+
}
82+
}
83+
84+
/// Wake waiters, tracking that persistence needs to occur.
85+
pub(crate) fn notify(&self) {
86+
let &(ref persist_mtx, ref cnd) = &self.persistence_lock;
87+
let mut persistence_lock = persist_mtx.lock().unwrap();
88+
*persistence_lock = true;
89+
mem::drop(persistence_lock);
90+
cnd.notify_all();
91+
}
92+
93+
#[cfg(any(test, feature = "_test_utils"))]
94+
pub fn needs_persist(&self) -> bool {
95+
let &(ref mtx, _) = &self.persistence_lock;
96+
let guard = mtx.lock().unwrap();
97+
*guard
98+
}
99+
}
100+
101+
#[cfg(test)]
102+
mod tests {
103+
#[cfg(feature = "std")]
104+
#[test]
105+
fn test_wait_timeout() {
106+
use super::*;
107+
use sync::Arc;
108+
use core::sync::atomic::{AtomicBool, Ordering};
109+
use std::thread;
110+
111+
let persistence_notifier = Arc::new(PersistenceNotifier::new());
112+
let thread_notifier = Arc::clone(&persistence_notifier);
113+
114+
let exit_thread = Arc::new(AtomicBool::new(false));
115+
let exit_thread_clone = exit_thread.clone();
116+
thread::spawn(move || {
117+
loop {
118+
let &(ref persist_mtx, ref cnd) = &thread_notifier.persistence_lock;
119+
let mut persistence_lock = persist_mtx.lock().unwrap();
120+
*persistence_lock = true;
121+
cnd.notify_all();
122+
123+
if exit_thread_clone.load(Ordering::SeqCst) {
124+
break
125+
}
126+
}
127+
});
128+
129+
// Check that we can block indefinitely until updates are available.
130+
let _ = persistence_notifier.wait();
131+
132+
// Check that the PersistenceNotifier will return after the given duration if updates are
133+
// available.
134+
loop {
135+
if persistence_notifier.wait_timeout(Duration::from_millis(100)) {
136+
break
137+
}
138+
}
139+
140+
exit_thread.store(true, Ordering::SeqCst);
141+
142+
// Check that the PersistenceNotifier will return after the given duration even if no updates
143+
// are available.
144+
loop {
145+
if !persistence_notifier.wait_timeout(Duration::from_millis(100)) {
146+
break
147+
}
148+
}
149+
}
150+
}

0 commit comments

Comments
 (0)