Skip to content

Commit 9223252

Browse files
committed
Send channel_reestablish out-of-band to ensure ordered deliver
1 parent b4ce6a2 commit 9223252

File tree

5 files changed

+63
-22
lines changed

5 files changed

+63
-22
lines changed

src/ln/channelmanager.rs

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2501,9 +2501,10 @@ impl ChannelMessageHandler for ChannelManager {
25012501
}
25022502
}
25032503

2504-
fn peer_connected(&self, their_node_id: &PublicKey) -> Vec<msgs::ChannelReestablish> {
2505-
let mut res = Vec::new();
2506-
let mut channel_state = self.channel_state.lock().unwrap();
2504+
fn peer_connected(&self, their_node_id: &PublicKey) {
2505+
let mut channel_state_lock = self.channel_state.lock().unwrap();
2506+
let channel_state = channel_state_lock.borrow_parts();
2507+
let pending_msg_events = channel_state.pending_msg_events;
25072508
channel_state.by_id.retain(|_, chan| {
25082509
if chan.get_their_node_id() == *their_node_id {
25092510
if !chan.have_received_message() {
@@ -2513,13 +2514,15 @@ impl ChannelMessageHandler for ChannelManager {
25132514
// drop it.
25142515
false
25152516
} else {
2516-
res.push(chan.get_channel_reestablish());
2517+
pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
2518+
node_id: chan.get_their_node_id(),
2519+
msg: chan.get_channel_reestablish(),
2520+
});
25172521
true
25182522
}
25192523
} else { true }
25202524
});
25212525
//TODO: Also re-broadcast announcement_signatures
2522-
res
25232526
}
25242527

