Skip to content

Commit c090cb2

Browse files
committed
refactor: Abstract SocketDescriptor writing queue
To enable better testing, put the code to enqueue data and flush it to a SocketDescriptor behind an object. This patch starts to implement traits for the various separate interface that each object needs. The idea is the consumer of an interface defines it and the dependency objects implement it. This will be useful when creating test doubles for unit tests and helps make the dependencies of each function more clear. Additional uses of trait-based contracts will be more clear in the next patches that start to add more testing.
1 parent 10dfa52 commit c090cb2

File tree

3 files changed

+432
-36
lines changed

3 files changed

+432
-36
lines changed

lightning/src/ln/peers/handler.rs

Lines changed: 81 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use util::events::{MessageSendEvent, MessageSendEventsProvider};
2020
use util::logger::Logger;
2121
use routing::network_graph::NetGraphMsgHandler;
2222

23-
use std::collections::{HashMap,hash_map,HashSet,LinkedList};
23+
use std::collections::{HashMap,hash_map,HashSet};
2424
use std::sync::{Arc, Mutex};
2525
use std::sync::atomic::{AtomicUsize, Ordering};
2626
use std::{cmp,error,hash,fmt};
@@ -31,6 +31,9 @@ use bitcoin::hashes::sha256::HashEngine as Sha256Engine;
3131
use bitcoin::hashes::{HashEngine, Hash};
3232
use ln::peers::handshake::PeerHandshake;
3333
use ln::peers::conduit::Conduit;
34+
use ln::peers::outbound_buffer::OutboundBuffer;
35+
36+
const MSG_BUFF_SIZE: usize = 10;
3437

3538
/// Provides references to trait impls which handle different types of messages.
3639
pub struct MessageHandler<CM: Deref, RM: Deref> where
@@ -111,6 +114,64 @@ enum InitSyncTracker{
111114
NodesSyncing(PublicKey),
112115
}
113116

117+
/// Trait representing a container that allows enqueuing of Vec<[u8]>
118+
pub(super) trait Queueable {
119+
/// Enqueue item to the queue
120+
fn push_back(&mut self, item: Vec<u8>);
121+
122+
/// Returns true if the queue is empty
123+
fn is_empty(&self) -> bool;
124+
125+
/// Returns the amount of available space in queue
126+
fn queue_space(&self) -> usize;
127+
}
128+
129+
/// Implement &mut Queueable passthroughs
130+
impl<'a, T> Queueable for &'a mut T where
131+
T: Queueable {
132+
fn push_back(&mut self, item: Vec<u8>) {
133+
T::push_back(self, item)
134+
}
135+
136+
fn is_empty(&self) -> bool {
137+
T::is_empty(self)
138+
}
139+
140+
fn queue_space(&self) -> usize {
141+
T::queue_space(self)
142+
}
143+
}
144+
145+
/// Trait representing a container that can try to flush data through a SocketDescriptor
146+
pub(super) trait Flushable {
147+
/// Write previously enqueued data to the SocketDescriptor. A return of false indicates the
148+
/// underlying SocketDescriptor could not fulfill the send_data() call and the blocked state
149+
/// has been set. Use unblock() when the SocketDescriptor may have more room.
150+
fn try_flush_one(&mut self, descriptor: &mut impl SocketDescriptor) -> bool;
151+
152+
/// Clear the blocked state caused when a previous write failed
153+
fn unblock(&mut self);
154+
155+
/// Check if the container is in a blocked state
156+
fn is_blocked(&self) -> bool;
157+
}
158+
159+
/// Implement &mut Flushable passthroughs
160+
impl<'a, T> Flushable for &'a mut T where
161+
T: Flushable {
162+
fn try_flush_one(&mut self, descriptor: &mut impl SocketDescriptor) -> bool {
163+
T::try_flush_one(self, descriptor)
164+
}
165+
166+
fn unblock(&mut self) {
167+
T::unblock(self)
168+
}
169+
170+
fn is_blocked(&self) -> bool {
171+
T::is_blocked(self)
172+
}
173+
}
174+
114175
enum PeerState {
115176
Authenticating(PeerHandshake),
116177
Connected(Conduit),
@@ -130,7 +191,7 @@ impl PeerState {
130191
}
131192
}
132193

