Skip to content

Make payments not duplicatively fail/succeed on reload/reconnect #918

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl chain::Watch<EnforcingSigner> for TestChainMonitor {

struct KeyProvider {
node_id: u8,
rand_bytes_id: atomic::AtomicU8,
rand_bytes_id: atomic::AtomicU32,
revoked_commitments: Mutex<HashMap<[u8;32], Arc<Mutex<u64>>>>,
}
impl KeysInterface for KeyProvider {
Expand Down Expand Up @@ -179,7 +179,7 @@ impl KeysInterface for KeyProvider {
SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6, self.node_id]).unwrap(),
SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, self.node_id]).unwrap(),
SecretKey::from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, self.node_id]).unwrap(),
[id, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, self.node_id],
[id as u8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, self.node_id],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I take it we don't need to use four bytes here because we won't exhaust the one-byte space?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, we only ever create up to two of them per peer.

channel_value_satoshis,
[0; 32],
);
Expand All @@ -189,7 +189,9 @@ impl KeysInterface for KeyProvider {

fn get_secure_random_bytes(&self) -> [u8; 32] {
let id = self.rand_bytes_id.fetch_add(1, atomic::Ordering::Relaxed);
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, id, 11, self.node_id]
let mut res = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 11, self.node_id];
res[30-4..30].copy_from_slice(&id.to_le_bytes());
res
}

fn read_chan_signer(&self, buffer: &[u8]) -> Result<Self::Signer, DecodeError> {
Expand Down Expand Up @@ -334,7 +336,7 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
let logger: Arc<dyn Logger> = Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone()));
let monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), fee_est.clone(), Arc::new(TestPersister{})));

let keys_manager = Arc::new(KeyProvider { node_id: $node_id, rand_bytes_id: atomic::AtomicU8::new(0), revoked_commitments: Mutex::new(HashMap::new()) });
let keys_manager = Arc::new(KeyProvider { node_id: $node_id, rand_bytes_id: atomic::AtomicU32::new(0), revoked_commitments: Mutex::new(HashMap::new()) });
let mut config = UserConfig::default();
config.channel_options.fee_proportional_millionths = 0;
config.channel_options.announced_channel = true;
Expand Down
86 changes: 70 additions & 16 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,18 @@ pub struct ChannelManager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref,
/// Locked *after* channel_state.
pending_inbound_payments: Mutex<HashMap<PaymentHash, PendingInboundPayment>>,

/// The session_priv bytes of outbound payments which are pending resolution.
/// The authoritative state of these HTLCs resides either within Channels or ChannelMonitors
/// (if the channel has been force-closed), however we track them here to prevent duplicative
/// PaymentSent/PaymentFailed events. Specifically, in the case of a duplicative
/// update_fulfill_htlc message after a reconnect, we may "claim" a payment twice.
/// Additionally, because ChannelMonitors are often not re-serialized after connecting block(s)
/// which may generate a claim event, we may receive similar duplicate claim/fail MonitorEvents
/// after reloading from disk while replaying blocks against ChannelMonitors.
///
/// Locked *after* channel_state.
pending_outbound_payments: Mutex<HashSet<[u8; 32]>>,

our_network_key: SecretKey,
our_network_pubkey: PublicKey,

Expand Down Expand Up @@ -913,6 +925,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
pending_msg_events: Vec::new(),
}),
pending_inbound_payments: Mutex::new(HashMap::new()),
pending_outbound_payments: Mutex::new(HashSet::new()),