25252528
fn handle_error(&self, their_node_id: &PublicKey, msg: &msgs::ErrorMessage) {
@@ -5011,6 +5014,23 @@ mod tests {
50115014
assert_eq!(channel_state.short_to_id.len(), 0);
50125015
}
50135016

5017+
macro_rules! get_chan_reestablish_msgs {
5018+
($src_node: expr, $dst_node: expr) => {
5019+
{
5020+
let mut res = Vec::with_capacity(1);
5021+
for msg in $src_node.node.get_and_clear_pending_msg_events() {
5022+
if let MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } = msg {
5023+
assert_eq!(*node_id, $dst_node.node.get_our_node_id());
5024+
res.push(msg.clone());
5025+
} else {
5026+
panic!("Unexpected event")
5027+
}
5028+
}
5029+
res
5030+
}
5031+
}
5032+
}
5033+
50145034
macro_rules! handle_chan_reestablish_msgs {
50155035
($src_node: expr, $dst_node: expr) => {
50165036
{
@@ -5069,8 +5089,10 @@ mod tests {
50695089
/// pending_htlc_adds includes both the holding cell and in-flight update_add_htlcs, whereas
50705090
/// for claims/fails they are separated out.
50715091
fn reconnect_nodes(node_a: &Node, node_b: &Node, pre_all_htlcs: bool, pending_htlc_adds: (i64, i64), pending_htlc_claims: (usize, usize), pending_cell_htlc_claims: (usize, usize), pending_cell_htlc_fails: (usize, usize), pending_raa: (bool, bool)) {
5072-
let reestablish_1 = node_a.node.peer_connected(&node_b.node.get_our_node_id());
5073-
let reestablish_2 = node_b.node.peer_connected(&node_a.node.get_our_node_id());
5092+
node_a.node.peer_connected(&node_b.node.get_our_node_id());
5093+
let reestablish_1 = get_chan_reestablish_msgs!(node_a, node_b);
5094+
node_b.node.peer_connected(&node_a.node.get_our_node_id());
5095+
let reestablish_2 = get_chan_reestablish_msgs!(node_b, node_a);
50745096

50755097
let mut resp_1 = Vec::new();
50765098
for msg in reestablish_1 {
@@ -5567,9 +5589,11 @@ mod tests {
55675589
nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
55685590
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
55695591

5570-
let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
5592+
nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
5593+
let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
55715594
assert_eq!(reestablish_1.len(), 1);
5572-
let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
5595+
nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
5596+
let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
55735597
assert_eq!(reestablish_2.len(), 1);
55745598

55755599
nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap();
@@ -5855,9 +5879,11 @@ mod tests {
58555879
nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
58565880
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
58575881

5858-
let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
5882+
nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
5883+
let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
58595884
assert_eq!(reestablish_1.len(), 1);
5860-
let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
5885+
nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
5886+
let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
58615887
assert_eq!(reestablish_2.len(), 1);
58625888

58635889
nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap();
@@ -5875,9 +5901,11 @@ mod tests {
58755901
assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
58765902
assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
58775903

5878-
let reestablish_1 = nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
5904+
nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
5905+
let reestablish_1 = get_chan_reestablish_msgs!(nodes[0], nodes[1]);
58795906
assert_eq!(reestablish_1.len(), 1);
5880-
let reestablish_2 = nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
5907+
nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id());
5908+
let reestablish_2 = get_chan_reestablish_msgs!(nodes[1], nodes[0]);
58815909
assert_eq!(reestablish_2.len(), 1);
58825910

58835911
nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &reestablish_2[0]).unwrap();

src/ln/msgs.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -308,14 +308,14 @@ pub struct UpdateFee {
308308
pub(crate) feerate_per_kw: u32,
309309
}
310310

311-
#[derive(PartialEq)]
311+
#[derive(PartialEq, Clone)]
312312
pub(crate) struct DataLossProtect {
313313
pub(crate) your_last_per_commitment_secret: [u8; 32],
314314
pub(crate) my_current_per_commitment_point: PublicKey,
315315
}
316316

317317
/// A channel_reestablish message to be sent or received from a peer
318-
#[derive(PartialEq)]
318+
#[derive(PartialEq, Clone)]
319319
pub struct ChannelReestablish {
320320
pub(crate) channel_id: [u8; 32],
321321
pub(crate) next_local_commitment_number: u64,
@@ -551,7 +551,7 @@ pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Syn
551551
fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool);
552552

553553
/// Handle a peer reconnecting, possibly generating channel_reestablish message(s).
554-
fn peer_connected(&self, their_node_id: &PublicKey) -> Vec<ChannelReestablish>;
554+
fn peer_connected(&self, their_node_id: &PublicKey);
555555
/// Handle an incoming channel_reestablish message from the given peer.
556556
fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &ChannelReestablish) -> Result<(), HandleError>;
557557

src/ln/peer_handler.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -520,9 +520,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
520520
}, 16);
521521
}
522522

523-
for msg in self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap()) {
524-
encode_and_send_msg!(msg, 136);
525-
}
523+
self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap());
526524
},
527525
17 => {
528526
let msg = try_potential_decodeerror!(msgs::ErrorMessage::read(&mut reader));
@@ -837,6 +835,16 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
837835
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38)));
838836
Self::do_attempt_write_data(&mut descriptor, peer);
839837
},
838+
MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
839+
log_trace!(self, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
840+
log_pubkey!(node_id),
841+
log_bytes!(msg.channel_id));
842+
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
843+
//TODO: Do whatever we're gonna do for handling dropped messages
844+
});
845+
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136)));
846+
Self::do_attempt_write_data(&mut descriptor, peer);
847+
},
840848
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
841849
log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
842850
if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() {

src/util/events.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,13 @@ pub enum MessageSendEvent {
160160
/// The message which should be sent.
161161
msg: msgs::Shutdown,
162162
},
163+
/// Used to indicate that a channel_reestablish message should be sent to the peer with the given node_id.
164+
SendChannelReestablish {
165+
/// The node_id of the node which should receive this message
166+
node_id: PublicKey,
167+
/// The message which should be sent.
168+
msg: msgs::ChannelReestablish,
169+
},
163170
/// Used to indicate that a channel_announcement and channel_update should be broadcast to all
164171
/// peers (except the peer with node_id either msg.contents.node_id_1 or msg.contents.node_id_2).
165172
BroadcastChannelAnnouncement {

src/util/test_utils.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,7 @@ impl msgs::ChannelMessageHandler for TestChannelMessageHandler {
135135
Err(HandleError { err: "", action: None })
136136
}
137137
fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {}
138-
fn peer_connected(&self, _their_node_id: &PublicKey) -> Vec<msgs::ChannelReestablish> {
139-
Vec::new()
140-
}
138+
fn peer_connected(&self, _their_node_id: &PublicKey) {}
141139
fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {}
142140
}
143141

0 commit comments

Comments
 (0)