Skip to content

Expose next_channel_id in PaymentForwarded event #1475

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl chain::Watch<EnforcingSigner> for TestChainMonitor {
self.chain_monitor.update_channel(funding_txo, update)
}

fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>)> {
return self.chain_monitor.release_pending_monitor_events();
}
}
Expand Down
20 changes: 12 additions & 8 deletions lightning/src/chain/chainmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
persister: P,
/// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
/// from the user and not from a [`ChannelMonitor`].
pending_monitor_events: Mutex<Vec<MonitorEvent>>,
pending_monitor_events: Mutex<Vec<(OutPoint, Vec<MonitorEvent>)>>,
/// The best block height seen, used as a proxy for the passage of time.
highest_chain_height: AtomicUsize,
}
Expand Down Expand Up @@ -299,7 +299,7 @@ where C::Target: chain::Filter,
log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
Err(ChannelMonitorUpdateErr::PermanentFailure) => {
monitor_state.channel_perm_failed.store(true, Ordering::Release);
self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateFailed(*funding_outpoint));
self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)]));
},
Err(ChannelMonitorUpdateErr::TemporaryFailure) => {
log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
Expand Down Expand Up @@ -455,10 +455,10 @@ where C::Target: chain::Filter,
// UpdateCompleted event.
return Ok(());
}
self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted {
self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted {
funding_txo,
monitor_update_id: monitor_data.monitor.get_latest_update_id(),
});
}]));
},
MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {
if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) {
Expand All @@ -476,10 +476,10 @@ where C::Target: chain::Filter,
/// channel_monitor_updated once with the highest ID.
#[cfg(any(test, fuzzing))]
pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) {
self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted {
self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted {
funding_txo,
monitor_update_id,
});
}]));
}

#[cfg(any(test, fuzzing, feature = "_test_utils"))]
Expand Down Expand Up @@ -668,7 +668,7 @@ where C::Target: chain::Filter,
}
}

fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>)> {
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
for monitor_state in self.monitors.read().unwrap().values() {
let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
Expand All @@ -694,7 +694,11 @@ where C::Target: chain::Filter,
log_error!(self.logger, " To avoid funds-loss, we are allowing monitor updates to be released.");
log_error!(self.logger, " This may cause duplicate payment events to be generated.");
}
pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
if monitor_events.len() > 0 {
let monitor_outpoint = monitor_state.monitor.get_funding_txo().0;
pending_monitor_events.push((monitor_outpoint, monitor_events));
}
}
}
pending_monitor_events
Expand Down
2 changes: 1 addition & 1 deletion lightning/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ pub trait Watch<ChannelSigner: Sign> {
///
/// For details on asynchronous [`ChannelMonitor`] updating and returning
/// [`MonitorEvent::UpdateCompleted`] here, see [`ChannelMonitorUpdateErr::TemporaryFailure`].
fn release_pending_monitor_events(&self) -> Vec<MonitorEvent>;
fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>)>;
}

/// The `Filter` trait defines behavior for indicating chain activity of interest pertaining to
Expand Down
6 changes: 3 additions & 3 deletions lightning/src/ln/chanmon_update_fail_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1102,7 +1102,7 @@ fn test_monitor_update_fail_reestablish() {
assert!(updates.update_fee.is_none());
assert_eq!(updates.update_fulfill_htlcs.len(), 1);
nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), false);
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false);
check_added_monitors!(nodes[1], 1);
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
commitment_signed_dance!(nodes[1], nodes[2], updates.commitment_signed, false);
Expand Down Expand Up @@ -2087,7 +2087,7 @@ fn test_fail_htlc_on_broadcast_after_claim() {
nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &cs_updates.update_fulfill_htlcs[0]);
let bs_updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
check_added_monitors!(nodes[1], 1);
expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), false);
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false);

mine_transaction(&nodes[1], &bs_txn[0]);
check_closed_event!(nodes[1], 1, ClosureReason::CommitmentTxConfirmed);
Expand Down Expand Up @@ -2468,7 +2468,7 @@ fn do_test_reconnect_dup_htlc_claims(htlc_status: HTLCStatusAtDupClaim, second_f
assert_eq!(fulfill_msg, cs_updates.update_fulfill_htlcs[0]);
}
nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &fulfill_msg);
expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), false);
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false);
check_added_monitors!(nodes[1], 1);

let mut bs_updates = None;
Expand Down
92 changes: 48 additions & 44 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3952,7 +3952,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
}
}

fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool) {
fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, next_channel_id: [u8; 32]) {
match source {
HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
mem::drop(channel_state_lock);
Expand Down Expand Up @@ -4043,12 +4043,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
} else { None };

let mut pending_events = self.pending_events.lock().unwrap();
let prev_channel_id = Some(prev_outpoint.to_channel_id());
let next_channel_id = Some(next_channel_id);

let source_channel_id = Some(prev_outpoint.to_channel_id());
pending_events.push(events::Event::PaymentForwarded {
source_channel_id,
fee_earned_msat,
claim_from_onchain_tx: from_onchain,
prev_channel_id,
next_channel_id,
});
}
}
Expand Down Expand Up @@ -4501,7 +4503,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
}
};
self.claim_funds_internal(channel_lock, htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false);
self.claim_funds_internal(channel_lock, htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, msg.channel_id);
Ok(())
}

