Skip to content

Commit f781569

Browse files
committed
refactor: Move all enqueue logging to Transport
1 parent 8904b23 commit f781569

File tree

2 files changed

+49
-51
lines changed

2 files changed

+49
-51
lines changed

lightning/src/ln/peers/handler.rs

Lines changed: 36 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ pub(super) trait ITransport {
5656

5757
/// Encodes, encrypts, and enqueues a message to the outbound queue. Panics if the connection is
5858
/// not established yet.
59-
fn enqueue_message<M: Encode + Writeable, Q: PayloadQueuer>(&mut self, message: &M, output_buffer: &mut Q);
59+
fn enqueue_message<M: Encode + Writeable, Q: PayloadQueuer, L: Deref>(&mut self, message: &M, output_buffer: &mut Q, logger: L) where L::Target: Logger;
6060
}
6161

6262

@@ -406,15 +406,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
406406
}
407407

408408
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
409-
macro_rules! enqueue_msg {
410-
($msg: expr) => {
411-
{
412-
log_trace!(self.logger, "Encoding and sending sync update message of type {} to {}", $msg.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
413-
peer.transport.enqueue_message($msg, &mut peer.pending_outbound_buffer)
414-
}
415-
}
416-
}
417-
418409
while !peer.pending_outbound_buffer.is_blocked() {
419410
let queue_space = peer.pending_outbound_buffer.queue_space();
420411
if queue_space > 0 {
@@ -424,12 +415,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
424415
let steps = ((queue_space + 2) / 3) as u8;
425416
let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
426417
for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
427-
enqueue_msg!(announce);
418+
peer.transport.enqueue_message(announce, &mut peer.pending_outbound_buffer, &*self.logger);
428419
if let &Some(ref update_a) = update_a_option {
429-
enqueue_msg!(update_a);
420+
peer.transport.enqueue_message(update_a, &mut peer.pending_outbound_buffer, &*self.logger);
430421
}
431422
if let &Some(ref update_b) = update_b_option {
432-
enqueue_msg!(update_b);
423+
peer.transport.enqueue_message(update_b, &mut peer.pending_outbound_buffer, &*self.logger);
433424
}
434425
peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
435426
}
@@ -441,7 +432,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
441432
let steps = queue_space as u8;
442433
let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
443434
for msg in all_messages.iter() {
444-
enqueue_msg!(msg);
435+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
445436
peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
446437
}
447438
if all_messages.is_empty() || all_messages.len() != steps as usize {
@@ -453,7 +444,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
453444
let steps = queue_space as u8;
454445
let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
455446
for msg in all_messages.iter() {
456-
enqueue_msg!(msg);
447+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
457448
peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
458449
}
459450
if all_messages.is_empty() || all_messages.len() != steps as usize {
@@ -517,10 +508,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
517508
}
518509

519510
/// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly.
520-
fn enqueue_message<M: Encode + Writeable>(&self, peers_needing_send: &mut HashSet<Descriptor>, peer: &mut Peer, descriptor: Descriptor, message: &M) {
521-
log_trace!(self.logger, "Enqueueing message of type {} to {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
522-
peer.transport.enqueue_message(message, &mut peer.pending_outbound_buffer);
523-
peers_needing_send.insert(descriptor);
511+
fn enqueue_message<M: Encode + Writeable>(&self, peers_needing_send: &mut HashSet<Descriptor>, transport: &mut impl ITransport, output_buffer: &mut impl PayloadQueuer, descriptor: &Descriptor, message: &M) {
512+
transport.enqueue_message(message, output_buffer, &*self.logger);
513+
peers_needing_send.insert(descriptor.clone());
524514
}
525515

526516
fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
@@ -564,7 +554,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
564554
}
565555

566556
let resp = msgs::Init { features };
567-
self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &resp);
557+
self.enqueue_message(&mut peers.peers_needing_send, &mut peer.transport, &mut peer.pending_outbound_buffer, peer_descriptor, &resp);
568558
}
569559
entry.insert(peer_descriptor.clone());
570560
}
@@ -593,7 +583,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
593583
},
594584
msgs::ErrorAction::SendErrorMessage { msg } => {
595585
log_trace!(self.logger, "Got Err handling message, sending Error message because {}", e.err);
596-
self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &msg);
586+
self.enqueue_message(&mut peers.peers_needing_send, &mut peer.transport, &mut peer.pending_outbound_buffer, peer_descriptor, &msg);
597587
continue;
598588
},
599589
}
@@ -675,7 +665,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
675665
}
676666

677667
let resp = msgs::Init { features };
678-
self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp);
668+
self.enqueue_message(peers_needing_send, &mut peer.transport, &mut peer.pending_outbound_buffer, &peer_descriptor, &resp);
679669
}
680670

