@@ -58,7 +58,7 @@ use util::errors::APIError;
58
58
use std:: { cmp, mem} ;
59
59
use std:: collections:: { HashMap , hash_map, HashSet } ;
60
60
use std:: io:: { Cursor , Read } ;
61
- use std:: sync:: { Arc , Mutex , MutexGuard , RwLock } ;
61
+ use std:: sync:: { Arc , Mutex , MutexGuard , RwLock , Condvar } ;
62
62
use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
63
63
use std:: time:: Duration ;
64
64
use std:: marker:: { Sync , Send } ;
@@ -439,6 +439,14 @@ pub struct ChannelManager<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref,
439
439
/// Taken first everywhere where we are making changes before any other locks.
440
440
total_consistency_lock : RwLock < ( ) > ,
441
441
442
+ /// Used to signal to the ChannelManager persister that the manager needs to be re-persisted to
443
+ /// disk/backups. Users won't access the persistence_lock directly, but rather wait on its bool
444
+ /// using block_until_needs_persist.
445
+ #[ cfg( any( test, feature = "_test_utils" ) ) ]
446
+ pub persistence_lock : Arc < ( Mutex < bool > , Condvar ) > ,
447
+ #[ cfg( not( any( test, feature = "_test_utils" ) ) ) ]
448
+ persistence_lock : Arc < ( Mutex < bool > , Condvar ) > ,
449
+
442
450
keys_manager : K ,
443
451
444
452
logger : L ,
@@ -760,12 +768,36 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
760
768
pending_events : Mutex :: new ( Vec :: new ( ) ) ,
761
769
total_consistency_lock : RwLock :: new ( ( ) ) ,
762
770
771
+ persistence_lock : Arc :: new ( ( Mutex :: new ( false ) , Condvar :: new ( ) ) ) ,
772
+
763
773
keys_manager,
764
774
765
775
logger,
766
776
}
767
777
}
768
778
779
+ /// Used to signal to users when the ChannelManager needs to be persisted to disk. This function
780
+ /// blocks either for the given Duration or indefinitely until the ChannelManager has updates and
781
+ /// needs persistence. It returns a bool indicating whether persistence is necessary (which will
782
+ /// always be true if max_wait is None).
783
+ pub fn block_until_needs_persist ( & self , max_wait : Option < Duration > ) -> bool {
784
+ loop {
785
+ let mutcond = Arc :: clone ( & self . persistence_lock ) ;
786
+ let & ( ref mtx, ref cvar) = & * mutcond;
787
+ let mut guard = mtx. lock ( ) . unwrap ( ) ;
788
+ let result = match max_wait {
789
+ Some ( wait) => cvar. wait_timeout ( guard, wait) . unwrap ( ) . 0 ,
790
+ None => cvar. wait ( guard) . unwrap ( ) ,
791
+ } ;
792
+ guard = result;
793
+ let saved_res = * guard;
794
+ * guard = false ;
795
+ if saved_res || max_wait. is_some ( ) {
796
+ return saved_res;
797
+ }
798
+ }
799
+ }
800
+
769
801
/// Creates a new outbound channel to the given remote node and with the given value.
770
802
///
771
803
/// user_id will be provided back as user_channel_id in FundingGenerationReady and
@@ -913,6 +945,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
913
945
// the latest local state, which is the best we can do anyway. Thus, it is safe to
914
946
// ignore the result here.
915
947
let _ = self . chain_monitor . update_channel ( funding_txo, monitor_update) ;
948
+ self . persist_updates ( ) ;
916
949
}
917
950
}
918
951
@@ -1313,6 +1346,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
1313
1346
commitment_signed,
1314
1347
} ,
1315
1348
} ) ;
1349
+ self . persist_updates ( ) ;
1316
1350
} ,
1317
1351
None => { } ,
1318
1352
}
@@ -1707,6 +1741,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
1707
1741
commitment_signed : commitment_msg,
1708
1742
} ,
1709
1743
} ) ;
1744
+ self . persist_updates ( ) ;
1710
1745
}
1711
1746
} else {
1712
1747
unreachable ! ( ) ;
@@ -2126,6 +2161,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
2126
2161
}
2127
2162
} ) ;
2128
2163
}
2164
+ self . persist_updates ( ) ;
2129
2165
return Ok ( ( ) )
2130
2166
} ,
2131
2167
Err ( e) => {
@@ -2340,6 +2376,14 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
2340
2376
Ok ( ( ) )
2341
2377
}
2342
2378
2379
+ // Signal to the ChannelManager persister that there are updates necessitating persisting to disk.
2380
+ fn persist_updates ( & self ) {
2381
+ let & ( ref persist_mtx, ref cnd) = & * self . persistence_lock ;
2382
+ let mut persistence_lock = persist_mtx. lock ( ) . unwrap ( ) ;
2383
+ * persistence_lock = true ;
2384
+ cnd. notify_all ( ) ;
2385
+ }
2386
+
2343
2387
fn internal_funding_created ( & self , counterparty_node_id : & PublicKey , msg : & msgs:: FundingCreated ) -> Result < ( ) , MsgHandleErrInternal > {
2344
2388
let ( ( funding_msg, monitor) , mut chan) = {
2345
2389
let mut channel_lock = self . channel_state . lock ( ) . unwrap ( ) ;
@@ -2379,6 +2423,9 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
2379
2423
} ,
2380
2424
}
2381
2425
}
2426
+
2427
+ self . persist_updates ( ) ;
2428
+
2382
2429
let mut channel_state_lock = self . channel_state . lock ( ) . unwrap ( ) ;
2383
2430
let channel_state = & mut * channel_state_lock;
2384
2431
match channel_state. by_id . entry ( funding_msg. channel_id ) {
@@ -2417,6 +2464,9 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
2417
2464
hash_map:: Entry :: Vacant ( _) => return Err ( MsgHandleErrInternal :: send_err_msg_no_close ( "Failed to find corresponding channel" . to_owned ( ) , msg. channel_id ) )
2418
2465
}
2419
2466
} ;
2467
+
2468
+ self . persist_updates ( ) ;
2469
+
2420
2470
let mut pending_events = self . pending_events . lock ( ) . unwrap ( ) ;
2421
2471
pending_events. push ( events:: Event :: FundingBroadcastSafe {
2422
2472
funding_txo,
@@ -2712,6 +2762,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
2712
2762
msg,
2713
2763
} ) ;
2714
2764
}
2765
+ self . persist_updates ( ) ;
2715
2766
Ok ( ( ) )
2716
2767
} ,
2717
2768
hash_map:: Entry :: Vacant ( _) => return Err ( MsgHandleErrInternal :: send_err_msg_no_close ( "Failed to find corresponding channel" . to_owned ( ) , msg. channel_id ) )
@@ -2803,9 +2854,13 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
2803
2854
self . fail_htlc_backwards_internal ( self . channel_state . lock ( ) . unwrap ( ) , failure. 0 , & failure. 1 , failure. 2 ) ;
2804
2855
}
2805
2856
self . forward_htlcs ( & mut [ ( short_channel_id, channel_outpoint, pending_forwards) ] ) ;
2857
+ self . persist_updates ( ) ;
2806
2858
Ok ( ( ) )
2807
2859
} ,
2808
- Err ( e) => Err ( e)
2860
+ Err ( e) => {
2861
+ self . persist_updates ( ) ;
2862
+ Err ( e)
2863
+ }
2809
2864
}
2810
2865
}
2811
2866
@@ -2946,6 +3001,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
2946
3001
msg,
2947
3002
} ) ;
2948
3003
}
3004
+ self . persist_updates ( ) ;
2949
3005
Ok ( ( ) )
2950
3006
} ,
2951
3007
hash_map:: Entry :: Vacant ( _) => return Err ( MsgHandleErrInternal :: send_err_msg_no_close ( "Failed to find corresponding channel" . to_owned ( ) , msg. channel_id ) )
@@ -2995,6 +3051,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
2995
3051
commitment_signed,
2996
3052
} ,
2997
3053
} ) ;
3054
+ self . persist_updates ( ) ;
2998
3055
}
2999
3056
} ,
3000
3057
}
@@ -3994,6 +4051,7 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
3994
4051
3995
4052
pending_events : Mutex :: new ( pending_events_read) ,
3996
4053
total_consistency_lock : RwLock :: new ( ( ) ) ,
4054
+ persistence_lock : Arc :: new ( ( Mutex :: new ( false ) , Condvar :: new ( ) ) ) ,
3997
4055
keys_manager : args. keys_manager ,
3998
4056
logger : args. logger ,
3999
4057
default_configuration : args. default_config ,
0 commit comments