Expand Down Expand Up @@ -4821,48 +4823,50 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
let mut failed_channels = Vec::new();
let mut pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
let has_pending_monitor_events = !pending_monitor_events.is_empty();
for monitor_event in pending_monitor_events.drain(..) {
match monitor_event {
MonitorEvent::HTLCEvent(htlc_update) => {
if let Some(preimage) = htlc_update.payment_preimage {
log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage, htlc_update.onchain_value_satoshis.map(|v| v * 1000), true);
} else {
log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
}
},
MonitorEvent::CommitmentTxConfirmed(funding_outpoint) |
MonitorEvent::UpdateFailed(funding_outpoint) => {
let mut channel_lock = self.channel_state.lock().unwrap();
let channel_state = &mut *channel_lock;
let by_id = &mut channel_state.by_id;
let pending_msg_events = &mut channel_state.pending_msg_events;
if let hash_map::Entry::Occupied(chan_entry) = by_id.entry(funding_outpoint.to_channel_id()) {
let mut chan = remove_channel!(self, channel_state, chan_entry);
failed_channels.push(chan.force_shutdown(false));
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
for (funding_outpoint, mut monitor_events) in pending_monitor_events.drain(..) {
for monitor_event in monitor_events.drain(..) {
match monitor_event {
MonitorEvent::HTLCEvent(htlc_update) => {
if let Some(preimage) = htlc_update.payment_preimage {
log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage, htlc_update.onchain_value_satoshis.map(|v| v * 1000), true, funding_outpoint.to_channel_id());
} else {
log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
}
},
MonitorEvent::CommitmentTxConfirmed(funding_outpoint) |
MonitorEvent::UpdateFailed(funding_outpoint) => {
let mut channel_lock = self.channel_state.lock().unwrap();
let channel_state = &mut *channel_lock;
let by_id = &mut channel_state.by_id;
let pending_msg_events = &mut channel_state.pending_msg_events;
if let hash_map::Entry::Occupied(chan_entry) = by_id.entry(funding_outpoint.to_channel_id()) {
let mut chan = remove_channel!(self, channel_state, chan_entry);
failed_channels.push(chan.force_shutdown(false));
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event {
ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() }
} else {
ClosureReason::CommitmentTxConfirmed
};
self.issue_channel_close_events(&chan, reason);
pending_msg_events.push(events::MessageSendEvent::HandleError {
node_id: chan.get_counterparty_node_id(),
action: msgs::ErrorAction::SendErrorMessage {
msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() }
},
});
}
let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event {
ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() }
} else {
ClosureReason::CommitmentTxConfirmed
};
self.issue_channel_close_events(&chan, reason);
pending_msg_events.push(events::MessageSendEvent::HandleError {
node_id: chan.get_counterparty_node_id(),
action: msgs::ErrorAction::SendErrorMessage {
msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() }
},
});
}
},
MonitorEvent::UpdateCompleted { funding_txo, monitor_update_id } => {
self.channel_monitor_updated(&funding_txo, monitor_update_id);
},
},
MonitorEvent::UpdateCompleted { funding_txo, monitor_update_id } => {
self.channel_monitor_updated(&funding_txo, monitor_update_id);
},
}
}
}

Expand Down
15 changes: 10 additions & 5 deletions lightning/src/ln/functional_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1327,15 +1327,20 @@ macro_rules! expect_payment_path_successful {
}

