Skip to content

Commit bf76e7c

Browse files
committed
review: Collapse OutboundQueue/Transport traits
Reduce trait complexity in the OutboundQueue and Transport objects. Splitting the objects into separate traits allowed for cleaner separation of responsibilities. But in practice, this led to complexity in the PeerManager function signatures and confusion in why they needed to be separated. To move this module to a more maintainable state for the core development team, collapse PayloadQueuer/SocketDescriptorFlusher into a single IOutboundQueue trait. Also, remove MessageQueuer in favor of just passing in the entire Transport object. This makes the code look more similar to the rest of the codebase while still leveraging traits in the OutboundQueue and Transport layer that allow for test doubles and real unit tests.
1 parent e5268bb commit bf76e7c

File tree

4 files changed

+110
-102
lines changed

4 files changed

+110
-102
lines changed

lightning/src/ln/peers/handler.rs

Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,15 @@ use bitcoin::hashes::sha256::Hash as Sha256;
3939
use bitcoin::hashes::sha256::HashEngine as Sha256Engine;
4040
use bitcoin::hashes::{HashEngine, Hash};
4141
use ln::peers::outbound_queue::OutboundQueue;
42-
use ln::peers::transport::{PayloadQueuer, Transport};
42+
use ln::peers::transport::Transport;
4343
use std::collections::hash_map::IterMut;
4444
use std::iter::Filter;
4545

4646
// Number of items that can exist in the OutboundQueue before Sync message flow control is triggered
4747
const OUTBOUND_QUEUE_SIZE: usize = 10;
4848