681671
self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg);
@@ -704,7 +694,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
704694
wire::Message::Ping(msg) => {
705695
if msg.ponglen < 65532 {
706696
let resp = msgs::Pong { byteslen: msg.ponglen };
707-
self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp);
697+
self.enqueue_message(peers_needing_send, &mut peer.transport, &mut peer.pending_outbound_buffer, &peer_descriptor, &resp);
708698
}
709699
},
710700
wire::Message::Pong(_msg) => {
@@ -856,7 +846,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
856846
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
857847
});
858848
if peer.transport.is_connected() {
859-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
849+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
860850
}
861851
self.do_attempt_write_data(&mut descriptor, peer);
862852
},
@@ -868,7 +858,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
868858
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
869859
});
870860
if peer.transport.is_connected() {
871-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
861+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
872862
}
873863
self.do_attempt_write_data(&mut descriptor, peer);
874864
},
@@ -882,7 +872,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
882872
//they should just throw away this funding transaction
883873
});
884874
if peer.transport.is_connected() {
885-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
875+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
886876
}
887877
self.do_attempt_write_data(&mut descriptor, peer);
888878
},
@@ -895,7 +885,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
895885
//they should just throw away this funding transaction
896886
});
897887
if peer.transport.is_connected() {
898-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
888+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
899889
}
900890
self.do_attempt_write_data(&mut descriptor, peer);
901891
},
@@ -907,7 +897,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
907897
//TODO: Do whatever we're gonna do for handling dropped messages
908898
});
909899
if peer.transport.is_connected() {
910-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
900+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
911901
}
912902
self.do_attempt_write_data(&mut descriptor, peer);
913903
},
@@ -920,7 +910,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
920910
//they should just throw away this funding transaction
921911
});
922912
if peer.transport.is_connected() {
923-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
913+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
924914
}
925915
self.do_attempt_write_data(&mut descriptor, peer);
926916
},
@@ -936,21 +926,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
936926
});
937927
if peer.transport.is_connected() {
938928
for msg in update_add_htlcs {
939-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
929+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
940930
}
941931
for msg in update_fulfill_htlcs {
942-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
932+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
943933
}
944934
for msg in update_fail_htlcs {
945-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
935+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
946936
}
947937
for msg in update_fail_malformed_htlcs {
948-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
938+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
949939
}
950940
if let &Some(ref msg) = update_fee {
951-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
941+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
952942
}
953-
peer.transport.enqueue_message(commitment_signed, &mut peer.pending_outbound_buffer);
943+
peer.transport.enqueue_message(commitment_signed, &mut peer.pending_outbound_buffer, &*self.logger);
954944
}
955945
self.do_attempt_write_data(&mut descriptor, peer);
956946
},
@@ -962,7 +952,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
962952
//TODO: Do whatever we're gonna do for handling dropped messages
963953
});
964954
if peer.transport.is_connected() {
965-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
955+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
966956
}
967957
self.do_attempt_write_data(&mut descriptor, peer);
968958
},
@@ -974,7 +964,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
974964
//TODO: Do whatever we're gonna do for handling dropped messages
975965
});
976966
if peer.transport.is_connected() {
977-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
967+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
978968
}
979969
self.do_attempt_write_data(&mut descriptor, peer);
980970
},
@@ -986,7 +976,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
986976
//TODO: Do whatever we're gonna do for handling dropped messages
987977
});
988978
if peer.transport.is_connected() {
989-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
979+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
990980
}
991981
self.do_attempt_write_data(&mut descriptor, peer);
992982
},
@@ -998,7 +988,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
998988
//TODO: Do whatever we're gonna do for handling dropped messages
999989
});
1000990
if peer.transport.is_connected() {
1001-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
991+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
1002992
}
1003993
self.do_attempt_write_data(&mut descriptor, peer);
1004994
},
@@ -1019,8 +1009,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
10191009
}
10201010
}
10211011
if peer.transport.is_connected() {
1022-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
1023-
peer.transport.enqueue_message(update_msg, &mut peer.pending_outbound_buffer);
1012+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
1013+
peer.transport.enqueue_message(update_msg, &mut peer.pending_outbound_buffer, &*self.logger);
10241014
}
10251015
self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
10261016
}
@@ -1035,7 +1025,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
10351025
continue
10361026
}
10371027
if peer.transport.is_connected() {
1038-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
1028+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
10391029
}
10401030
self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
10411031
}
@@ -1050,7 +1040,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
10501040
continue
10511041
}
10521042
if peer.transport.is_connected() {
1053-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
1043+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
10541044
}
10551045
self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
10561046
}
@@ -1070,7 +1060,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
10701060
log_pubkey!(node_id),
10711061
msg.data);
10721062
if peer.transport.is_connected() {
1073-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
1063+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
10741064
}
10751065
// This isn't guaranteed to work, but if there is enough free
10761066
// room in the send buffer, put the error message there...
@@ -1092,7 +1082,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
10921082
//TODO: Do whatever we're gonna do for handling dropped messages
10931083
});
10941084
if peer.transport.is_connected() {
1095-
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer);
1085+
peer.transport.enqueue_message(msg, &mut peer.pending_outbound_buffer, &*self.logger);
10961086
}
10971087
self.do_attempt_write_data(&mut descriptor, peer);
10981088
},
@@ -1178,7 +1168,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
11781168
ponglen: 0,
11791169
byteslen: 64,
11801170
};
1181-
peer.transport.enqueue_message(&ping, &mut peer.pending_outbound_buffer);
1171+
peer.transport.enqueue_message(&ping, &mut peer.pending_outbound_buffer, &*self.logger);
11821172
needs_to_write_data = true;
11831173
}
11841174