our_network_key: keys_manager.get_node_secret(),
our_network_pubkey: PublicKey::from_secret_key(&secp_ctx, &keys_manager.get_node_secret()),
Expand Down Expand Up @@ -1467,7 +1480,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
pub(crate) fn send_payment_along_path(&self, path: &Vec<RouteHop>, payment_hash: &PaymentHash, payment_secret: &Option<PaymentSecret>, total_value: u64, cur_height: u32) -> Result<(), APIError> {
log_trace!(self.logger, "Attempting to send payment for path with next hop {}", path.first().unwrap().short_channel_id);
let prng_seed = self.keys_manager.get_secure_random_bytes();
let session_priv = SecretKey::from_slice(&self.keys_manager.get_secure_random_bytes()[..]).expect("RNG is busted");
let session_priv_bytes = self.keys_manager.get_secure_random_bytes();
let session_priv = SecretKey::from_slice(&session_priv_bytes[..]).expect("RNG is busted");

let onion_keys = onion_utils::construct_onion_keys(&self.secp_ctx, &path, &session_priv)
.map_err(|_| APIError::RouteError{err: "Pubkey along hop was maliciously selected"})?;
Expand All @@ -1478,6 +1492,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash);

let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
assert!(self.pending_outbound_payments.lock().unwrap().insert(session_priv_bytes));

let err: Result<(), _> = loop {
let mut channel_lock = self.channel_state.lock().unwrap();
Expand Down Expand Up @@ -2228,17 +2243,25 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
self.fail_htlc_backwards_internal(channel_state,
htlc_src, &payment_hash, HTLCFailReason::Reason { failure_code, data: onion_failure_data});
},
HTLCSource::OutboundRoute { .. } => {
self.pending_events.lock().unwrap().push(
events::Event::PaymentFailed {
payment_hash,
rejected_by_dest: false,
HTLCSource::OutboundRoute { session_priv, .. } => {
if {
let mut session_priv_bytes = [0; 32];
session_priv_bytes.copy_from_slice(&session_priv[..]);
self.pending_outbound_payments.lock().unwrap().remove(&session_priv_bytes)
} {
self.pending_events.lock().unwrap().push(
events::Event::PaymentFailed {
payment_hash,
rejected_by_dest: false,
#[cfg(test)]
error_code: None,
error_code: None,
#[cfg(test)]
error_data: None,
}
)
error_data: None,
}
)
} else {
log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
}
},
};
}
Expand All @@ -2260,7 +2283,15 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
// from block_connected which may run during initialization prior to the chain_monitor
// being fully configured. See the docs for `ChannelManagerReadArgs` for more.
match source {
HTLCSource::OutboundRoute { ref path, .. } => {
HTLCSource::OutboundRoute { ref path, session_priv, .. } => {
if {
let mut session_priv_bytes = [0; 32];
session_priv_bytes.copy_from_slice(&session_priv[..]);
!self.pending_outbound_payments.lock().unwrap().remove(&session_priv_bytes)
} {
log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
return;
}
log_trace!(self.logger, "Failing outbound payment HTLC with payment_hash {}", log_bytes!(payment_hash.0));
mem::drop(channel_state_lock);
match &onion_error {
Expand Down Expand Up @@ -2489,12 +2520,20 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana

fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage) {
match source {
HTLCSource::OutboundRoute { .. } => {
HTLCSource::OutboundRoute { session_priv, .. } => {
mem::drop(channel_state_lock);
let mut pending_events = self.pending_events.lock().unwrap();
pending_events.push(events::Event::PaymentSent {
payment_preimage
});
if {
let mut session_priv_bytes = [0; 32];
session_priv_bytes.copy_from_slice(&session_priv[..]);
self.pending_outbound_payments.lock().unwrap().remove(&session_priv_bytes)
} {
let mut pending_events = self.pending_events.lock().unwrap();
pending_events.push(events::Event::PaymentSent {
payment_preimage
});
} else {
log_trace!(self.logger, "Received duplicative fulfill for HTLC with payment_preimage {}", log_bytes!(payment_preimage.0));
}
},
HTLCSource::PreviousHopData(hop_data) => {
let prev_outpoint = hop_data.outpoint;
Expand Down Expand Up @@ -4470,6 +4509,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
pending_payment.write(writer)?;
}

let pending_outbound_payments = self.pending_outbound_payments.lock().unwrap();
(pending_outbound_payments.len() as u64).write(writer)?;
for session_priv in pending_outbound_payments.iter() {
session_priv.write(writer)?;
}

Ok(())
}
}
Expand Down Expand Up @@ -4709,6 +4754,14 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
}
}

let pending_outbound_payments_count: u64 = Readable::read(reader)?;
let mut pending_outbound_payments: HashSet<[u8; 32]> = HashSet::with_capacity(cmp::min(pending_outbound_payments_count as usize, MAX_ALLOC_SIZE/32));
for _ in 0..pending_outbound_payments_count {
if !pending_outbound_payments.insert(Readable::read(reader)?) {
return Err(DecodeError::InvalidValue);
}
}

let mut secp_ctx = Secp256k1::new();
secp_ctx.seeded_randomize(&args.keys_manager.get_secure_random_bytes());

Expand All @@ -4728,6 +4781,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
pending_msg_events: Vec::new(),
}),
pending_inbound_payments: Mutex::new(pending_inbound_payments),
pending_outbound_payments: Mutex::new(pending_outbound_payments),

