Skip to content

Commit 14e5a98

Browse files
committed
Add a RAAMonitorUpdateBlockingAction::ClaimedMPPPayment
If we claim an MPP payment and only persist some of the `ChannelMonitorUpdate`s which include the preimage prior to shutting down, we may be in a state where some of our `ChannelMonitor`s have the preimage for a payment while others do not. This, it turns out, is actually mostly safe - on startup `ChannelManager` will detect there's a payment it has as unclaimed but there's a corresponding payment preimage in a `ChannelMonitor` and go claim the other MPP parts. This works so long as the `ChannelManager` has been persisted after the payment has been received but prior to the `PaymentClaimable` event being processed (and the claim itself occurring). This is not always true and certainly not required on our API, but our `lightning-background-processor` today does persist prior to event handling so is generally true subject to some race conditions. In order to address this race we need to copy payment preimages across channels irrespective of the `ChannelManager`'s payment state, but this introduces another wrinkle - if one channel makes substantial progress while other channel(s) are still waiting to get the payment preimage in `ChannelMonitor`(s) and the `ChannelManager` hasn't been persisted after the payment was received, we may end up without the preimage on disk. Here, we address this issue with a new `RAAMonitorUpdateBlockingAction` variant for this specific case. We block persistence of an RAA `ChannelMonitorUpdate` which may remove the preimage from disk until all channels have had the preimage added to their `ChannelMonitor`. We do this only in-memory (and not on disk) as we can recreate this blocker during the startup re-claim logic. This will enable us to claim MPP parts without using the `ChannelManager`'s payment state in later work.
1 parent 24158d5 commit 14e5a98

File tree

1 file changed

+144
-14
lines changed

1 file changed

+144
-14
lines changed

lightning/src/ln/channelmanager.rs

+144-14
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,42 @@ enum BackgroundEvent {
757757
},
758758
}
759759

760+
/// A pointer to a channel which is unblocked when an event is surfaced
#[derive(Debug)]
pub(crate) struct EventUnblockedChannel {
	/// The node id of the counterparty on the channel being unblocked.
	counterparty_node_id: PublicKey,
	/// The funding outpoint of the channel being unblocked.
	funding_txo: OutPoint,
	/// The id of the channel being unblocked.
	channel_id: ChannelId,
	/// The blocking action to release on that channel once the event has been
	/// handled (see `MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel`).
	blocking_action: RAAMonitorUpdateBlockingAction,
}
768+
769+
impl Writeable for EventUnblockedChannel {
	// Serialized as the four fields back-to-back in declaration order, with no
	// framing of its own; the read side must consume them in the same order.
	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
		self.counterparty_node_id.write(writer)?;
		self.funding_txo.write(writer)?;
		self.channel_id.write(writer)?;
		self.blocking_action.write(writer)
	}
}
777+
778+
impl MaybeReadable for EventUnblockedChannel {
779+
fn read<R: Read>(reader: &mut R) -> Result<Option<Self>, DecodeError> {
780+
let counterparty_node_id = Readable::read(reader)?;
781+
let funding_txo = Readable::read(reader)?;
782+
let channel_id = Readable::read(reader)?;
783+
let blocking_action = match RAAMonitorUpdateBlockingAction::read(reader)? {
784+
Some(blocking_action) => blocking_action,
785+
None => return Ok(None),
786+
};
787+
Ok(Some(EventUnblockedChannel {
788+
counterparty_node_id,
789+
funding_txo,
790+
channel_id,
791+
blocking_action,
792+
}))
793+
}
794+
}
795+
760796
#[derive(Debug)]
761797
pub(crate) enum MonitorUpdateCompletionAction {
762798
/// Indicates that a payment ultimately destined for us was claimed and we should emit an
@@ -774,7 +810,7 @@ pub(crate) enum MonitorUpdateCompletionAction {
774810
/// outbound edge.
775811
EmitEventAndFreeOtherChannel {
776812
event: events::Event,
777-
downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, ChannelId, RAAMonitorUpdateBlockingAction)>,
813+
downstream_counterparty_and_funding_outpoint: Option<EventUnblockedChannel>,
778814
},
779815
/// Indicates we should immediately resume the operation of another channel, unless there is
780816
/// some other reason why the channel is blocked. In practice this simply means immediately
@@ -803,7 +839,7 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
803839
(1, FreeOtherChannelImmediately) => {
804840
(0, downstream_counterparty_node_id, required),
805841
(2, downstream_funding_outpoint, required),
806-
(4, blocking_action, required),
842+
(4, blocking_action, upgradable_required),
807843
// Note that by the time we get past the required read above, downstream_funding_outpoint will be
808844
// filled in, so we can safely unwrap it here.
809845
(5, downstream_channel_id, (default_value, ChannelId::v1_from_funding_outpoint(downstream_funding_outpoint.0.unwrap()))),
@@ -815,7 +851,7 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
815851
// monitor updates which aren't properly blocked or resumed, however that's fine - we don't
816852
// support async monitor updates even in LDK 0.0.116 and once we do we'll require no
817853
// downgrades to prior versions.
818-
(1, downstream_counterparty_and_funding_outpoint, option),
854+
(1, downstream_counterparty_and_funding_outpoint, upgradable_option),
819855
},
820856
);
821857

