Skip to content

Commit 59fe300

Browse files
authored
Merge pull request #617 from sourabhmarathe/serialize-channelmanager-events
Serialize ChannelManager events
2 parents b5723c7 + 9c587e5 commit 59fe300

File tree

2 files changed

+142
-5
lines changed

2 files changed

+142
-5
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError};
3838
use chain::keysinterface::{ChannelKeys, KeysInterface, KeysManager, InMemoryChannelKeys};
3939
use util::config::UserConfig;
4040
use util::{byte_utils, events};
41-
use util::ser::{Readable, ReadableArgs, Writeable, Writer};
41+
use util::ser::{Readable, ReadableArgs, MaybeReadable, Writeable, Writer};
4242
use util::chacha20::{ChaCha20, ChaChaReader};
4343
use util::logger::Logger;
4444
use util::errors::APIError;
@@ -3608,6 +3608,12 @@ impl<ChanSigner: ChannelKeys + Writeable, M: Deref, T: Deref, K: Deref, F: Deref
36083608
peer_state.latest_features.write(writer)?;
36093609
}
36103610

3611+
let events = self.pending_events.lock().unwrap();
3612+
(events.len() as u64).write(writer)?;
3613+
for event in events.iter() {
3614+
event.write(writer)?;
3615+
}
3616+
36113617
(self.last_node_announcement_serial.load(Ordering::Acquire) as u32).write(writer)?;
36123618

36133619
Ok(())
@@ -3754,12 +3760,13 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
37543760
}
37553761
}
37563762

