@@ -346,9 +346,10 @@ where
///
/// # Pruning stale channel updates
///
- /// Stale updates are pruned when a full monitor is written. The old monitor is first read, and if
- /// that succeeds, updates in the range between the old and new monitors are deleted. The `lazy`
- /// flag is used on the [`KVStore::remove`] method, so there are no guarantees that the deletions
+ /// Stale updates are pruned when the consolidation threshold is reached according to `maximum_pending_updates`.
+ /// Monitor updates in the range between the latest `update_id` and `update_id - maximum_pending_updates`
+ /// are deleted.
+ /// The `lazy` flag is used on the [`KVStore::remove`] method, so there are no guarantees that the deletions
/// will complete. However, stale updates are not a problem for data integrity, since updates are
/// only read that are higher than the stored [`ChannelMonitor`]'s `update_id`.
///
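To make the new pruning window concrete, here is a minimal, self-contained sketch of the deletion range described above. The free function `stale_update_range` is hypothetical, written only for illustration; it is not part of this patch.

```rust
/// Hypothetical helper showing which update ids become stale once a
/// consolidated monitor is written (the PR computes this inline).
fn stale_update_range(latest_update_id: u64, maximum_pending_updates: u64) -> (u64, u64) {
    // Updates in `start..=end` were folded into the full monitor write and
    // can be lazily removed from the KVStore.
    let end = latest_update_id;
    let start = end.saturating_sub(maximum_pending_updates);
    (start, end)
}

fn main() {
    // With maximum_pending_updates = 7 and a monitor consolidated at
    // update_id = 21, removal is attempted for ids 14..=21.
    assert_eq!(stale_update_range(21, 7), (14, 21));
}
```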
@@ -610,24 +611,6 @@ where
    ) -> chain::ChannelMonitorUpdateStatus {
        // Determine the proper key for this monitor
        let monitor_name = MonitorName::from(funding_txo);
-       let maybe_old_monitor = self.read_monitor(&monitor_name);
-       match maybe_old_monitor {
-           Ok((_, ref old_monitor)) => {
-               // Check that this key isn't already storing a monitor with a higher update_id
-               // (collision)
-               if old_monitor.get_latest_update_id() > monitor.get_latest_update_id() {
-                   log_error!(
-                       self.logger,
-                       "Tried to write a monitor at the same outpoint {} with a higher update_id!",
-                       monitor_name.as_str()
-                   );
-                   return chain::ChannelMonitorUpdateStatus::UnrecoverableError;
-               }
-           }
-           // This means the channel monitor is new.
-           Err(ref e) if e.kind() == io::ErrorKind::NotFound => {}
-           _ => return chain::ChannelMonitorUpdateStatus::UnrecoverableError,
-       }
        // Serialize and write the new monitor
        let mut monitor_bytes = Vec::with_capacity(
            MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() + monitor.serialized_length(),
@@ -641,65 +624,12 @@ where
            &monitor_bytes,
        ) {
            Ok(_) => {
-               // Assess cleanup. Typically, we'll clean up only between the last two known full
-               // monitors.
-               if let Ok((_, old_monitor)) = maybe_old_monitor {
-                   let start = old_monitor.get_latest_update_id();
-                   let end = if monitor.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
-                       // We don't want to clean the rest of u64, so just do possible pending
-                       // updates. Note that we never write updates at
-                       // `CLOSED_CHANNEL_UPDATE_ID`.
-                       cmp::min(
-                           start.saturating_add(self.maximum_pending_updates),
-                           CLOSED_CHANNEL_UPDATE_ID - 1,
-                       )
-                   } else {
-                       monitor.get_latest_update_id().saturating_sub(1)
-                   };
-                   // We should bother cleaning up only if there's at least one update
-                   // expected.
-                   for update_id in start..=end {
-                       let update_name = UpdateName::from(update_id);
-                       #[cfg(debug_assertions)]
-                       {
-                           if let Ok(update) =
-                               self.read_monitor_update(&monitor_name, &update_name)
-                           {
-                               // Assert that we are reading what we think we are.
-                               debug_assert_eq!(update.update_id, update_name.0);
-                           } else if update_id != start && monitor.get_latest_update_id() != CLOSED_CHANNEL_UPDATE_ID
-                           {
-                               // We're deleting something we should know doesn't exist.
-                               panic!(
-                                   "failed to read monitor update {}",
-                                   update_name.as_str()
-                               );
-                           }
-                           // On closed channels, we will unavoidably try to read
-                           // non-existent updates since we have to guess at the range of
-                           // stale updates, so do nothing.
-                       }
-                       if let Err(e) = self.kv_store.remove(
-                           CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
-                           monitor_name.as_str(),
-                           update_name.as_str(),
-                           true,
-                       ) {
-                           log_error!(
-                               self.logger,
-                               "error cleaning up channel monitor updates for monitor {}, reason: {}",
-                               monitor_name.as_str(),
-                               e
-                           );
-                       };
-                   }
-               };
                chain::ChannelMonitorUpdateStatus::Completed
            }
            Err(e) => {
                log_error!(
                    self.logger,
-                   "error writing channel monitor {}/{}/{} reason: {}",
+                   "Failed to write ChannelMonitor {}/{}/{} reason: {}",
                    CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
                    CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
                    monitor_name.as_str(),
@@ -741,7 +671,7 @@ where
                Err(e) => {
                    log_error!(
                        self.logger,
-                       "error writing channel monitor update {}/{}/{} reason: {}",
+                       "Failed to write ChannelMonitorUpdate {}/{}/{} reason: {}",
                        CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
                        monitor_name.as_str(),
                        update_name.as_str(),
@@ -751,8 +681,41 @@ where
                }
            }
        } else {
-           // We could write this update, but it meets criteria of our design that call for a full monitor write.
-           self.persist_new_channel(funding_txo, monitor, monitor_update_call_id)
+           let monitor_name = MonitorName::from(funding_txo);
+           // In case of channel-close monitor update, we need to read old monitor before persisting
+           // the new one in order to determine the cleanup range.
+           let maybe_old_monitor = match monitor.get_latest_update_id() {
+               CLOSED_CHANNEL_UPDATE_ID => self.read_monitor(&monitor_name).ok(),
+               _ => None
+           };
+
+           // We could write this update, but it meets criteria of our design that calls for a full monitor write.
+           let monitor_update_status = self.persist_new_channel(funding_txo, monitor, monitor_update_call_id);
+
+           if let chain::ChannelMonitorUpdateStatus::Completed = monitor_update_status {
+               let cleanup_range = if monitor.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
+                   // If there is an error while reading old monitor, we skip clean up.
+                   maybe_old_monitor.map(|(_, ref old_monitor)| {
+                       let start = old_monitor.get_latest_update_id();
+                       // We never persist an update with update_id = CLOSED_CHANNEL_UPDATE_ID
+                       let end = cmp::min(
+                           start.saturating_add(self.maximum_pending_updates),
+                           CLOSED_CHANNEL_UPDATE_ID - 1,
+                       );
+                       (start, end)
+                   })
+               } else {
+                   let end = monitor.get_latest_update_id();
+                   let start = end.saturating_sub(self.maximum_pending_updates);
+                   Some((start, end))
+               };
+
+               if let Some((start, end)) = cleanup_range {
+                   self.cleanup_in_range(monitor_name, start, end);
+               }
+           }
+
+           monitor_update_status
        }
    } else {
        // There is no update given, so we must persist a new monitor.
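The two branches above pick the cleanup window differently for open and closed channels. Below is a hedged, self-contained sketch of that selection: the standalone `cleanup_range` function and its `old_update_id` parameter are illustrative only, though the `CLOSED_CHANNEL_UPDATE_ID` value matches the crate's constant (`u64::MAX`).

```rust
// Matches lightning::chain::channelmonitor::CLOSED_CHANNEL_UPDATE_ID.
const CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX;

// Hypothetical free-function version of the range selection in the diff.
// `old_update_id` stands in for the old monitor read on channel close.
fn cleanup_range(
    latest_update_id: u64, maximum_pending_updates: u64, old_update_id: Option<u64>,
) -> Option<(u64, u64)> {
    if latest_update_id == CLOSED_CHANNEL_UPDATE_ID {
        // On close we only know the last consolidated id, so we guess at most
        // `maximum_pending_updates` stale entries past it, capped so the range
        // never includes CLOSED_CHANNEL_UPDATE_ID (no update is written there).
        old_update_id.map(|start| {
            let end = core::cmp::min(
                start.saturating_add(maximum_pending_updates),
                CLOSED_CHANNEL_UPDATE_ID - 1,
            );
            (start, end)
        })
    } else {
        let start = latest_update_id.saturating_sub(maximum_pending_updates);
        Some((start, latest_update_id))
    }
}

fn main() {
    // Ordinary consolidation at update 21 with a threshold of 7:
    assert_eq!(cleanup_range(21, 7, None), Some((14, 21)));
    // Channel close where the old monitor was last consolidated at 14:
    assert_eq!(cleanup_range(CLOSED_CHANNEL_UPDATE_ID, 7, Some(14)), Some((14, 21)));
    // Channel close where the old monitor could not be read: skip cleanup.
    assert_eq!(cleanup_range(CLOSED_CHANNEL_UPDATE_ID, 7, None), None);
}
```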
@@ -761,6 +724,34 @@ where
    }
}

+impl<K: Deref, L: Deref, ES: Deref, SP: Deref> MonitorUpdatingPersister<K, L, ES, SP>
+where
+   ES::Target: EntropySource + Sized,
+   K::Target: KVStore,
+   L::Target: Logger,
+   SP::Target: SignerProvider + Sized
+{
+   // Cleans up monitor updates for given monitor in range `start..=end`.
+   fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) {
+       for update_id in start..=end {
+           let update_name = UpdateName::from(update_id);
+           if let Err(e) = self.kv_store.remove(
+               CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+               monitor_name.as_str(),
+               update_name.as_str(),
+               true,
+           ) {
+               log_error!(
+                   self.logger,
+                   "Failed to clean up channel monitor updates for monitor {}, reason: {}",
+                   monitor_name.as_str(),
+                   e
+               );
+           };
+       }
+   }
+}
+
/// A struct representing a name for a monitor.
#[derive(Debug)]
struct MonitorName(String);
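`cleanup_in_range` passes `lazy = true` to `KVStore::remove`, so deletions are best-effort. A toy sketch of why that is safe, assuming an invented in-memory store (`ToyUpdateStore` is not the real `KVStore` trait and its key layout is made up for illustration):

```rust
use std::collections::BTreeMap;

// Invented stand-in for the namespace holding one monitor's pending updates.
struct ToyUpdateStore {
    // update_id -> serialized ChannelMonitorUpdate
    updates: BTreeMap<u64, Vec<u8>>,
}

impl ToyUpdateStore {
    // Mirrors cleanup_in_range: best-effort deletes over start..=end. Missing
    // keys are fine, and leftovers are also fine, because readers ignore any
    // update whose id is not higher than the stored monitor's update_id.
    fn cleanup_in_range(&mut self, start: u64, end: u64) {
        for update_id in start..=end {
            self.updates.remove(&update_id);
        }
    }
}

fn main() {
    let mut store = ToyUpdateStore { updates: BTreeMap::new() };
    // Updates 15..=20 are pending; 21 was consolidated into a full monitor.
    for id in 15u64..=20 {
        store.updates.insert(id, vec![]);
    }
    store.cleanup_in_range(14, 21); // ids 14 and 21 simply don't exist
    assert!(store.updates.is_empty());
}
```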
@@ -896,20 +887,21 @@ mod tests {
    #[test]
    fn persister_with_real_monitors() {
        // This value is used later to limit how many iterations we perform.
-       let test_max_pending_updates = 7;
+       let persister_0_max_pending_updates = 7;
+       // Intentionally set this to a smaller value to test a different alignment.
+       let persister_1_max_pending_updates = 3;
        let chanmon_cfgs = create_chanmon_cfgs(4);
        let persister_0 = MonitorUpdatingPersister {
            kv_store: &TestStore::new(false),
            logger: &TestLogger::new(),
-           maximum_pending_updates: test_max_pending_updates,
+           maximum_pending_updates: persister_0_max_pending_updates,
            entropy_source: &chanmon_cfgs[0].keys_manager,
            signer_provider: &chanmon_cfgs[0].keys_manager,
        };
        let persister_1 = MonitorUpdatingPersister {
            kv_store: &TestStore::new(false),
            logger: &TestLogger::new(),
-           // Intentionally set this to a smaller value to test a different alignment.
-           maximum_pending_updates: 3,
+           maximum_pending_updates: persister_1_max_pending_updates,
            entropy_source: &chanmon_cfgs[1].keys_manager,
            signer_provider: &chanmon_cfgs[1].keys_manager,
        };
@@ -934,7 +926,6 @@ mod tests {
        node_cfgs[1].chain_monitor = chain_mon_1;
        let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
        let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
-
        let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster;
        let broadcaster_1 = &chanmon_cfgs[3].tx_broadcaster;
@@ -957,10 +948,11 @@ mod tests {
                for (_, mon) in persisted_chan_data_0.iter() {
                    // check that when we read it, we got the right update id
                    assert_eq!(mon.get_latest_update_id(), $expected_update_id);
-                   // if the CM is at the correct update id without updates, ensure no updates are stored
+
+                   // if the CM is at consolidation threshold, ensure no updates are stored.
                    let monitor_name = MonitorName::from(mon.get_funding_txo().0);
-                   let (_, cm_0) = persister_0.read_monitor(&monitor_name).unwrap();
-                   if cm_0.get_latest_update_id() == $expected_update_id {
+                   if mon.get_latest_update_id() % persister_0_max_pending_updates == 0
+                       || mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
                        assert_eq!(
                            persister_0.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
                                monitor_name.as_str()).unwrap().len(),
@@ -975,8 +967,9 @@ mod tests {
                for (_, mon) in persisted_chan_data_1.iter() {
                    assert_eq!(mon.get_latest_update_id(), $expected_update_id);
                    let monitor_name = MonitorName::from(mon.get_funding_txo().0);
-                   let (_, cm_1) = persister_1.read_monitor(&monitor_name).unwrap();
-                   if cm_1.get_latest_update_id() == $expected_update_id {
+                   // if the CM is at consolidation threshold, ensure no updates are stored.
+                   if mon.get_latest_update_id() % persister_1_max_pending_updates == 0
+                       || mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
                        assert_eq!(
                            persister_1.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
                                monitor_name.as_str()).unwrap().len(),
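Both assertions hinge on the same consolidation rule the persister applies when deciding between an update write and a full monitor write. A minimal sketch of that rule (the free function `is_consolidation_point` is hypothetical, named here only to mirror the modulo check in the test):

```rust
// A full monitor is written, and pending updates pruned, whenever the
// update_id is a multiple of maximum_pending_updates.
fn is_consolidation_point(update_id: u64, maximum_pending_updates: u64) -> bool {
    update_id % maximum_pending_updates == 0
}

fn main() {
    // With persister_0's threshold of 7, updates 7 and 14 trigger consolidation...
    assert!(is_consolidation_point(7, 7));
    assert!(is_consolidation_point(14, 7));
    // ...while 8..=13 remain as individual update entries in the store.
    assert!(!(8..=13).any(|id| is_consolidation_point(id, 7)));
}
```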
@@ -1001,7 +994,7 @@ mod tests {
        // Send a few more payments to try all the alignments of max pending updates with
        // updates for a payment sent and received.
        let mut sender = 0;
-       for i in 3..=test_max_pending_updates * 2 {
+       for i in 3..=persister_0_max_pending_updates * 2 {
            let receiver;
            if sender == 0 {
                sender = 1;