our_network_key: args.keys_manager.get_node_secret(),
our_network_pubkey: PublicKey::from_secret_key(&secp_ctx, &args.keys_manager.get_node_secret()),
Expand Down
137 changes: 134 additions & 3 deletions lightning/src/ln/functional_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3496,6 +3496,34 @@ fn test_force_close_fail_back() {
check_spends!(node_txn[0], tx);
}

#[test]
fn test_dup_events_on_peer_disconnect() {
// Test that if we receive a duplicative update_fulfill_htlc message after a reconnect we do
// not generate a corresponding duplicative PaymentSent event. This did not use to be the case
// as we used to generate the event immediately upon receipt of the payment preimage in the
// update_fulfill_htlc message.

// Build a two-node network joined by a single announced channel.
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());

// Route a payment from nodes[0] to nodes[1], keeping the preimage so nodes[1] can claim it.
let payment_preimage = route_payment(&nodes[0], &[&nodes[1]], 1000000).0;

// nodes[1] claims the payment. Only the update_fulfill_htlc message itself is handed to
// nodes[0]; no other message from the claim update is delivered before the disconnect below.
assert!(nodes[1].node.claim_funds(payment_preimage));
check_added_monitors!(nodes[1], 1);
let claim_msgs = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &claim_msgs.update_fulfill_htlcs[0]);
// The first (and only expected) PaymentSent for this payment is checked here.
expect_payment_sent!(nodes[0], payment_preimage);

// Drop the connection on both sides.
nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);

// NOTE(review): the `(1, 0)` tuple presumably tells reconnect_nodes to expect one re-sent HTLC
// claim towards nodes[0] -- confirm against reconnect_nodes' parameter list.
reconnect_nodes(&nodes[0], &nodes[1], (false, false), (0, 0), (1, 0), (0, 0), (0, 0), (false, false));
// After the reconnect, nodes[0] must have no further events queued, i.e. no second PaymentSent.
assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
}