@@ -837,6 +873,30 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
837873
};
838874
);
839875

876+
#[derive(Debug)]
pub(crate) struct PendingMPPClaim {
	/// The channels which have not yet had the payment preimage added to their
	/// `ChannelMonitor`. The trailing `u64` presumably identifies the HTLC on
	/// that channel — TODO(review): confirm against the code that fills this in.
	channels_without_preimage: Vec<(PublicKey, OutPoint, ChannelId, u64)>,
	/// The channels which already have the payment preimage in their
	/// `ChannelMonitor`.
	channels_with_preimage: Vec<(PublicKey, OutPoint, ChannelId)>,
}
881+
882+
/// A shared, mutable handle to a [`PendingMPPClaim`]. Clones are cheap (refcount
/// bump) and all clones refer to the same underlying claim state.
#[derive(Clone)]
pub(crate) struct PendingMPPClaimPointer(Arc<Mutex<PendingMPPClaim>>);
884+
885+
impl PartialEq for PendingMPPClaimPointer {
	// Equality is pointer identity: two handles are equal iff they refer to the
	// same `PendingMPPClaim` allocation, never by comparing the contents.
	fn eq(&self, o: &Self) -> bool { Arc::ptr_eq(&self.0, &o.0) }
}
impl Eq for PendingMPPClaimPointer {}
889+
890+
impl core::fmt::Debug for PendingMPPClaimPointer {
	// Formats the inner claim state rather than the pointer itself.
	//
	// NOTE(review): this acquires the inner mutex (and unwraps, so it panics if
	// the lock was poisoned). Debug-formatting a `PendingMPPClaimPointer` while
	// the same lock is held would deadlock — confirm callers never do so.
	fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
		let state = self.0.lock().unwrap();
		f.debug_struct("PendingMPPClaimPointer")
			.field("channels_without_preimage", &state.channels_without_preimage)
			.field("channels_with_preimage", &state.channels_with_preimage)
			.finish()
	}
}
899+
840900
#[derive(Clone, PartialEq, Eq, Debug)]
841901
/// If something is blocked on the completion of an RAA-generated [`ChannelMonitorUpdate`] we track
842902
/// the blocked action here. See enum variants for more info.
@@ -850,6 +910,16 @@ pub(crate) enum RAAMonitorUpdateBlockingAction {
850910
/// The HTLC ID on the inbound edge.
851911
htlc_id: u64,
852912
},
913+
/// We claimed an MPP payment across multiple channels. We have to block removing the payment
914+
/// preimage from any monitor until the last monitor is updated to contain the payment
915+
/// preimage. Otherwise we may not be able to replay the preimage on the monitor(s) which
916+
/// weren't updated on startup.
917+
///
918+
/// This variant is *not* written to disk, instead being inferred from [`ChannelMonitor`]
919+
/// state.
920+
ClaimedMPPPayment {
921+
pending_claim: PendingMPPClaimPointer,
922+
}
853923
}
854924

855925
impl RAAMonitorUpdateBlockingAction {
@@ -861,10 +931,57 @@ impl RAAMonitorUpdateBlockingAction {
861931
}
862932
}
863933

864-
impl_writeable_tlv_based_enum!(RAAMonitorUpdateBlockingAction,
865-
(0, ForwardedPaymentInboundClaim) => { (0, channel_id, required), (2, htlc_id, required) }
866-
;);
934+
// Hand-rolled (instead of `impl_writeable_tlv_based_enum!`) because
// `ClaimedMPPPayment` is deliberately serialized with no contents: it is
// rebuilt from `ChannelMonitor` state on restart. Variant bytes follow the
// even-required / odd-ignorable convention used by the reader below, so
// readers which don't understand variant 1 can skip it.
impl Writeable for RAAMonitorUpdateBlockingAction {
	fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
		match self {
			RAAMonitorUpdateBlockingAction::ForwardedPaymentInboundClaim { channel_id, htlc_id } => {
				// Even variant byte: readers must understand this.
				0u8.write(writer)?;
				write_tlv_fields!(writer, {
					(0, channel_id, required),
					(2, htlc_id, required),
				});
			},
			RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { .. } => {
				// Odd variant byte followed by an empty, length-prefixed TLV
				// stream, so readers can measure and discard it.
				1u8.write(writer)?;
				write_tlv_fields!(writer, {});
				// This is rebuilt on restart, so we don't bother writing it.
			},
		}
		Ok(())
	}
}
867953