lightning/src/ln/peers/transport.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,14 @@ impl<PeerHandshakeImpl: IPeerHandshake> ITransport for Transport<PeerHandshakeIm
146146
self.conduit.is_some()
147147
}
148148

149-
fn enqueue_message<M: Encode + Writeable, Q: PayloadQueuer>(&mut self, message: &M, output_buffer: &mut Q) {
149+
fn enqueue_message<M: Encode + Writeable, Q: PayloadQueuer, L: Deref>(&mut self, message: &M, output_buffer: &mut Q, logger: L)
150+
where L::Target: Logger {
151+
150152
match self.conduit {
151153
None => panic!("Enqueueing messages only supported after transport is connected"),
152154
Some(ref mut conduit) => {
155+
log_trace!(logger, "Enqueueing message of type {} to {}", message.type_id(), log_pubkey!(self.their_node_id.unwrap()));
156+
153157
let mut buffer = VecWriter(Vec::new());
154158
wire::write(message, &mut buffer).unwrap();
155159
output_buffer.push_back(conduit.encrypt(&buffer.0));
@@ -263,33 +267,36 @@ mod tests {
263267
#[test]
264268
#[should_panic(expected = "Enqueueing messages only supported after transport is connected")]
265269
fn inbound_enqueue_message_panic() {
270+
let logger = TestLogger::new();
266271
let mut transport = create_inbound_for_test::<PeerHandshakeTestStubComplete>();
267272
let mut spy = Vec::new();
268273

269274
let ping = msgs::Ping {
270275
ponglen: 0,
271276
byteslen: 64,
272277
};
273-
transport.enqueue_message(&ping, &mut spy);
278+
transport.enqueue_message(&ping, &mut spy, &logger);
274279
}
275280

276281
// Test that enqueue_message() panics in the wrong state
277282
#[test]
278283
#[should_panic(expected = "Enqueueing messages only supported after transport is connected")]
279284
fn outbound_enqueue_message_panic() {
285+
let logger = TestLogger::new();
280286
let mut transport = create_outbound_for_test::<PeerHandshakeTestStubComplete>();
281287
let mut spy = Vec::new();
282288

283289
let ping = msgs::Ping {
284290
ponglen: 0,
285291
byteslen: 64,
286292
};
287-
transport.enqueue_message(&ping, &mut spy);
293+
transport.enqueue_message(&ping, &mut spy, &logger);
288294
}
289295

290296
// Test that enqueue_message() puts something into the outbound buffer
291297
#[test]
292298
fn inbound_enqueue_message_encrypts() {
299+
let logger = TestLogger::new();
293300
let mut transport = create_inbound_for_test::<PeerHandshakeTestStubComplete>();
294301
let mut spy = Vec::new();
295302

@@ -299,13 +306,14 @@ mod tests {
299306
ponglen: 0,
300307
byteslen: 64,
301308
};
302-
transport.enqueue_message(&ping, &mut spy);
309+
transport.enqueue_message(&ping, &mut spy, &logger);
303310

304311
assert_matches!(&spy[..], [_]);
305312
}
306313

307314
#[test]
308315
fn outbound_enqueue_message_encrypts() {
316+
let logger = TestLogger::new();
309317
let mut transport = create_outbound_for_test::<PeerHandshakeTestStubComplete>();
310318
let mut spy = Vec::new();
311319

@@ -315,7 +323,7 @@ mod tests {
315323
ponglen: 0,
316324
byteslen: 64,
317325
};
318-
transport.enqueue_message(&ping, &mut spy);
326+
transport.enqueue_message(&ping, &mut spy, &logger);
319327

320328
assert_matches!(&spy[..], [_]);
321329
}

0 commit comments

Comments
 (0)