#[test]
fn test_simple_peer_disconnect() {
// Test that we can reconnect when there are no lost messages
Expand Down Expand Up @@ -3718,8 +3746,7 @@ fn do_test_drop_messages_peer_disconnect(messages_delivered: u8) {
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
if messages_delivered < 2 {
reconnect_nodes(&nodes[0], &nodes[1], (false, false), (0, 0), (1, 0), (0, 0), (0, 0), (false, false));
//TODO: Deduplicate PaymentSent events, then enable this if:
//if messages_delivered < 1 {
if messages_delivered < 1 {
let events_4 = nodes[0].node.get_and_clear_pending_events();
assert_eq!(events_4.len(), 1);
match events_4[0] {
Expand All @@ -3728,7 +3755,9 @@ fn do_test_drop_messages_peer_disconnect(messages_delivered: u8) {
},
_ => panic!("Unexpected event"),
}
//}
} else {
assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
}
} else if messages_delivered == 2 {
// nodes[0] still wants its RAA + commitment_signed
reconnect_nodes(&nodes[0], &nodes[1], (false, false), (0, -1), (0, 0), (0, 0), (0, 0), (false, true));
Expand Down Expand Up @@ -4302,6 +4331,108 @@ fn test_no_txn_manager_serialize_deserialize() {
send_payment(&nodes[0], &[&nodes[1]], 1000000);
}

#[test]
fn test_dup_htlc_onchain_fails_on_reload() {
// When a Channel is closed, any outbound HTLCs which were relayed through it are simply
// dropped when the Channel is. From there, the ChannelManager relies on the ChannelMonitor
// having a copy of the relevant fail-/claim-back data and processes the HTLC fail/claim when
// the ChannelMonitor tells it to.
//
// If, due to an on-chain event, an HTLC is failed/claimed, and then we serialize the
// ChannelManager, we generally expect there not to be a duplicate HTLC fail/claim (eg via a
// PaymentFailed event appearing). However, because we may not serialize the relevant
// ChannelMonitor at the same time, this isn't strictly guaranteed. In order to provide this
// consistency, the ChannelManager explicitly tracks pending-onchain-resolution outbound HTLCs
// and de-duplicates ChannelMonitor events.
//
// This tests that explicit tracking behavior.
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
// These three are declared (uninitialized) before `nodes` because `nodes[0]` is later pointed
// at `new_chain_monitor` and `nodes_0_deserialized` by reference, and `new_chain_monitor`
// borrows `persister`; locals drop in reverse declaration order, so they outlive `nodes`.
let persister: test_utils::TestPersister;
let new_chain_monitor: test_utils::TestChainMonitor;
let nodes_0_deserialized: ChannelManager<EnforcingSigner, &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestLogger>;
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);

create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());

// Route a payment, but force-close the channel before the HTLC fulfill message arrives at
// nodes[0].
let (payment_preimage, _, _) = route_payment(&nodes[0], &[&nodes[1]], 10000000);
nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id).unwrap();
check_closed_broadcast!(nodes[0], true);
check_added_monitors!(nodes[0], 1);

nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);

// Take everything nodes[0] broadcast for the force-close; exactly two transactions are
// expected (presumably the commitment transaction plus one HTLC transaction -- TODO confirm).
let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
assert_eq!(node_txn.len(), 2);

// nodes[1] claims the payment while disconnected, so the preimage can only reach nodes[0]
// via the chain.
assert!(nodes[1].node.claim_funds(payment_preimage));
check_added_monitors!(nodes[1], 1);

// Confirm nodes[0]'s broadcast transactions in a block on nodes[1]. nodes[1] then sees the
// channel closed and broadcasts its own transaction(s), captured here as `claim_txn`.
let mut header = BlockHeader { version: 0x20000000, prev_blockhash: nodes[1].best_block_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
connect_block(&nodes[1], &Block { header, txdata: vec![node_txn[0].clone(), node_txn[1].clone()]});
check_closed_broadcast!(nodes[1], true);
check_added_monitors!(nodes[1], 1);
let claim_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);

// Connect a block with the same header and transactions to nodes[0].
connect_block(&nodes[0], &Block { header, txdata: node_txn});

// Serialize out the ChannelMonitor before connecting the on-chain claim transactions. This is
// fairly normal behavior as ChannelMonitor(s) are often not re-serialized when on-chain events
// happen, unlike ChannelManager which tends to be re-serialized after any relevant event(s).
let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new());
nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap();

// Build the next block on top of the previous one, containing nodes[1]'s claim transaction(s),
// and connect it to nodes[0]. This is where nodes[0] is expected to emit PaymentSent.
header.prev_blockhash = header.block_hash();
let claim_block = Block { header, txdata: claim_txn};
connect_block(&nodes[0], &claim_block);
expect_payment_sent!(nodes[0], payment_preimage);