133-
fn process_peer_data(&mut self, data: &[u8], mutable_response_buffer: &mut LinkedList<Vec<u8>>) -> PeerDataProcessingDecision {
194+
fn process_peer_data(&mut self, data: &[u8], pending_outbound_buffer: &mut impl Queueable) -> PeerDataProcessingDecision {
134195
let (new_state_opt, decision) = match self {
135196
&mut PeerState::Authenticating(ref mut handshake) => {
136197
match handshake.process_act(data) {
@@ -144,7 +205,7 @@ impl PeerState {
144205

145206
// Any response generated by the handshake sequence is put into the response buffer
146207
if let Some(response_vec) = response_vec_option {
147-
mutable_response_buffer.push_back(response_vec);
208+
pending_outbound_buffer.push_back(response_vec);
148209
}
149210

150211
// if process_act() returns the conduit and remote static public key (node id)
@@ -177,9 +238,7 @@ struct Peer {
177238
their_node_id: Option<PublicKey>,
178239
their_features: Option<InitFeatures>,
179240

180-
pending_outbound_buffer: LinkedList<Vec<u8>>,
181-
pending_outbound_buffer_first_msg_offset: usize,
182-
awaiting_write_event: bool,
241+
pending_outbound_buffer: OutboundBuffer,
183242

184243
sync_status: InitSyncTracker,
185244

@@ -366,9 +425,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
366425
their_node_id: Some(their_node_id.clone()),
367426
their_features: None,
368427

369-
pending_outbound_buffer: LinkedList::new(),
370-
pending_outbound_buffer_first_msg_offset: 0,
371-
awaiting_write_event: false,
428+
pending_outbound_buffer: OutboundBuffer::new(MSG_BUFF_SIZE),
372429

373430
sync_status: InitSyncTracker::NoSyncRequested,
374431

@@ -398,9 +455,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
398455
their_node_id: None,
399456
their_features: None,
400457

401-
pending_outbound_buffer: LinkedList::new(),
402-
pending_outbound_buffer_first_msg_offset: 0,
403-
awaiting_write_event: false,
458+
pending_outbound_buffer: OutboundBuffer::new(MSG_BUFF_SIZE),
404459

405460
sync_status: InitSyncTracker::NoSyncRequested,
406461

@@ -423,13 +478,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
423478
}
424479
}
425480
}
426-
const MSG_BUFF_SIZE: usize = 10;
427-
while !peer.awaiting_write_event {
428-
if peer.pending_outbound_buffer.len() < MSG_BUFF_SIZE {
481+
482+
while !peer.pending_outbound_buffer.is_blocked() {
483+
let queue_space = peer.pending_outbound_buffer.queue_space();
484+
if queue_space > 0 {
429485
match peer.sync_status {
430486
InitSyncTracker::NoSyncRequested => {},
431487
InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
432-
let steps = ((MSG_BUFF_SIZE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
488+
let steps = ((queue_space + 2) / 3) as u8;
433489
let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
434490
for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
435491
encode_and_send_msg!(announce);
@@ -446,7 +502,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
446502
}
447503
},
448504
InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
449-
let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8;
505+
let steps = queue_space as u8;
450506
let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
451507
for msg in all_messages.iter() {
452508
encode_and_send_msg!(msg);
@@ -458,7 +514,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
458514
},
459515
InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
460516
InitSyncTracker::NodesSyncing(key) => {
461-
let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8;
517+
let steps = queue_space as u8;
462518
let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
463519
for msg in all_messages.iter() {
464520
encode_and_send_msg!(msg);
@@ -471,23 +527,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
471527
}
472528
}
473529

474-
if {
475-
let next_buff = match peer.pending_outbound_buffer.front() {
476-
None => return,
477-
Some(buff) => buff,
478-
};
479-
480-
let should_be_reading = peer.pending_outbound_buffer.len() < MSG_BUFF_SIZE;
481-
let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
482-
let data_sent = descriptor.send_data(pending, should_be_reading);
483-
peer.pending_outbound_buffer_first_msg_offset += data_sent;
484-
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false }
485-
} {
486-
peer.pending_outbound_buffer_first_msg_offset = 0;
487-
peer.pending_outbound_buffer.pop_front();
488-
} else {
489-
peer.awaiting_write_event = true;
530+
// No messages to send
531+
if peer.pending_outbound_buffer.is_empty() {
532+
break;
490533
}
534+
535+
peer.pending_outbound_buffer.try_flush_one(descriptor);
491536
}
492537
}
493538

@@ -506,7 +551,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
506551
match peers.peers.get_mut(descriptor) {
507552
None => panic!("Descriptor for write_event is not already known to PeerManager"),
508553
Some(peer) => {
509-
peer.awaiting_write_event = false;
554+
peer.pending_outbound_buffer.unblock();
510555
self.do_attempt_write_data(descriptor, peer);
511556
}
512557
};
@@ -684,7 +729,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
684729

685730
self.do_attempt_write_data(peer_descriptor, peer);
686731

687-
peer.pending_outbound_buffer.len() > 10 // pause_read
732+
peer.pending_outbound_buffer.queue_space() == 0 // pause_read
688733
}
689734
};
690735

lightning/src/ln/peers/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
mod chacha;
77
pub mod handler;
88
mod hkdf5869rfc;
9+
mod outbound_buffer;
910

1011
#[cfg(feature = "fuzztarget")]
1112
pub mod conduit;

0 commit comments

Comments
 (0)