4949
/// Interface PeerManager uses to interact with the Transport object
50-
pub(super) trait ITransport: MessageQueuer {
50+
pub(super) trait ITransport {
5151
/// Instantiate the new outbound Transport
5252
fn new_outbound(initiator_static_private_key: &SecretKey, responder_static_public_key: &PublicKey, initiator_ephemeral_private_key: &SecretKey) -> Self;
5353

@@ -59,29 +59,51 @@ pub(super) trait ITransport: MessageQueuer {
5959

6060
/// Process input data similar to reading it off a descriptor directly. Returns true on the first call
6161
/// that results in the transport being newly connected.
62-
fn process_input(&mut self, input: &[u8], output_buffer: &mut impl PayloadQueuer) -> Result<bool, String>;
62+
fn process_input(&mut self, input: &[u8], outbound_queue: &mut impl IOutboundQueue) -> Result<bool, String>;
6363

6464
/// Returns true if the connection is established and encrypted messages can be sent.
6565
fn is_connected(&self) -> bool;
6666

6767
/// Returns the node_id of the remote node. Panics if not connected.
6868
fn get_their_node_id(&self) -> PublicKey;
6969

70+
/// Encodes, encrypts, and enqueues a message to the outbound queue. Panics if the connection is
71+
/// not established yet.
72+
fn enqueue_message<M: Encode + Writeable, L: Deref>(&mut self, message: &M, outbound_queue: &mut impl IOutboundQueue, logger: L) where L::Target: Logger;
73+
7074
/// Returns all Messages that have been received and can be successfully parsed by the Transport
7175
fn drain_messages<L: Deref>(&mut self, logger: L) -> Result<Vec<Message>, PeerHandleError> where L::Target: Logger;
7276
}
7377

74-
/// Interface PeerManager uses to queue message to send. Implemented by Transport to handle
75-
/// encryption/decryption post-NOISE.
76-
pub(super) trait MessageQueuer {
77-
/// Encodes, encrypts, and enqueues a message to the outbound queue. Panics if the connection is
78-
/// not established yet.
79-
fn enqueue_message<M: Encode + Writeable, Q: PayloadQueuer, L: Deref>(&mut self, message: &M, output_buffer: &mut Q, logger: L) where L::Target: Logger;
80-
}
78+
/// The OutboundQueue is a container for unencrypted payloads during the NOISE handshake and
79+
/// encrypted Messages post-NOISE. This trait abstracts the behavior to push items to a queue, flush
80+
/// them through a SocketDescriptor, and handle flow control. Each Peer owns a separate OutboundQueue.
81+
///
82+
/// A trait is used to enable tests to use test doubles that implement a subset of the api with
83+
/// cleaner test validation.
84+
pub(super) trait IOutboundQueue {
85+
86+
// ____ _ __ __ _ _ _
87+
// | _ \ _ _ ___| |__ | \/ | ___| |_| |__ ___ __| |___
88+
// | |_) | | | / __| '_ \ | |\/| |/ _ \ __| '_ \ / _ \ / _` / __|
89+
// | __/| |_| \__ \ | | | | | | | __/ |_| | | | (_) | (_| \__ \
90+
// |_| \__,_|___/_| |_| |_| |_|\___|\__|_| |_|\___/ \__,_|___/
91+
92+
/// Unconditionally queue item. May increase queue above soft limit.
93+
fn push_back(&mut self, item: Vec<u8>);
94+
95+
/// Returns true if the queue is empty
96+
fn is_empty(&self) -> bool;
97+
98+
/// Returns the amount of free space in the queue before the soft limit
99+
fn queue_space(&self) -> usize;
100+
101+
// _____ _ _ __ __ _ _ _
102+
// | ___| |_ _ ___| |__ | \/ | ___| |_| |__ ___ __| |___
103+
// | |_ | | | | / __| '_ \ | |\/| |/ _ \ __| '_ \ / _ \ / _` / __|
104+
// | _| | | |_| \__ \ | | | | | | | __/ |_| | | | (_) | (_| \__ \
105+
// |_| |_|\__,_|___/_| |_| |_| |_|\___|\__|_| |_|\___/ \__,_|___/
81106

82-
/// Trait representing a container that can try to flush data through a SocketDescriptor. Used by the
83-
/// PeerManager to handle flushing the outbound queue and flow control.
84-
pub(super) trait SocketDescriptorFlusher {
85107
/// Write previously enqueued data to the SocketDescriptor. A return of false indicates the
86108
/// underlying SocketDescriptor could not fulfill the send_data() call and the blocked state
87109
/// has been set. Use unblock() when the SocketDescriptor may have more room.
@@ -583,11 +605,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
583605

584606
// Fill remaining slots in output queue with sync messages, updating the sync state when
585607
// appropriate
586-
fn fill_outbound_queue_with_sync<Q: PayloadQueuer + SocketDescriptorFlusher>(
608+
fn fill_outbound_queue_with_sync(
587609
&self,
588610
sync_status: &mut InitSyncTracker,
589-
message_queuer: &mut impl MessageQueuer,
590-
outbound_queue: &mut Q) {
611+
transport: &mut TransportImpl,
612+
outbound_queue: &mut OutboundQueue) {
591613

592614
let queue_space = outbound_queue.queue_space();
593615
if queue_space > 0 {
@@ -597,12 +619,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
597619
let steps = ((queue_space + 2) / 3) as u8;
598620
let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
599621
for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
600-
message_queuer.enqueue_message(announce, outbound_queue, &*self.logger);
622+
transport.enqueue_message(announce, outbound_queue, &*self.logger);
601623
if let &Some(ref update_a) = update_a_option {
602-
message_queuer.enqueue_message(update_a, outbound_queue, &*self.logger);
624+
transport.enqueue_message(update_a, outbound_queue, &*self.logger);
603625
}
604626
if let &Some(ref update_b) = update_b_option {
605-
message_queuer.enqueue_message(update_b, outbound_queue, &*self.logger);
627+
transport.enqueue_message(update_b, outbound_queue, &*self.logger);
606628
}
607629
*sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
608630
}
@@ -614,7 +636,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
614636
let steps = queue_space as u8;
615637
let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
616638
for msg in all_messages.iter() {
617-
message_queuer.enqueue_message(msg, outbound_queue, &*self.logger);
639+
transport.enqueue_message(msg, outbound_queue, &*self.logger);
618640
*sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
619641
}
620642
if all_messages.is_empty() || all_messages.len() != steps as usize {
@@ -626,7 +648,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
626648
let steps = queue_space as u8;
627649
let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
628650
for msg in all_messages.iter() {
629-
message_queuer.enqueue_message(msg, outbound_queue, &*self.logger);
651+
transport.enqueue_message(msg, outbound_queue, &*self.logger);
630652
*sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
631653
}
632654
if all_messages.is_empty() || all_messages.len() != steps as usize {
@@ -637,18 +659,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
637659
}
638660
}
639661

640-
fn do_attempt_write_data<Q: PayloadQueuer + SocketDescriptorFlusher>(
662+
fn do_attempt_write_data(
641663
&self,
642664
descriptor: &mut Descriptor,
643665
post_init_state: &mut Option<PostInitState>,
644-
message_queuer: &mut impl MessageQueuer,
645-
outbound_queue: &mut Q) {
666+
transport: &mut TransportImpl,
667+
outbound_queue: &mut OutboundQueue) {
646668

647669
while !outbound_queue.is_blocked() {
648670
// If connected, fill output queue with sync messages
649671
match post_init_state {
650672
None => {},
651-
&mut Some(ref mut state) => self.fill_outbound_queue_with_sync(&mut state.sync_status, message_queuer, outbound_queue)
673+
&mut Some(ref mut state) => self.fill_outbound_queue_with_sync(&mut state.sync_status, transport, outbound_queue)
652674
}
653675

654676
// No messages to send

lightning/src/ln/peers/outbound_queue.rs

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@
1010
/// Abstracts the buffer used to write data through a SocketDescriptor handling partial writes and
1111
/// flow control.
1212
13-
use ln::peers::handler::{SocketDescriptor, SocketDescriptorFlusher};
14-
use ln::peers::transport::PayloadQueuer;
13+
use ln::peers::handler::{IOutboundQueue, SocketDescriptor};
1514
use std::collections::LinkedList;
1615
use std::cmp;
1716

@@ -22,24 +21,31 @@ pub(super) struct OutboundQueue {
2221
buffer_first_msg_offset: usize,
2322
}
2423

25-
impl PayloadQueuer for OutboundQueue {
26-
/// Unconditionally queue item. May increase queue above soft limit.
24+
impl OutboundQueue {
25+
pub(super) fn new(soft_limit: usize) -> Self {
26+
Self {
27+
blocked: false,
28+
soft_limit,
29+
buffer: LinkedList::new(),
30+
buffer_first_msg_offset: 0,
31+
}
32+
}
33+
}
34+
35+
impl IOutboundQueue for OutboundQueue {
36+
2737
fn push_back(&mut self, item: Vec<u8>) {
2838
self.buffer.push_back(item);
2939
}
3040

31-
/// Returns true if the queue is empty
3241
fn is_empty(&self) -> bool {
3342
self.buffer.is_empty()
3443
}
3544

36-
/// Returns the amount of free space in the queue before the soft limit
3745
fn queue_space(&self) -> usize {
3846
self.soft_limit - cmp::min(self.soft_limit, self.buffer.len())
3947
}
40-
}
4148

42-
impl SocketDescriptorFlusher for OutboundQueue {
4349
fn try_flush_one(&mut self, descriptor: &mut impl SocketDescriptor) -> bool {
4450
// Exit early if a previous full write failed and haven't heard that there may be more
4551
// room available
@@ -77,20 +83,6 @@ impl SocketDescriptorFlusher for OutboundQueue {
7783
}
7884
}
7985

80-
impl OutboundQueue {
81-
82-
/// Create a new writer with a soft limit that is used to notify the SocketDescriptor when
83-
/// it is OK to resume reading if it was paused
84-
pub(super) fn new(soft_limit: usize) -> Self {
85-
Self {
86-
blocked: false,
87-
soft_limit,
88-
buffer: LinkedList::new(),
89-
buffer_first_msg_offset: 0,
90-
}
91-
}
92-
}
93-
9486
#[cfg(test)]
9587
mod tests {
9688
use super::*;

lightning/src/ln/peers/test_util.rs

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ use bitcoin::secp256k1;
1313
use bitcoin::secp256k1::key::{PublicKey, SecretKey};
1414

1515
use ln::peers::conduit::Conduit;
16-
use ln::peers::handler::{SocketDescriptor, ITransport, PeerHandleError, MessageQueuer};
17-
use ln::peers::transport::{IPeerHandshake, PayloadQueuer};
16+
use ln::peers::handler::{SocketDescriptor, IOutboundQueue, ITransport, PeerHandleError};
17+
use ln::peers::transport::IPeerHandshake;
1818

1919
use std::rc::Rc;
2020
use std::cell::{RefCell};
@@ -223,8 +223,9 @@ impl Hash for SocketDescriptorMock {
223223
}
224224
}
225225

226-
/// Implement PayloadQueuer for Vec<Vec<u8>> so it can be used as a Spy in tests
227-
impl PayloadQueuer for Vec<Vec<u8>> {
226+
/// Implement IOutboundQueue for Vec<Vec<u8>> so it can be used as a Spy in tests. This only implements
227+
/// a subset of the push methods needed for the tests.
228+
impl IOutboundQueue for Vec<Vec<u8>> {
228229
fn push_back(&mut self, item: Vec<u8>) {
229230
self.push(item)
230231
}
@@ -236,6 +237,18 @@ impl PayloadQueuer for Vec<Vec<u8>> {
236237
fn queue_space(&self) -> usize {
237238
unimplemented!()
238239
}
240+
241+
fn try_flush_one(&mut self, _descriptor: &mut impl SocketDescriptor) -> bool {
242+
unimplemented!()
243+
}
244+
245+
fn unblock(&mut self) {
246+
unimplemented!()
247+
}
248+
249+
fn is_blocked(&self) -> bool {
250+
unimplemented!()
251+
}
239252
}
240253

241254
// Builder for TransportTestStub that allows tests to easily construct the Transport layer they
@@ -296,8 +309,8 @@ impl<'a> ITransport for &'a RefCell<TransportStub> {
296309
unimplemented!()
297310
}
298311

299-
fn process_input(&mut self, input: &[u8], output_buffer: &mut impl PayloadQueuer) -> Result<bool, String> {
300-
self.borrow_mut().process_input(input, output_buffer)
312+
fn process_input(&mut self, input: &[u8], outbound_queue: &mut impl IOutboundQueue) -> Result<bool, String> {
313+
self.borrow_mut().process_input(input, outbound_queue)
301314
}
302315

303316
fn is_connected(&self) -> bool {
@@ -308,14 +321,12 @@ impl<'a> ITransport for &'a RefCell<TransportStub> {
308321
self.borrow().get_their_node_id()
309322
}
310323

311-
fn drain_messages<L: Deref>(&mut self, logger: L) -> Result<Vec<Message>, PeerHandleError> where L::Target: Logger {
312-
self.borrow_mut().drain_messages(logger)
324+
fn enqueue_message<M: Encode + Writeable, L: Deref>(&mut self, message: &M, outbound_queue: &mut impl IOutboundQueue, logger: L) where L::Target: Logger {
325+
self.borrow_mut().enqueue_message(message, outbound_queue, logger)
313326
}
314-
}
315327

316-
impl<'a> MessageQueuer for &'a RefCell<TransportStub> {
317-
fn enqueue_message<M: Encode + Writeable, Q: PayloadQueuer, L: Deref>(&mut self, message: &M, output_buffer: &mut Q, logger: L) where L::Target: Logger {
318-
self.borrow_mut().enqueue_message(message, output_buffer, logger)
328+
fn drain_messages<L: Deref>(&mut self, logger: L) -> Result<Vec<Message>, PeerHandleError> where L::Target: Logger {
329+
self.borrow_mut().drain_messages(logger)
319330
}
320331
}
321332

@@ -345,7 +356,7 @@ impl ITransport for TransportStub {
345356
unimplemented!()
346357
}
347358

348-
fn process_input(&mut self, _input: &[u8], _output_buffer: &mut impl PayloadQueuer) -> Result<bool, String> {
359+
fn process_input(&mut self, _input: &[u8], _outbound_queue: &mut impl IOutboundQueue) -> Result<bool, String> {
349360
if self.process_returns_error {
350361
Err("Oh no!".to_string())
351362
} else {
@@ -369,16 +380,14 @@ impl ITransport for TransportStub {
369380
self.their_node_id.unwrap()
370381
}
371382

372-
fn drain_messages<L: Deref>(&mut self, _logger: L) -> Result<Vec<Message>, PeerHandleError> where L::Target: Logger {
373-
Ok(self.messages.drain(..).collect())
374-
}
375-
}
376-
377-
impl MessageQueuer for TransportStub {
378-
fn enqueue_message<M: Encode + Writeable, Q: PayloadQueuer, L: Deref>(&mut self, message: &M, output_buffer: &mut Q, _logger: L)
383+
fn enqueue_message<M: Encode + Writeable, L: Deref>(&mut self, message: &M, outbound_queue: &mut impl IOutboundQueue, _logger: L)
379384
where L::Target: Logger {
380385
let mut buffer = VecWriter(Vec::new());
381386
wire::write(message, &mut buffer).unwrap();
382-
output_buffer.push_back(buffer.0);
387+
outbound_queue.push_back(buffer.0);
388+
}
389+
390+
fn drain_messages<L: Deref>(&mut self, _logger: L) -> Result<Vec<Message>, PeerHandleError> where L::Target: Logger {
391+
Ok(self.messages.drain(..).collect())
383392
}
384-
}
393+
}

0 commit comments

Comments
 (0)