Skip to content

Commit 8933a71

Browse files
committed
Add a RAAMonitorUpdateBlockingAction::ClaimedMPPPayment
If we claim an MPP payment and only persist some of the `ChannelMonitorUpdate`s which include the preimage prior to shutting down, we may be in a state where some of our `ChannelMonitor`s have the preimage for a payment while others do not. This, it turns out, is actually mostly safe - on startup `ChannelManager` will detect that there's a payment it has as unclaimed but that there's a corresponding payment preimage in a `ChannelMonitor` and go claim the other MPP parts. This works so long as the `ChannelManager` has been persisted after the payment has been received but prior to the `PaymentClaimable` event being processed (and the claim itself occurring). This is not always true, and certainly not required by our API, but our `lightning-background-processor` today does persist prior to event handling, so it is generally true subject to some race conditions. In order to address this race we need to copy payment preimages across channels irrespective of the `ChannelManager`'s payment state, but this introduces another wrinkle - if one channel makes substantial progress while other channel(s) are still waiting to get the payment preimage in their `ChannelMonitor`(s), and the `ChannelManager` hasn't been persisted after the payment was received, we may end up without the preimage on disk. Here, we address this issue with a new `RAAMonitorUpdateBlockingAction` variant for this specific case. We block persistence of an RAA `ChannelMonitorUpdate` which may remove the preimage from disk until all channels have had the preimage added to their `ChannelMonitor`. We do this only in-memory (and not on disk) as we can recreate this blocker during the startup re-claim logic. This will enable us to claim MPP parts without using the `ChannelManager`'s payment state in later work.
1 parent 453ed11 commit 8933a71

File tree

1 file changed

+99
-14
lines changed

1 file changed

+99
-14
lines changed

lightning/src/ln/channelmanager.rs

+99-14
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,42 @@ enum BackgroundEvent {
757757
},
758758
}
759759

760+
/// A pointer to a channel that is unblocked when an event is surfaced
761+
#[derive(Debug)]
762+
pub(crate) struct EventUnblockedChannel {
763+
counterparty_node_id: PublicKey,
764+
funding_txo: OutPoint,
765+
channel_id: ChannelId,
766+
blocking_action: RAAMonitorUpdateBlockingAction,
767+
}
768+
769+
impl Writeable for EventUnblockedChannel {
770+
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
771+
self.counterparty_node_id.write(writer)?;
772+
self.funding_txo.write(writer)?;
773+
self.channel_id.write(writer)?;
774+
self.blocking_action.write(writer)
775+
}
776+
}
777+
778+
impl MaybeReadable for EventUnblockedChannel {
779+
fn read<R: Read>(reader: &mut R) -> Result<Option<Self>, DecodeError> {
780+
let counterparty_node_id = Readable::read(reader)?;
781+
let funding_txo = Readable::read(reader)?;
782+
let channel_id = Readable::read(reader)?;
783+
let blocking_action = match RAAMonitorUpdateBlockingAction::read(reader)? {
784+
Some(blocking_action) => blocking_action,
785+
None => return Ok(None),
786+
};
787+
Ok(Some(EventUnblockedChannel {
788+
counterparty_node_id,
789+
funding_txo,
790+
channel_id,
791+
blocking_action,
792+
}))
793+
}
794+
}
795+
760796
#[derive(Debug)]
761797
pub(crate) enum MonitorUpdateCompletionAction {
762798
/// Indicates that a payment ultimately destined for us was claimed and we should emit an
@@ -774,7 +810,7 @@ pub(crate) enum MonitorUpdateCompletionAction {
774810
/// outbound edge.
775811
EmitEventAndFreeOtherChannel {
776812
event: events::Event,
777-
downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, ChannelId, RAAMonitorUpdateBlockingAction)>,
813+
downstream_counterparty_and_funding_outpoint: Option<EventUnblockedChannel>,
778814
},
779815
/// Indicates we should immediately resume the operation of another channel, unless there is
780816
/// some other reason why the channel is blocked. In practice this simply means immediately
@@ -803,7 +839,7 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
803839
(1, FreeOtherChannelImmediately) => {
804840
(0, downstream_counterparty_node_id, required),
805841
(2, downstream_funding_outpoint, required),
806-
(4, blocking_action, required),
842+
(4, blocking_action, upgradable_required),
807843
// Note that by the time we get past the required read above, downstream_funding_outpoint will be
808844
// filled in, so we can safely unwrap it here.
809845
(5, downstream_channel_id, (default_value, ChannelId::v1_from_funding_outpoint(downstream_funding_outpoint.0.unwrap()))),
@@ -815,7 +851,7 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
815851
// monitor updates which aren't properly blocked or resumed, however that's fine - we don't
816852
// support async monitor updates even in LDK 0.0.116 and once we do we'll require no
817853
// downgrades to prior versions.
818-
(1, downstream_counterparty_and_funding_outpoint, option),
854+
(1, downstream_counterparty_and_funding_outpoint, upgradable_option),
819855
},
820856
);
821857

@@ -837,6 +873,26 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
837873
};
838874
);
839875

