@@ -113,7 +113,7 @@ const ONION_MESSAGE_HANDLER_TIMER: u64 = 1;
113
113
const NETWORK_PRUNE_TIMER : u64 = 60 * 60 ;
114
114
115
115
#[ cfg( not( test) ) ]
116
- const SCORER_PERSIST_TIMER : u64 = 60 * 60 ;
116
+ const SCORER_PERSIST_TIMER : u64 = 60 * 5 ;
117
117
#[ cfg( test) ]
118
118
const SCORER_PERSIST_TIMER : u64 = 1 ;
119
119
@@ -244,30 +244,30 @@ fn handle_network_graph_update<L: Deref>(
244
244
/// Updates scorer based on event and returns whether an update occurred so we can decide whether
245
245
/// to persist.
246
246
fn update_scorer < ' a , S : ' static + Deref < Target = SC > + Send + Sync , SC : ' a + WriteableScore < ' a > > (
247
- scorer : & ' a S , event : & Event
247
+ scorer : & ' a S , event : & Event , duration_since_epoch : Duration ,
248
248
) -> bool {
249
249
match event {
250
250
Event :: PaymentPathFailed { ref path, short_channel_id : Some ( scid) , .. } => {
251
251
let mut score = scorer. write_lock ( ) ;
252
- score. payment_path_failed ( path, * scid) ;
252
+ score. payment_path_failed ( path, * scid, duration_since_epoch ) ;
253
253
} ,
254
254
Event :: PaymentPathFailed { ref path, payment_failed_permanently : true , .. } => {
255
255
// Reached if the destination explicitly failed it back. We treat this as a successful probe
256
256
// because the payment made it all the way to the destination with sufficient liquidity.
257
257
let mut score = scorer. write_lock ( ) ;
258
- score. probe_successful ( path) ;
258
+ score. probe_successful ( path, duration_since_epoch ) ;
259
259
} ,
260
260
Event :: PaymentPathSuccessful { path, .. } => {
261
261
let mut score = scorer. write_lock ( ) ;
262
- score. payment_path_successful ( path) ;
262
+ score. payment_path_successful ( path, duration_since_epoch ) ;
263
263
} ,
264
264
Event :: ProbeSuccessful { path, .. } => {
265
265
let mut score = scorer. write_lock ( ) ;
266
- score. probe_successful ( path) ;
266
+ score. probe_successful ( path, duration_since_epoch ) ;
267
267
} ,
268
268
Event :: ProbeFailed { path, short_channel_id : Some ( scid) , .. } => {
269
269
let mut score = scorer. write_lock ( ) ;
270
- score. probe_failed ( path, * scid) ;
270
+ score. probe_failed ( path, * scid, duration_since_epoch ) ;
271
271
} ,
272
272
_ => return false ,
273
273
}
@@ -280,7 +280,7 @@ macro_rules! define_run_body {
280
280
$channel_manager: ident, $process_channel_manager_events: expr,
281
281
$peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident,
282
282
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
283
- $timer_elapsed: expr, $check_slow_await: expr
283
+ $timer_elapsed: expr, $check_slow_await: expr, $time_fetch : expr ,
284
284
) => { {
285
285
log_trace!( $logger, "Calling ChannelManager's timer_tick_occurred on startup" ) ;
286
286
$channel_manager. timer_tick_occurred( ) ;
@@ -294,6 +294,7 @@ macro_rules! define_run_body {
294
294
let mut last_scorer_persist_call = $get_timer( SCORER_PERSIST_TIMER ) ;
295
295
let mut last_rebroadcast_call = $get_timer( REBROADCAST_TIMER ) ;
296
296
let mut have_pruned = false ;
297
+ let mut have_decayed_scorer = false ;
297
298
298
299
loop {
299
300
$process_channel_manager_events;
@@ -383,11 +384,10 @@ macro_rules! define_run_body {
383
384
if should_prune {
384
385
// The network graph must not be pruned while rapid sync completion is pending
385
386
if let Some ( network_graph) = $gossip_sync. prunable_network_graph( ) {
386
- # [ cfg ( feature = "std" ) ] {
387
+ if let Some ( duration_since_epoch ) = $time_fetch ( ) {
387
388
log_trace!( $logger, "Pruning and persisting network graph." ) ;
388
- network_graph. remove_stale_channels_and_tracking( ) ;
389
- }
390
- #[ cfg( not( feature = "std" ) ) ] {
389
+ network_graph. remove_stale_channels_and_tracking_with_time( duration_since_epoch. as_secs( ) ) ;
390
+ } else {
391
391
log_warn!( $logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time." ) ;
392
392
log_trace!( $logger, "Persisting network graph." ) ;
393
393
}
@@ -402,9 +402,24 @@ macro_rules! define_run_body {
402
402
last_prune_call = $get_timer( prune_timer) ;
403
403
}
404
404
405
+ if !have_decayed_scorer {
406
+ if let Some ( ref scorer) = $scorer {
407
+ if let Some ( duration_since_epoch) = $time_fetch( ) {
408
+ log_trace!( $logger, "Calling time_passed on scorer at startup" ) ;
409
+ scorer. write_lock( ) . time_passed( duration_since_epoch) ;
410
+ }
411
+ }
412
+ have_decayed_scorer = true ;
413
+ }
414
+
405
415
if $timer_elapsed( & mut last_scorer_persist_call, SCORER_PERSIST_TIMER ) {
406
416
if let Some ( ref scorer) = $scorer {
407
- log_trace!( $logger, "Persisting scorer" ) ;
417
+ if let Some ( duration_since_epoch) = $time_fetch( ) {
418
+ log_trace!( $logger, "Calling time_passed and persisting scorer" ) ;
419
+ scorer. write_lock( ) . time_passed( duration_since_epoch) ;
420
+ } else {
421
+ log_trace!( $logger, "Persisting scorer" ) ;
422
+ }
408
423
if let Err ( e) = $persister. persist_scorer( & scorer) {
409
424
log_error!( $logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e)
410
425
}
@@ -510,12 +525,16 @@ use core::task;
510
525
/// are unsure, you should set the flag, as the performance impact of it is minimal unless there
511
526
/// are hundreds or thousands of simultaneous process calls running.
512
527
///
528
+ /// The `fetch_time` parameter should return the current wall clock time, if one is available. If
529
+ /// no time is available, some features may be disabled, however the node will still operate fine.
530
+ ///
513
531
/// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
514
532
/// could setup `process_events_async` like this:
515
533
/// ```
516
534
/// # use lightning::io;
517
535
/// # use std::sync::{Arc, RwLock};
518
536
/// # use std::sync::atomic::{AtomicBool, Ordering};
537
+ /// # use std::time::SystemTime;
519
538
/// # use lightning_background_processor::{process_events_async, GossipSync};
520
539
/// # struct MyStore {}
521
540
/// # impl lightning::util::persist::KVStore for MyStore {
@@ -584,6 +603,7 @@ use core::task;
584
603
/// Some(background_scorer),
585
604
/// sleeper,
586
605
/// mobile_interruptable_platform,
606
+ /// || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap())
587
607
/// )
588
608
/// .await
589
609
/// .expect("Failed to process events");
@@ -620,11 +640,12 @@ pub async fn process_events_async<
620
640
S : ' static + Deref < Target = SC > + Send + Sync ,
621
641
SC : for < ' b > WriteableScore < ' b > ,
622
642
SleepFuture : core:: future:: Future < Output = bool > + core:: marker:: Unpin ,
623
- Sleeper : Fn ( Duration ) -> SleepFuture
643
+ Sleeper : Fn ( Duration ) -> SleepFuture ,
644
+ FetchTime : Fn ( ) -> Option < Duration > ,
624
645
> (
625
646
persister : PS , event_handler : EventHandler , chain_monitor : M , channel_manager : CM ,
626
647
gossip_sync : GossipSync < PGS , RGS , G , UL , L > , peer_manager : PM , logger : L , scorer : Option < S > ,
627
- sleeper : Sleeper , mobile_interruptable_platform : bool ,
648
+ sleeper : Sleeper , mobile_interruptable_platform : bool , fetch_time : FetchTime ,
628
649
) -> Result < ( ) , lightning:: io:: Error >
629
650
where
630
651
UL :: Target : ' static + UtxoLookup ,
@@ -648,15 +669,18 @@ where
648
669
let scorer = & scorer;
649
670
let logger = & logger;
650
671
let persister = & persister;
672
+ let fetch_time = & fetch_time;
651
673
async move {
652
674
if let Some ( network_graph) = network_graph {
653
675
handle_network_graph_update ( network_graph, & event)
654
676
}
655
677
if let Some ( ref scorer) = scorer {
656
- if update_scorer ( scorer, & event) {
657
- log_trace ! ( logger, "Persisting scorer after update" ) ;
658
- if let Err ( e) = persister. persist_scorer ( & scorer) {
659
- log_error ! ( logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e)
678
+ if let Some ( duration_since_epoch) = fetch_time ( ) {
679
+ if update_scorer ( scorer, & event, duration_since_epoch) {
680
+ log_trace ! ( logger, "Persisting scorer after update" ) ;
681
+ if let Err ( e) = persister. persist_scorer ( & scorer) {
682
+ log_error ! ( logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e)
683
+ }
660
684
}
661
685
}
662
686
}
@@ -688,7 +712,7 @@ where
688
712
task:: Poll :: Ready ( exit) => { should_break = exit; true } ,
689
713
task:: Poll :: Pending => false ,
690
714
}
691
- } , mobile_interruptable_platform
715
+ } , mobile_interruptable_platform, fetch_time ,
692
716
)
693
717
}
694
718
@@ -810,7 +834,10 @@ impl BackgroundProcessor {
810
834
handle_network_graph_update ( network_graph, & event)
811
835
}
812
836
if let Some ( ref scorer) = scorer {
813
- if update_scorer ( scorer, & event) {
837
+ use std:: time:: SystemTime ;
838
+ let duration_since_epoch = SystemTime :: now ( ) . duration_since ( SystemTime :: UNIX_EPOCH )
839
+ . expect ( "Time should be sometime after 1970" ) ;
840
+ if update_scorer ( scorer, & event, duration_since_epoch) {
814
841
log_trace ! ( logger, "Persisting scorer after update" ) ;
815
842
if let Err ( e) = persister. persist_scorer ( & scorer) {
816
843
log_error ! ( logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e)
@@ -829,7 +856,12 @@ impl BackgroundProcessor {
829
856
channel_manager. get_event_or_persistence_needed_future( ) ,
830
857
chain_monitor. get_update_future( )
831
858
) . wait_timeout( Duration :: from_millis( 100 ) ) ; } ,
832
- |_| Instant :: now( ) , |time: & Instant , dur| time. elapsed( ) . as_secs( ) > dur, false
859
+ |_| Instant :: now( ) , |time: & Instant , dur| time. elapsed( ) . as_secs( ) > dur, false ,
860
+ || {
861
+ use std:: time:: SystemTime ;
862
+ Some ( SystemTime :: now( ) . duration_since( SystemTime :: UNIX_EPOCH )
863
+ . expect( "Time should be sometime after 1970" ) )
864
+ } ,
833
865
)
834
866
} ) ;
835
867
Self { stop_thread : stop_thread_clone, thread_handle : Some ( handle) }
@@ -1117,7 +1149,7 @@ mod tests {
1117
1149
}
1118
1150
1119
1151
impl ScoreUpdate for TestScorer {
1120
- fn payment_path_failed ( & mut self , actual_path : & Path , actual_short_channel_id : u64 ) {
1152
+ fn payment_path_failed ( & mut self , actual_path : & Path , actual_short_channel_id : u64 , _ : Duration ) {
1121
1153
if let Some ( expectations) = & mut self . event_expectations {
1122
1154
match expectations. pop_front ( ) . unwrap ( ) {
1123
1155
TestResult :: PaymentFailure { path, short_channel_id } => {
@@ -1137,7 +1169,7 @@ mod tests {
1137
1169
}
1138
1170
}
1139
1171
1140
- fn payment_path_successful ( & mut self , actual_path : & Path ) {
1172
+ fn payment_path_successful ( & mut self , actual_path : & Path , _ : Duration ) {
1141
1173
if let Some ( expectations) = & mut self . event_expectations {
1142
1174
match expectations. pop_front ( ) . unwrap ( ) {
1143
1175
TestResult :: PaymentFailure { path, .. } => {
@@ -1156,7 +1188,7 @@ mod tests {
1156
1188
}
1157
1189
}
1158
1190
1159
- fn probe_failed ( & mut self , actual_path : & Path , _: u64 ) {
1191
+ fn probe_failed ( & mut self , actual_path : & Path , _: u64 , _ : Duration ) {
1160
1192
if let Some ( expectations) = & mut self . event_expectations {
1161
1193
match expectations. pop_front ( ) . unwrap ( ) {
1162
1194
TestResult :: PaymentFailure { path, .. } => {
@@ -1174,7 +1206,7 @@ mod tests {
1174
1206
}
1175
1207
}
1176
1208
}
1177
- fn probe_successful ( & mut self , actual_path : & Path ) {
1209
+ fn probe_successful ( & mut self , actual_path : & Path , _ : Duration ) {
1178
1210
if let Some ( expectations) = & mut self . event_expectations {
1179
1211
match expectations. pop_front ( ) . unwrap ( ) {
1180
1212
TestResult :: PaymentFailure { path, .. } => {
@@ -1192,6 +1224,7 @@ mod tests {
1192
1224
}
1193
1225
}
1194
1226
}
1227
+ fn time_passed ( & mut self , _: Duration ) { }
1195
1228
}
1196
1229
1197
1230
#[ cfg( c_bindings) ]
@@ -1469,7 +1502,7 @@ mod tests {
1469
1502
tokio:: time:: sleep ( dur) . await ;
1470
1503
false // Never exit
1471
1504
} )
1472
- } , false ,
1505
+ } , false , || Some ( Duration :: ZERO ) ,
1473
1506
) ;
1474
1507
match bp_future. await {
1475
1508
Ok ( _) => panic ! ( "Expected error persisting manager" ) ,
@@ -1600,7 +1633,7 @@ mod tests {
1600
1633
1601
1634
loop {
1602
1635
let log_entries = nodes[ 0 ] . logger . lines . lock ( ) . unwrap ( ) ;
1603
- let expected_log = "Persisting scorer" . to_string ( ) ;
1636
+ let expected_log = "Calling time_passed and persisting scorer" . to_string ( ) ;
1604
1637
if log_entries. get ( & ( "lightning_background_processor" , expected_log) ) . is_some ( ) {
1605
1638
break
1606
1639
}
@@ -1699,7 +1732,7 @@ mod tests {
1699
1732
_ = exit_receiver. changed( ) => true ,
1700
1733
}
1701
1734
} )
1702
- } , false ,
1735
+ } , false , || Some ( Duration :: from_secs ( 1696300000 ) ) ,
1703
1736
) ;
1704
1737
1705
1738
let t1 = tokio:: spawn ( bp_future) ;
@@ -1874,7 +1907,7 @@ mod tests {
1874
1907
_ = exit_receiver. changed( ) => true ,
1875
1908
}
1876
1909
} )
1877
- } , false ,
1910
+ } , false , || Some ( Duration :: ZERO ) ,
1878
1911
) ;
1879
1912
let t1 = tokio:: spawn ( bp_future) ;
1880
1913
let t2 = tokio:: spawn ( async move {
0 commit comments