954+
impl Readable for Option<RAAMonitorUpdateBlockingAction> {
955+
fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
956+
Ok(RAAMonitorUpdateBlockingAction::read(reader)?)
957+
}
958+
}
959+
960+
impl MaybeReadable for RAAMonitorUpdateBlockingAction {
	// Counterpart to the hand-rolled `Writeable` impl above: a variant byte
	// followed by a length-prefixed TLV stream. Even variant bytes are
	// required-to-understand; odd ones are skipped and surfaced as `Ok(None)`.
	fn read<R: Read>(reader: &mut R) -> Result<Option<Self>, DecodeError> {
		match <u8 as Readable>::read(reader)? {
			0 => {
				_init_and_read_len_prefixed_tlv_fields!(reader, {
					(0, channel_id, required),
					(2, htlc_id, required),
				});
				// `required` fields are filled in by the macro once the read
				// succeeds, so the unwraps here cannot panic.
				Ok(Some(RAAMonitorUpdateBlockingAction::ForwardedPaymentInboundClaim {
					channel_id: channel_id.0.unwrap(),
					htlc_id: htlc_id.0.unwrap(),
				}))
			},
			// 1 is ClaimedMPPPayment and is handled in the general odd handling below
			x if x % 2 == 1 => {
				// Discard the contents: read the TLV stream's length prefix and
				// consume exactly that many bytes so the reader stays aligned.
				let tlv_len: BigSize = Readable::read(reader)?;
				FixedLengthReader::new(reader, tlv_len.0)
					.eat_remaining().map_err(|_| DecodeError::ShortRead)?;
				Ok(None)
			},
			// Unknown even variants are required-to-understand: fail the decode.
			_ => Err(DecodeError::InvalidValue),
		}
	}
}
868985

869986
/// State we hold per-peer.
870987
pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
@@ -6369,7 +6486,12 @@ where
63696486
|htlc_claim_value_msat, definitely_duplicate| {
63706487
let chan_to_release =
63716488
if let Some(node_id) = next_channel_counterparty_node_id {
6372-
Some((node_id, next_channel_outpoint, next_channel_id, completed_blocker))
6489+
Some(EventUnblockedChannel {
6490+
counterparty_node_id: node_id,
6491+
funding_txo: next_channel_outpoint,
6492+
channel_id: next_channel_id,
6493+
blocking_action: completed_blocker
6494+
})
63736495
} else {
63746496
// We can only get `None` here if we are processing a
63756497
// `ChannelMonitor`-originated event, in which case we
@@ -6430,10 +6552,10 @@ where
64306552
} else if definitely_duplicate {
64316553
if let Some(other_chan) = chan_to_release {
64326554
Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
6433-
downstream_counterparty_node_id: other_chan.0,
6434-
downstream_funding_outpoint: other_chan.1,
6435-
downstream_channel_id: other_chan.2,
6436-
blocking_action: other_chan.3,
6555+
downstream_counterparty_node_id: other_chan.counterparty_node_id,
6556+
downstream_funding_outpoint: other_chan.funding_txo,
6557+
downstream_channel_id: other_chan.channel_id,
6558+
blocking_action: other_chan.blocking_action,
64376559
})
64386560
} else { None }
64396561
} else {
@@ -6504,8 +6626,11 @@ where
65046626
event, downstream_counterparty_and_funding_outpoint
65056627
} => {
65066628
self.pending_events.lock().unwrap().push_back((event, None));
6507-
if let Some((node_id, funding_outpoint, channel_id, blocker)) = downstream_counterparty_and_funding_outpoint {
6508-
self.handle_monitor_update_release(node_id, funding_outpoint, channel_id, Some(blocker));
6629+
if let Some(unblocked) = downstream_counterparty_and_funding_outpoint {
6630+
self.handle_monitor_update_release(
6631+
unblocked.counterparty_node_id, unblocked.funding_txo,
6632+
unblocked.channel_id, Some(unblocked.blocking_action),
6633+
);
65096634
}
65106635
},
65116636
MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
@@ -11992,7 +12117,12 @@ where
1199212117
for action in actions.iter() {
1199312118
if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
1199412119
downstream_counterparty_and_funding_outpoint:
11995-
Some((blocked_node_id, _blocked_channel_outpoint, blocked_channel_id, blocking_action)), ..
12120+
Some(EventUnblockedChannel {
12121+
counterparty_node_id: blocked_node_id,
12122+
funding_txo: _,
12123+
channel_id: blocked_channel_id,
12124+
blocking_action,
12125+
}), ..
1199612126
} = action {
1199712127
if let Some(blocked_peer_state) = per_peer_state.get(blocked_node_id) {
1199812128
log_trace!(logger,

0 commit comments

Comments
 (0)