// ChannelManagers generally get re-serialized after any relevant event(s). Since we just
// connected a highly-relevant block, it likely gets serialized out now.
let mut chan_manager_serialized = test_utils::TestVecWriter(Vec::new());
nodes[0].node.write(&mut chan_manager_serialized).unwrap();

// Now reload nodes[0]...
persister = test_utils::TestPersister::new();
let keys_manager = &chanmon_cfgs[0].keys_manager;
new_chain_monitor = test_utils::TestChainMonitor::new(Some(nodes[0].chain_source), nodes[0].tx_broadcaster.clone(), nodes[0].logger, node_cfgs[0].fee_estimator, &persister, keys_manager);
nodes[0].chain_monitor = &new_chain_monitor;
// Deserialize the (stale) ChannelMonitor snapshot taken before `claim_block` was connected.
let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..];
let (_, mut chan_0_monitor) = <(BlockHash, ChannelMonitor<EnforcingSigner>)>::read(
&mut chan_0_monitor_read, keys_manager).unwrap();
// The reader must have consumed the whole serialized buffer.
assert!(chan_0_monitor_read.is_empty());

// Deserialize the ChannelManager (written after `claim_block`), handing it the stale monitor
// keyed by its funding outpoint.
let (_, nodes_0_deserialized_tmp) = {
let mut channel_monitors = HashMap::new();
channel_monitors.insert(chan_0_monitor.get_funding_txo().0, &mut chan_0_monitor);
<(BlockHash, ChannelManager<EnforcingSigner, &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestLogger>)>
::read(&mut std::io::Cursor::new(&chan_manager_serialized.0[..]), ChannelManagerReadArgs {
default_config: Default::default(),
keys_manager,
fee_estimator: node_cfgs[0].fee_estimator,
chain_monitor: nodes[0].chain_monitor,
tx_broadcaster: nodes[0].tx_broadcaster.clone(),
logger: nodes[0].logger,
channel_monitors,
}).unwrap()
};
nodes_0_deserialized = nodes_0_deserialized_tmp;

// Register the reloaded monitor with the new chain monitor, then switch nodes[0] over to the
// reloaded ChannelManager.
assert!(nodes[0].chain_monitor.watch_channel(chan_0_monitor.get_funding_txo().0, chan_0_monitor).is_ok());
check_added_monitors!(nodes[0], 1);
nodes[0].node = &nodes_0_deserialized;

// Note that if we re-connect the block which exposed nodes[0] to the payment preimage (but
// which the current ChannelMonitor has not seen), the ChannelManager's de-duplication of
// payment events should kick in, leaving us with no pending events here.
// The block is fed straight to the inner chain monitor (not via connect_block), using the
// number of blocks nodes[0] has recorded minus one as the height.
nodes[0].chain_monitor.chain_monitor.block_connected(&claim_block, nodes[0].blocks.borrow().len() as u32 - 1);
assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
}

#[test]
fn test_manager_serialize_deserialize_events() {
// This test makes sure the events field in ChannelManager survives de/serialization
Expand Down
4 changes: 0 additions & 4 deletions lightning/src/util/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ pub enum Event {
},
/// Indicates an outbound payment we made succeeded (ie it made it all the way to its target
/// and we got back the payment preimage for it).
/// Note that duplicative PaymentSent Events may be generated - it is your responsibility to
/// deduplicate them by payment_preimage (which MUST be unique)!
PaymentSent {
/// The preimage to the hash given to ChannelManager::send_payment.
/// Note that this serves as a payment receipt, if you wish to have such a thing, you must
Expand All @@ -105,8 +103,6 @@ pub enum Event {
},
/// Indicates an outbound payment we made failed. Probably some intermediary node dropped
/// something. You may wish to retry with a different route.
/// Note that duplicative PaymentFailed Events may be generated - it is your responsibility to
/// deduplicate them by payment_hash (which MUST be unique)!
PaymentFailed {
/// The hash which was given to ChannelManager::send_payment.
payment_hash: PaymentHash,
Expand Down