macro_rules! expect_payment_forwarded {
($node: expr, $source_node: expr, $expected_fee: expr, $upstream_force_closed: expr) => {
($node: expr, $prev_node: expr, $next_node: expr, $expected_fee: expr, $upstream_force_closed: expr, $downstream_force_closed: expr) => {
let events = $node.node.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
match events[0] {
Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => {
Event::PaymentForwarded { fee_earned_msat, prev_channel_id, claim_from_onchain_tx, next_channel_id } => {
assert_eq!(fee_earned_msat, $expected_fee);
if fee_earned_msat.is_some() {
// Is the event channel_id in one of the channels between the two nodes?
assert!($node.node.list_channels().iter().any(|x| x.counterparty.node_id == $source_node.node.get_our_node_id() && x.channel_id == source_channel_id.unwrap()));
// Is the event prev_channel_id in one of the channels between the two nodes?
assert!($node.node.list_channels().iter().any(|x| x.counterparty.node_id == $prev_node.node.get_our_node_id() && x.channel_id == prev_channel_id.unwrap()));
}
// We check for force closures since a force closed channel is removed from the
// node's channel list
if !$downstream_force_closed {
assert!($node.node.list_channels().iter().any(|x| x.counterparty.node_id == $next_node.node.get_our_node_id() && x.channel_id == next_channel_id.unwrap()));
}
assert_eq!(claim_from_onchain_tx, $upstream_force_closed);
},
Expand Down Expand Up @@ -1579,7 +1584,7 @@ pub fn do_claim_payment_along_route<'a, 'b, 'c>(origin_node: &Node<'a, 'b, 'c>,
{
$node.node.handle_update_fulfill_htlc(&$prev_node.node.get_our_node_id(), &next_msgs.as_ref().unwrap().0);
let fee = $node.node.channel_state.lock().unwrap().by_id.get(&next_msgs.as_ref().unwrap().0.channel_id).unwrap().config.forwarding_fee_base_msat;
expect_payment_forwarded!($node, $next_node, Some(fee as u64), false);
expect_payment_forwarded!($node, $next_node, $prev_node, Some(fee as u64), false, false);
expected_total_fee_msat += fee as u64;
check_added_monitors!($node, 1);
let new_next_msgs = if $new_msgs {
Expand Down
19 changes: 11 additions & 8 deletions lightning/src/ln/functional_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2686,18 +2686,20 @@ fn test_htlc_on_chain_success() {
}
let chan_id = Some(chan_1.2);
match forwarded_events[1] {
Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => {
Event::PaymentForwarded { fee_earned_msat, prev_channel_id, claim_from_onchain_tx, next_channel_id } => {
assert_eq!(fee_earned_msat, Some(1000));
assert_eq!(source_channel_id, chan_id);
assert_eq!(prev_channel_id, chan_id);
assert_eq!(claim_from_onchain_tx, true);
assert_eq!(next_channel_id, Some(chan_2.2));
},
_ => panic!()
}
match forwarded_events[2] {
Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => {
Event::PaymentForwarded { fee_earned_msat, prev_channel_id, claim_from_onchain_tx, next_channel_id } => {
assert_eq!(fee_earned_msat, Some(1000));
assert_eq!(source_channel_id, chan_id);
assert_eq!(prev_channel_id, chan_id);
assert_eq!(claim_from_onchain_tx, true);
assert_eq!(next_channel_id, Some(chan_2.2));
},
_ => panic!()
}
Expand Down Expand Up @@ -5117,10 +5119,11 @@ fn test_onchain_to_onchain_claim() {
_ => panic!("Unexpected event"),
}
match events[1] {
Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => {
Event::PaymentForwarded { fee_earned_msat, prev_channel_id, claim_from_onchain_tx, next_channel_id } => {
assert_eq!(fee_earned_msat, Some(1000));
assert_eq!(source_channel_id, Some(chan_1.2));
assert_eq!(prev_channel_id, Some(chan_1.2));
assert_eq!(claim_from_onchain_tx, true);
assert_eq!(next_channel_id, Some(chan_2.2));
},
_ => panic!("Unexpected event"),
}
Expand Down Expand Up @@ -5287,7 +5290,7 @@ fn test_duplicate_payment_hash_one_failure_one_success() {
// Note that the fee paid is effectively double as the HTLC value (including the nodes[1] fee
// and nodes[2] fee) is rounded down and then claimed in full.
mine_transaction(&nodes[1], &htlc_success_txn[0]);
expect_payment_forwarded!(nodes[1], nodes[0], Some(196*2), true);
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(196*2), true, true);
let updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
assert!(updates.update_add_htlcs.is_empty());
assert!(updates.update_fail_htlcs.is_empty());
Expand Down Expand Up @@ -8869,7 +8872,7 @@ fn do_test_onchain_htlc_settlement_after_close(broadcast_alice: bool, go_onchain
assert_eq!(carol_updates.update_fulfill_htlcs.len(), 1);

nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &carol_updates.update_fulfill_htlcs[0]);
expect_payment_forwarded!(nodes[1], nodes[0], if go_onchain_before_fulfill || force_closing_node == 1 { None } else { Some(1000) }, false);
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], if go_onchain_before_fulfill || force_closing_node == 1 { None } else { Some(1000) }, false, false);
// If Alice broadcasted but Bob doesn't know yet, here he prepares to tell her about the preimage.
if !go_onchain_before_fulfill && broadcast_alice {
let events = nodes[1].node.get_and_clear_pending_msg_events();
Expand Down
2 changes: 1 addition & 1 deletion lightning/src/ln/payment_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ fn do_retry_with_no_persist(confirm_before_reload: bool) {
let bs_htlc_claim_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
assert_eq!(bs_htlc_claim_txn.len(), 1);
check_spends!(bs_htlc_claim_txn[0], as_commitment_tx);
expect_payment_forwarded!(nodes[1], nodes[0], None, false);
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], None, false, false);

if !confirm_before_reload {
mine_transaction(&nodes[0], &as_commitment_tx);
Expand Down
2 changes: 1 addition & 1 deletion lightning/src/ln/reorg_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ fn do_test_onchain_htlc_reorg(local_commitment: bool, claim: bool) {
// ChannelManager only polls chain::Watch::release_pending_monitor_events when we
// probe it for events, so we probe non-message events here (which should just be the
// PaymentForwarded event).
expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), true);
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), true, true);
} else {
// Confirm the timeout tx and check that we fail the HTLC backwards
let block = Block {
Expand Down
Loading