3763+
const MAX_ALLOC_SIZE: usize = 1024 * 64;
37573764
let forward_htlcs_count: u64 = Readable::read(reader)?;
37583765
let mut forward_htlcs = HashMap::with_capacity(cmp::min(forward_htlcs_count as usize, 128));
37593766
for _ in 0..forward_htlcs_count {
37603767
let short_channel_id = Readable::read(reader)?;
37613768
let pending_forwards_count: u64 = Readable::read(reader)?;
3762-
let mut pending_forwards = Vec::with_capacity(cmp::min(pending_forwards_count as usize, 128));
3769+
let mut pending_forwards = Vec::with_capacity(cmp::min(pending_forwards_count as usize, MAX_ALLOC_SIZE/mem::size_of::<HTLCForwardInfo>()));
37633770
for _ in 0..pending_forwards_count {
37643771
pending_forwards.push(Readable::read(reader)?);
37653772
}
@@ -3771,15 +3778,15 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
37713778
for _ in 0..claimable_htlcs_count {
37723779
let payment_hash = Readable::read(reader)?;
37733780
let previous_hops_len: u64 = Readable::read(reader)?;
3774-
let mut previous_hops = Vec::with_capacity(cmp::min(previous_hops_len as usize, 2));
3781+
let mut previous_hops = Vec::with_capacity(cmp::min(previous_hops_len as usize, MAX_ALLOC_SIZE/mem::size_of::<ClaimableHTLC>()));
37753782
for _ in 0..previous_hops_len {
37763783
previous_hops.push(Readable::read(reader)?);
37773784
}
37783785
claimable_htlcs.insert(payment_hash, previous_hops);
37793786
}
37803787

37813788
let peer_count: u64 = Readable::read(reader)?;
3782-
let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, 128));
3789+
let mut per_peer_state = HashMap::with_capacity(cmp::min(peer_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex<PeerState>)>()));
37833790
for _ in 0..peer_count {
37843791
let peer_pubkey = Readable::read(reader)?;
37853792
let peer_state = PeerState {
@@ -3788,6 +3795,15 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
37883795
per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
37893796
}
37903797

3798+
let event_count: u64 = Readable::read(reader)?;
3799+
let mut pending_events_read: Vec<events::Event> = Vec::with_capacity(cmp::min(event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<events::Event>()));
3800+
for _ in 0..event_count {
3801+
match MaybeReadable::read(reader)? {
3802+
Some(event) => pending_events_read.push(event),
3803+
None => continue,
3804+
}
3805+
}
3806+
37913807
let last_node_announcement_serial: u32 = Readable::read(reader)?;
37923808

37933809
let channel_manager = ChannelManager {
@@ -3813,7 +3829,7 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
38133829

38143830
per_peer_state: RwLock::new(per_peer_state),
38153831

3816-
pending_events: Mutex::new(Vec::new()),
3832+
pending_events: Mutex::new(pending_events_read),
38173833
total_consistency_lock: RwLock::new(()),
38183834
keys_manager: args.keys_manager,
38193835
logger: args.logger,

lightning/src/ln/functional_tests.rs

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3954,6 +3954,127 @@ fn test_no_txn_manager_serialize_deserialize() {
39543954
send_payment(&nodes[0], &[&nodes[1]], 1000000, 1_000_000);
39553955
}
39563956

3957+
#[test]
3958+
fn test_manager_serialize_deserialize_events() {
3959+
// This test makes sure the events field in ChannelManager survives de/serialization
3960+
let chanmon_cfgs = create_chanmon_cfgs(2);
3961+
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
3962+
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
3963+
let fee_estimator: test_utils::TestFeeEstimator;
3964+
let new_chan_monitor: test_utils::TestChannelMonitor;
3965+
let keys_manager: test_utils::TestKeysInterface;
3966+
let nodes_0_deserialized: ChannelManager<EnforcingChannelKeys, &test_utils::TestChannelMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator>;
3967+
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
3968+
3969+
// Start creating a channel, but stop right before broadcasting the event message FundingBroadcastSafe
3970+
let channel_value = 100000;
3971+
let push_msat = 10001;
3972+
let a_flags = InitFeatures::known();
3973+
let b_flags = InitFeatures::known();
3974+
let node_a = nodes.pop().unwrap();
3975+
let node_b = nodes.pop().unwrap();
3976+
node_a.node.create_channel(node_b.node.get_our_node_id(), channel_value, push_msat, 42, None).unwrap();
3977+
node_b.node.handle_open_channel(&node_a.node.get_our_node_id(), a_flags, &get_event_msg!(node_a, MessageSendEvent::SendOpenChannel, node_b.node.get_our_node_id()));
3978+
node_a.node.handle_accept_channel(&node_b.node.get_our_node_id(), b_flags, &get_event_msg!(node_b, MessageSendEvent::SendAcceptChannel, node_a.node.get_our_node_id()));
3979+
3980+
let (temporary_channel_id, tx, funding_output) = create_funding_transaction(&node_a, channel_value, 42);
3981+
3982+
node_a.node.funding_transaction_generated(&temporary_channel_id, funding_output);
3983+
check_added_monitors!(node_a, 0);
3984+
3985+
node_b.node.handle_funding_created(&node_a.node.get_our_node_id(), &get_event_msg!(node_a, MessageSendEvent::SendFundingCreated, node_b.node.get_our_node_id()));
3986+
{
3987+
let mut added_monitors = node_b.chan_monitor.added_monitors.lock().unwrap();
3988+
assert_eq!(added_monitors.len(), 1);
3989+
assert_eq!(added_monitors[0].0, funding_output);
3990+
added_monitors.clear();
3991+
}
3992+
3993+
node_a.node.handle_funding_signed(&node_b.node.get_our_node_id(), &get_event_msg!(node_b, MessageSendEvent::SendFundingSigned, node_a.node.get_our_node_id()));
3994+
{
3995+
let mut added_monitors = node_a.chan_monitor.added_monitors.lock().unwrap();
3996+
assert_eq!(added_monitors.len(), 1);
3997+
assert_eq!(added_monitors[0].0, funding_output);
3998+
added_monitors.clear();
3999+
}
4000+
// Normally, this is where node_a would check for a FundingBroadcastSafe event, but the test de/serializes first instead
4001+
4002+
nodes.push(node_a);
4003+
nodes.push(node_b);
4004+
4005+
// Start the de/seriailization process mid-channel creation to check that the channel manager will hold onto events that are serialized
4006+
let nodes_0_serialized = nodes[0].node.encode();
4007+
let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new());
4008+
nodes[0].chan_monitor.simple_monitor.monitors.lock().unwrap().iter().next().unwrap().1.write_for_disk(&mut chan_0_monitor_serialized).unwrap();
4009+
4010+
fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 };
4011+
new_chan_monitor = test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone(), Arc::new(test_utils::TestLogger::new()), &fee_estimator);
4012+
nodes[0].chan_monitor = &new_chan_monitor;
4013+
let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..];
4014+
let (_, mut chan_0_monitor) = <(BlockHash, ChannelMonitor<EnforcingChannelKeys>)>::read(&mut chan_0_monitor_read, Arc::new(test_utils::TestLogger::new())).unwrap();
4015+
assert!(chan_0_monitor_read.is_empty());
4016+
4017+
let mut nodes_0_read = &nodes_0_serialized[..];
4018+
let config = UserConfig::default();
4019+
keys_manager = test_utils::TestKeysInterface::new(&nodes[0].node_seed, Network::Testnet, Arc::new(test_utils::TestLogger::new()));
4020+
let (_, nodes_0_deserialized_tmp) = {
4021+
let mut channel_monitors = HashMap::new();
4022+
channel_monitors.insert(chan_0_monitor.get_funding_txo(), &mut chan_0_monitor);
4023+
<(BlockHash, ChannelManager<EnforcingChannelKeys, &test_utils::TestChannelMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator>)>::read(&mut nodes_0_read, ChannelManagerReadArgs {
4024+
default_config: config,
4025+
keys_manager: &keys_manager,
4026+
fee_estimator: &fee_estimator,
4027+
monitor: nodes[0].chan_monitor,
4028+
tx_broadcaster: nodes[0].tx_broadcaster.clone(),
4029+
logger: Arc::new(test_utils::TestLogger::new()),
4030+
channel_monitors: &mut channel_monitors,
4031+
}).unwrap()
4032+
};
4033+
nodes_0_deserialized = nodes_0_deserialized_tmp;
4034+
assert!(nodes_0_read.is_empty());
4035+
4036+
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
4037+
4038+
assert!(nodes[0].chan_monitor.add_monitor(chan_0_monitor.get_funding_txo(), chan_0_monitor).is_ok());
4039+
nodes[0].node = &nodes_0_deserialized;
4040+
4041+
// After deserializing, make sure the FundingBroadcastSafe event is still held by the channel manager
4042+
let events_4 = nodes[0].node.get_and_clear_pending_events();
4043+
assert_eq!(events_4.len(), 1);
4044+
match events_4[0] {
4045+
Event::FundingBroadcastSafe { ref funding_txo, user_channel_id } => {
4046+
assert_eq!(user_channel_id, 42);
4047+
assert_eq!(*funding_txo, funding_output);
4048+
},
4049+
_ => panic!("Unexpected event"),
4050+
};
4051+
4052+
// Make sure the channel is functioning as though the de/serialization never happened
4053+
nodes[0].block_notifier.register_listener(nodes[0].node);
4054+
assert_eq!(nodes[0].node.list_channels().len(), 1);
4055+
check_added_monitors!(nodes[0], 1);
4056+
4057+
nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id(), &msgs::Init { features: InitFeatures::empty() });
4058+
let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
4059+
nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init { features: InitFeatures::empty() });
4060+
let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
4061+
4062+
nodes[1].node.handle_channel_reestablish(&nodes[0].node.get_our_node_id(), &reestablish_1[0]);
4063+
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
4064+
nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]);
4065+
assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
4066+
4067+
let (funding_locked, _) = create_chan_between_nodes_with_value_confirm(&nodes[0], &nodes[1], &tx);
4068+
let (announcement, as_update, bs_update) = create_chan_between_nodes_with_value_b(&nodes[0], &nodes[1], &funding_locked);
4069+
for node in nodes.iter() {
4070+
assert!(node.net_graph_msg_handler.handle_channel_announcement(&announcement).unwrap());
4071+
node.net_graph_msg_handler.handle_channel_update(&as_update).unwrap();
4072+
node.net_graph_msg_handler.handle_channel_update(&bs_update).unwrap();
4073+
}
4074+
4075+
send_payment(&nodes[0], &[&nodes[1]], 1000000, 1_000_000);
4076+
}
4077+
39574078
#[test]
39584079
fn test_simple_manager_serialize_deserialize() {
39594080
let chanmon_cfgs = create_chanmon_cfgs(2);

0 commit comments

Comments
 (0)