Hold PeerState in an RwLock rather than a Mutex #2968

Closed · wants to merge 6 commits
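
The swap is mechanical, but the motivation is concurrency: a `Mutex` serializes every access to a peer's `PeerState`, while an `RwLock` lets read-only paths share the lock and reserves exclusive access for the paths that actually mutate. In the test diffs below, former `.lock()` calls become `.write()` wherever the guard is mutated or handed to `get_mut`, and `.read()` where the state is only inspected. Here is a minimal sketch of the before/after usage pattern, with a hypothetical pared-down `PeerState` standing in for the real struct:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Hypothetical, pared-down stand-in for LDK's per-peer state.
struct PeerState {
    is_connected: bool,
    channel_count: usize,
}

fn main() {
    // The outer lock guards the peer map; each peer's state now sits
    // behind its own RwLock instead of a Mutex.
    let per_peer_state: RwLock<HashMap<u8, RwLock<PeerState>>> =
        RwLock::new(HashMap::new());
    per_peer_state.write().unwrap().insert(
        0,
        RwLock::new(PeerState { is_connected: true, channel_count: 1 }),
    );

    // Read-only path: a shared read guard suffices, so several threads
    // can inspect the same peer's state concurrently.
    {
        let map = per_peer_state.read().unwrap();
        let peer = map.get(&0).unwrap().read().unwrap();
        assert!(peer.is_connected);
    }

    // Mutating path: what used to be `.lock()` becomes `.write()`.
    {
        let map = per_peer_state.read().unwrap();
        let mut peer = map.get(&0).unwrap().write().unwrap();
        peer.channel_count += 1;
    }
}
```

One trade-off to keep in mind: `std::sync::RwLock` makes no fairness guarantee, so a steady stream of readers can starve writers on some platforms; LDK carries a `FairRwLock` wrapper for the outer map partly for this reason.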
312 changes: 156 additions & 156 deletions lightning/src/ln/channelmanager.rs

Large diffs are not rendered by default.
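The `channelmanager.rs` diff is collapsed above, but the accessor pattern in the test changes (an outer read lock on the peer map, then a per-peer read or write lock) suggests the heart of the PR is a field change along these lines. This is a sketch with placeholder types, not the literal diff; the real field is generic over LDK's signer types and may use the `FairRwLock` wrapper for the outer map:

```rust
use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

// Placeholders standing in for LDK's actual types.
type PublicKey = [u8; 33];
struct PeerState { /* channel_by_id, pending_msg_events, is_connected, ... */ }

// Before: every access to a peer's state, read or write, was serialized.
struct ChannelManagerBefore {
    per_peer_state: RwLock<HashMap<PublicKey, Mutex<PeerState>>>,
}

// After: read-only accessors can share a peer's state concurrently.
struct ChannelManagerAfter {
    per_peer_state: RwLock<HashMap<PublicKey, RwLock<PeerState>>>,
}
```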

16 changes: 8 additions & 8 deletions lightning/src/ln/functional_test_utils.rs
@@ -491,7 +491,7 @@ impl<'a, 'b, 'c> Node<'a, 'b, 'c> {
log_debug!(self.logger, "Setting channel signer for {} as available={}", chan_id, available);

let per_peer_state = self.node.per_peer_state.read().unwrap();
-let chan_lock = per_peer_state.get(peer_id).unwrap().lock().unwrap();
+let chan_lock = per_peer_state.get(peer_id).unwrap().write().unwrap();

let mut channel_keys_id = None;
if let Some(chan) = chan_lock.channel_by_id.get(chan_id).map(|phase| phase.context()) {
@@ -930,7 +930,7 @@ macro_rules! get_channel_ref {
($node: expr, $counterparty_node: expr, $per_peer_state_lock: ident, $peer_state_lock: ident, $channel_id: expr) => {
{
$per_peer_state_lock = $node.node.per_peer_state.read().unwrap();
-$peer_state_lock = $per_peer_state_lock.get(&$counterparty_node.node.get_our_node_id()).unwrap().lock().unwrap();
+$peer_state_lock = $per_peer_state_lock.get(&$counterparty_node.node.get_our_node_id()).unwrap().write().unwrap();
$peer_state_lock.channel_by_id.get_mut(&$channel_id).unwrap()
}
}
@@ -1556,8 +1556,8 @@ macro_rules! check_warn_msg {
/// Checks if at least one peer is connected.
fn is_any_peer_connected(node: &Node) -> bool {
let peer_state = node.node.per_peer_state.read().unwrap();
-for (_, peer_mutex) in peer_state.iter() {
-let peer = peer_mutex.lock().unwrap();
+for (_, peer_rwlock) in peer_state.iter() {
+let peer = peer_rwlock.read().unwrap();
if peer.is_connected { return true; }
}
false
@@ -2018,8 +2018,8 @@ pub fn do_commitment_signed_dance(node_a: &Node<'_, '_, '_>, node_b: &Node<'_, '

let node_a_per_peer_state = node_a.node.per_peer_state.read().unwrap();
let mut number_of_msg_events = 0;
-for (cp_id, peer_state_mutex) in node_a_per_peer_state.iter() {
-let peer_state = peer_state_mutex.lock().unwrap();
+for (cp_id, peer_state) in node_a_per_peer_state.iter() {
+let peer_state = peer_state.write().unwrap();
let cp_pending_msg_events = &peer_state.pending_msg_events;
number_of_msg_events += cp_pending_msg_events.len();
if cp_pending_msg_events.len() == 1 {
@@ -2853,7 +2853,7 @@ pub fn pass_claimed_payment_along_route<'a, 'b, 'c, 'd>(args: ClaimAlongRouteArg
let (base_fee, prop_fee) = {
let per_peer_state = $node.node.per_peer_state.read().unwrap();
let peer_state = per_peer_state.get(&$prev_node.node.get_our_node_id())
-.unwrap().lock().unwrap();
+.unwrap().write().unwrap();
let channel = peer_state.channel_by_id.get(&next_msgs.as_ref().unwrap().0.channel_id).unwrap();
if let Some(prev_config) = channel.context().prev_config() {
(prev_config.forwarding_fee_base_msat as u64,
@@ -3458,7 +3458,7 @@ pub fn get_announce_close_broadcast_events<'a, 'b, 'c>(nodes: &Vec<Node<'a, 'b,
macro_rules! get_channel_value_stat {
($node: expr, $counterparty_node: expr, $channel_id: expr) => {{
let peer_state_lock = $node.node.per_peer_state.read().unwrap();
-let chan_lock = peer_state_lock.get(&$counterparty_node.node.get_our_node_id()).unwrap().lock().unwrap();
+let chan_lock = peer_state_lock.get(&$counterparty_node.node.get_our_node_id()).unwrap().write().unwrap();
let chan = chan_lock.channel_by_id.get(&$channel_id).map(
|phase| if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { None }
).flatten().unwrap();
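A non-obvious detail in the helpers above: several take `.write()` even on paths that mostly read, because they hand the guard to `get_mut` (as `get_channel_ref!` does), and an `RwLock` read guard only dereferences immutably. A toy illustration of that constraint, with a plain `HashMap` standing in for `channel_by_id`:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

fn main() {
    let peer_state = RwLock::new(HashMap::from([("chan-1", 0u64)]));

    // A read guard derefs to &HashMap, so `get_mut` is unavailable:
    // let r = peer_state.read().unwrap();
    // r.get_mut("chan-1"); // error: cannot borrow data as mutable

    // Mutation requires the exclusive write guard.
    let mut w = peer_state.write().unwrap();
    if let Some(v) = w.get_mut("chan-1") {
        *v += 1;
    }
    assert_eq!(w["chan-1"], 1);
}
```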
34 changes: 17 additions & 17 deletions lightning/src/ln/functional_tests.rs
@@ -699,7 +699,7 @@ fn test_update_fee_that_funder_cannot_afford() {
// needed to sign the new commitment tx and (2) sign the new commitment tx.
let (local_revocation_basepoint, local_htlc_basepoint, local_funding) = {
let per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
-let chan_lock = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().lock().unwrap();
+let chan_lock = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().write().unwrap();
let local_chan = chan_lock.channel_by_id.get(&chan.2).map(
|phase| if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { None }
).flatten().unwrap();
@@ -710,7 +710,7 @@ fn test_update_fee_that_funder_cannot_afford() {
};
let (remote_delayed_payment_basepoint, remote_htlc_basepoint,remote_point, remote_funding) = {
let per_peer_state = nodes[1].node.per_peer_state.read().unwrap();
-let chan_lock = per_peer_state.get(&nodes[0].node.get_our_node_id()).unwrap().lock().unwrap();
+let chan_lock = per_peer_state.get(&nodes[0].node.get_our_node_id()).unwrap().write().unwrap();
let remote_chan = chan_lock.channel_by_id.get(&chan.2).map(
|phase| if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { None }
).flatten().unwrap();
@@ -727,7 +727,7 @@

let res = {
let per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
-let local_chan_lock = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().lock().unwrap();
+let local_chan_lock = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().write().unwrap();
let local_chan = local_chan_lock.channel_by_id.get(&chan.2).map(
|phase| if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { None }
).flatten().unwrap();
@@ -1429,7 +1429,7 @@ fn test_fee_spike_violation_fails_htlc() {
// needed to sign the new commitment tx and (2) sign the new commitment tx.
let (local_revocation_basepoint, local_htlc_basepoint, local_secret, next_local_point, local_funding) = {
let per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
-let chan_lock = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().lock().unwrap();
+let chan_lock = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().write().unwrap();
let local_chan = chan_lock.channel_by_id.get(&chan.2).map(
|phase| if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { None }
).flatten().unwrap();
@@ -1445,7 +1445,7 @@
};
let (remote_delayed_payment_basepoint, remote_htlc_basepoint, remote_point, remote_funding) = {
let per_peer_state = nodes[1].node.per_peer_state.read().unwrap();
-let chan_lock = per_peer_state.get(&nodes[0].node.get_our_node_id()).unwrap().lock().unwrap();
+let chan_lock = per_peer_state.get(&nodes[0].node.get_our_node_id()).unwrap().write().unwrap();
let remote_chan = chan_lock.channel_by_id.get(&chan.2).map(
|phase| if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { None }
).flatten().unwrap();
@@ -1476,7 +1476,7 @@ fn test_fee_spike_violation_fails_htlc() {

let res = {
let per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
-let local_chan_lock = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().lock().unwrap();
+let local_chan_lock = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().write().unwrap();
let local_chan = local_chan_lock.channel_by_id.get(&chan.2).map(
|phase| if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { None }
).flatten().unwrap();
@@ -3236,7 +3236,7 @@ fn do_test_commitment_revoked_fail_backward_exhaustive(deliver_bs_raa: bool, use
// The dust limit applied to HTLC outputs considers the fee of the HTLC transaction as
// well, so HTLCs at exactly the dust limit will not be included in commitment txn.
nodes[2].node.per_peer_state.read().unwrap().get(&nodes[1].node.get_our_node_id())
-.unwrap().lock().unwrap().channel_by_id.get(&chan_2.2).unwrap().context().holder_dust_limit_satoshis * 1000
+.unwrap().write().unwrap().channel_by_id.get(&chan_2.2).unwrap().context().holder_dust_limit_satoshis * 1000
} else { 3000000 };

let (_, first_payment_hash, ..) = route_payment(&nodes[0], &[&nodes[1], &nodes[2]], value);
@@ -5209,7 +5209,7 @@ fn do_test_fail_backwards_unrevoked_remote_announce(deliver_last_raa: bool, anno
assert_eq!(get_local_commitment_txn!(nodes[3], chan_2_3.2)[0].output.len(), 2);

let ds_dust_limit = nodes[3].node.per_peer_state.read().unwrap().get(&nodes[2].node.get_our_node_id())
-.unwrap().lock().unwrap().channel_by_id.get(&chan_2_3.2).unwrap().context().holder_dust_limit_satoshis;
+.unwrap().write().unwrap().channel_by_id.get(&chan_2_3.2).unwrap().context().holder_dust_limit_satoshis;
// 0th HTLC:
let (_, payment_hash_1, ..) = route_payment(&nodes[0], &[&nodes[2], &nodes[3], &nodes[4]], ds_dust_limit*1000); // not added < dust limit + HTLC tx fee
// 1st HTLC:
@@ -6344,7 +6344,7 @@ fn test_update_add_htlc_bolt2_sender_exceed_max_htlc_num_and_htlc_id_increment()
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let chan = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1000000, 0);
let max_accepted_htlcs = nodes[1].node.per_peer_state.read().unwrap().get(&nodes[0].node.get_our_node_id())
-.unwrap().lock().unwrap().channel_by_id.get(&chan.2).unwrap().context().counterparty_max_accepted_htlcs as u64;
+.unwrap().write().unwrap().channel_by_id.get(&chan.2).unwrap().context().counterparty_max_accepted_htlcs as u64;

// Fetch a route in advance as we will be unable to once we're unable to send.
let (route, our_payment_hash, _, our_payment_secret) = get_route_and_payment_hash!(nodes[0], nodes[1], 100000);
@@ -6415,7 +6415,7 @@ fn test_update_add_htlc_bolt2_receiver_check_amount_received_more_than_min() {
let htlc_minimum_msat: u64;
{
let per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
-let chan_lock = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().lock().unwrap();
+let chan_lock = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().write().unwrap();
let channel = chan_lock.channel_by_id.get(&chan.2).unwrap();
htlc_minimum_msat = channel.context().get_holder_htlc_minimum_msat();
}
@@ -7021,7 +7021,7 @@ fn do_test_failure_delay_dust_htlc_local_commitment(announce_latest: bool) {
let chan =create_announced_chan_between_nodes(&nodes, 0, 1);

let bs_dust_limit = nodes[1].node.per_peer_state.read().unwrap().get(&nodes[0].node.get_our_node_id())
-.unwrap().lock().unwrap().channel_by_id.get(&chan.2).unwrap().context().holder_dust_limit_satoshis;
+.unwrap().write().unwrap().channel_by_id.get(&chan.2).unwrap().context().holder_dust_limit_satoshis;

// We route 2 dust-HTLCs between A and B
let (_, payment_hash_1, ..) = route_payment(&nodes[0], &[&nodes[1]], bs_dust_limit*1000);
@@ -7114,7 +7114,7 @@ fn do_test_sweep_outbound_htlc_failure_update(revoked: bool, local: bool) {
let chan = create_announced_chan_between_nodes(&nodes, 0, 1);

let bs_dust_limit = nodes[1].node.per_peer_state.read().unwrap().get(&nodes[0].node.get_our_node_id())
-.unwrap().lock().unwrap().channel_by_id.get(&chan.2).unwrap().context().holder_dust_limit_satoshis;
+.unwrap().write().unwrap().channel_by_id.get(&chan.2).unwrap().context().holder_dust_limit_satoshis;

let (_payment_preimage_1, dust_hash, ..) = route_payment(&nodes[0], &[&nodes[1]], bs_dust_limit*1000);
let (_payment_preimage_2, non_dust_hash, ..) = route_payment(&nodes[0], &[&nodes[1]], 1000000);
@@ -7796,7 +7796,7 @@ fn test_counterparty_raa_skip_no_crash() {
let next_per_commitment_point;
{
let per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
-let mut guard = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().lock().unwrap();
+let mut guard = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().write().unwrap();
let keys = guard.channel_by_id.get_mut(&channel_id).map(
|phase| if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { None }
).flatten().unwrap().get_signer();
@@ -9227,7 +9227,7 @@ fn test_duplicate_chan_id() {

let funding_created = {
let per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
-let mut a_peer_state = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().lock().unwrap();
+let mut a_peer_state = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().write().unwrap();
// Once we call `get_funding_created` the channel has a duplicate channel_id as
// another channel in the ChannelManager - an invalid state. Thus, we'd panic later when we
// try to create another channel. Instead, we drop the channel entirely here (leaving the
@@ -9942,7 +9942,7 @@ fn do_test_max_dust_htlc_exposure(dust_outbound_balance: bool, exposure_breach_e

let (dust_buffer_feerate, max_dust_htlc_exposure_msat) = {
let per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
-let chan_lock = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().lock().unwrap();
+let chan_lock = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().write().unwrap();
let chan = chan_lock.channel_by_id.get(&channel_id).unwrap();
(chan.context().get_dust_buffer_feerate(None) as u64,
chan.context().get_max_dust_htlc_exposure_msat(&LowerBoundedFeeEstimator(nodes[0].fee_estimator)))
@@ -10440,7 +10440,7 @@ fn test_remove_expired_outbound_unfunded_channels() {
// Asserts the outbound channel has been removed from a nodes[0]'s peer state map.
let check_outbound_channel_existence = |should_exist: bool| {
let per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
-let chan_lock = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().lock().unwrap();
+let chan_lock = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().write().unwrap();
assert_eq!(chan_lock.channel_by_id.contains_key(&temp_channel_id), should_exist);
};

@@ -10491,7 +10491,7 @@ fn test_remove_expired_inbound_unfunded_channels() {
// Asserts the inbound channel has been removed from a nodes[1]'s peer state map.
let check_inbound_channel_existence = |should_exist: bool| {
let per_peer_state = nodes[1].node.per_peer_state.read().unwrap();
-let chan_lock = per_peer_state.get(&nodes[0].node.get_our_node_id()).unwrap().lock().unwrap();
+let chan_lock = per_peer_state.get(&nodes[0].node.get_our_node_id()).unwrap().write().unwrap();
assert_eq!(chan_lock.channel_by_id.contains_key(&temp_channel_id), should_exist);
};

2 changes: 1 addition & 1 deletion lightning/src/ln/onion_route_tests.rs
@@ -515,7 +515,7 @@ fn test_onion_failure() {

let short_channel_id = channels[1].0.contents.short_channel_id;
let amt_to_forward = nodes[1].node.per_peer_state.read().unwrap().get(&nodes[2].node.get_our_node_id())
-.unwrap().lock().unwrap().channel_by_id.get(&channels[1].2).unwrap()
+.unwrap().write().unwrap().channel_by_id.get(&channels[1].2).unwrap()
.context().get_counterparty_htlc_minimum_msat() - 1;
let mut bogus_route = route.clone();
let route_len = bogus_route.paths[0].hops.len();
2 changes: 1 addition & 1 deletion lightning/src/ln/payment_tests.rs
@@ -796,7 +796,7 @@ fn do_retry_with_no_persist(confirm_before_reload: bool) {
{
let per_peer_state = nodes[1].node.per_peer_state.read().unwrap();
let mut peer_state = per_peer_state.get(&nodes[2].node.get_our_node_id())
-.unwrap().lock().unwrap();
+.unwrap().write().unwrap();
let mut channel = peer_state.channel_by_id.get_mut(&chan_id_2).unwrap();
let mut new_config = channel.context().config();
new_config.forwarding_fee_base_msat += 100_000;
6 changes: 3 additions & 3 deletions lightning/src/ln/reorg_tests.rs
@@ -258,7 +258,7 @@ fn do_test_unconf_chan(reload_node: bool, reorg_after_reload: bool, use_funding_

{
let per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
-let peer_state = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().lock().unwrap();
+let peer_state = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().write().unwrap();
assert_eq!(peer_state.channel_by_id.len(), 1);
assert_eq!(nodes[0].node.short_to_chan_info.read().unwrap().len(), 2);
}
@@ -294,7 +294,7 @@ fn do_test_unconf_chan(reload_node: bool, reorg_after_reload: bool, use_funding_

{
let per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
-let peer_state = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().lock().unwrap();
+let peer_state = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().write().unwrap();
assert_eq!(peer_state.channel_by_id.len(), 0);
assert_eq!(nodes[0].node.short_to_chan_info.read().unwrap().len(), 0);
}
@@ -340,7 +340,7 @@ fn do_test_unconf_chan(reload_node: bool, reorg_after_reload: bool, use_funding_

{
let per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
-let peer_state = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().lock().unwrap();
+let peer_state = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().write().unwrap();
assert_eq!(peer_state.channel_by_id.len(), 0);
assert_eq!(nodes[0].node.short_to_chan_info.read().unwrap().len(), 0);
}