876+
#[derive(Debug)]
877+
pub(crate) struct PendingMPPClaim {
878+
channels_without_preimage: Vec<(PublicKey, OutPoint, ChannelId, u64)>,
879+
channels_with_preimage: Vec<(PublicKey, OutPoint, ChannelId)>,
880+
}
881+
882+
#[derive(Clone)]
883+
pub(crate) struct PendingMPPClaimPointer(Arc<Mutex<PendingMPPClaim>>);
884+
885+
impl PartialEq for PendingMPPClaimPointer {
886+
fn eq(&self, o: &Self) -> bool { Arc::ptr_eq(&self.0, &o.0) }
887+
}
888+
impl Eq for PendingMPPClaimPointer {}
889+
890+
impl core::fmt::Debug for PendingMPPClaimPointer {
891+
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
892+
self.0.lock().unwrap().fmt(f)
893+
}
894+
}
895+
840896
#[derive(Clone, PartialEq, Eq, Debug)]
841897
/// If something is blocked on the completion of an RAA-generated [`ChannelMonitorUpdate`] we track
842898
/// the blocked action here. See enum variants for more info.
@@ -850,6 +906,16 @@ pub(crate) enum RAAMonitorUpdateBlockingAction {
850906
/// The HTLC ID on the inbound edge.
851907
htlc_id: u64,
852908
},
909+
/// We claimed an MPP payment across multiple channels. We have to block removing the payment
910+
/// preimage from any monitor until the last monitor is updated to contain the payment
911+
/// preimage. Otherwise we may not be able to replay the preimage on the monitor(s) that
912+
/// weren't updated on startup.
913+
///
914+
/// This variant is *not* written to disk, instead being inferred from [`ChannelMonitor`]
915+
/// state.
916+
ClaimedMPPPayment {
917+
pending_claim: PendingMPPClaimPointer,
918+
}
853919
}
854920

855921
impl RAAMonitorUpdateBlockingAction {
@@ -861,10 +927,16 @@ impl RAAMonitorUpdateBlockingAction {
861927
}
862928
}
863929

864-
impl_writeable_tlv_based_enum!(RAAMonitorUpdateBlockingAction,
865-
(0, ForwardedPaymentInboundClaim) => { (0, channel_id, required), (2, htlc_id, required) }
866-
;);
930+
impl_writeable_tlv_based_enum_upgradable!(RAAMonitorUpdateBlockingAction,
931+
(0, ForwardedPaymentInboundClaim) => { (0, channel_id, required), (2, htlc_id, required) },
932+
unread_variants: ClaimedMPPPayment
933+
);
867934

935+
impl Readable for Option<RAAMonitorUpdateBlockingAction> {
936+
fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
937+
Ok(RAAMonitorUpdateBlockingAction::read(reader)?)
938+
}
939+
}
868940

869941
/// State we hold per-peer.
870942
pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
@@ -6442,7 +6514,12 @@ where
64426514
|htlc_claim_value_msat, definitely_duplicate| {
64436515
let chan_to_release =
64446516
if let Some(node_id) = next_channel_counterparty_node_id {
6445-
Some((node_id, next_channel_outpoint, next_channel_id, completed_blocker))
6517+
Some(EventUnblockedChannel {
6518+
counterparty_node_id: node_id,
6519+
funding_txo: next_channel_outpoint,
6520+
channel_id: next_channel_id,
6521+
blocking_action: completed_blocker
6522+
})
64466523
} else {
64476524
// We can only get `None` here if we are processing a
64486525
// `ChannelMonitor`-originated event, in which case we
@@ -6503,10 +6580,10 @@ where
65036580
} else if definitely_duplicate {
65046581
if let Some(other_chan) = chan_to_release {
65056582
Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
6506-
downstream_counterparty_node_id: other_chan.0,
6507-
downstream_funding_outpoint: other_chan.1,
6508-
downstream_channel_id: other_chan.2,
6509-
blocking_action: other_chan.3,
6583+
downstream_counterparty_node_id: other_chan.counterparty_node_id,
6584+
downstream_funding_outpoint: other_chan.funding_txo,
6585+
downstream_channel_id: other_chan.channel_id,
6586+
blocking_action: other_chan.blocking_action,
65106587
})
65116588
} else { None }
65126589
} else {
@@ -6573,8 +6650,11 @@ where
65736650
event, downstream_counterparty_and_funding_outpoint
65746651
} => {
65756652
self.pending_events.lock().unwrap().push_back((event, None));
6576-
if let Some((node_id, funding_outpoint, channel_id, blocker)) = downstream_counterparty_and_funding_outpoint {
6577-
self.handle_monitor_update_release(node_id, funding_outpoint, channel_id, Some(blocker));
6653+
if let Some(unblocked) = downstream_counterparty_and_funding_outpoint {
6654+
self.handle_monitor_update_release(
6655+
unblocked.counterparty_node_id, unblocked.funding_txo,
6656+
unblocked.channel_id, Some(unblocked.blocking_action),
6657+
);
65786658
}
65796659
},
65806660
MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
@@ -12075,7 +12155,12 @@ where
1207512155
for action in actions.iter() {
1207612156
if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
1207712157
downstream_counterparty_and_funding_outpoint:
12078-
Some((blocked_node_id, _blocked_channel_outpoint, blocked_channel_id, blocking_action)), ..
12158+
Some(EventUnblockedChannel {
12159+
counterparty_node_id: blocked_node_id,
12160+
funding_txo: _,
12161+
channel_id: blocked_channel_id,
12162+
blocking_action,
12163+
}), ..
1207912164
} = action {
1208012165
if let Some(blocked_peer_state) = per_peer_state.get(blocked_node_id) {
1208112166
log_trace!(logger,

0 commit comments

Comments
 (0)