Skip to content

Commit 9ef097c

Browse files
committed
Change OutboundQueue flush api so that it flushes entire contents
The do_attempt_write_data() function would previously only flush one item at a time. This would cause non-optimal batching in cases where the queue was full and the RoutingMessageHandler was only asked for a single item. Instead, change the API to flush the entire contents of the queue.
1 parent 9ec8852 commit 9ef097c

File tree

2 files changed

+60
-48
lines changed

2 files changed

+60
-48
lines changed

lightning/src/ln/peers/handler.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,10 @@ pub(super) trait MessageQueuer {
8181
/// Trait representing a container that can try to flush data through a SocketDescriptor. Used by the
8282
/// PeerManager to handle flushing the outbound queue and flow control.
8383
pub(super) trait SocketDescriptorFlusher {
84-
/// Write previously enqueued data to the SocketDescriptor. A return of false indicates the
85-
/// underlying SocketDescriptor could not fulfill the send_data() call and the blocked state
86-
/// has been set. Use unblock() when the SocketDescriptor may have more room.
87-
fn try_flush_one(&mut self, descriptor: &mut impl SocketDescriptor) -> bool;
84+
/// Attempts to write all previously enqueued data to the SocketDescriptor. A return of false
85+
/// indicates the underlying SocketDescriptor could not fulfill the send_data() calls and the
86+
/// blocked state has been set. Use unblock() when the SocketDescriptor may have more room.
87+
fn try_flush(&mut self, descriptor: &mut impl SocketDescriptor) -> bool;
8888

8989
/// Clear the blocked state caused when a previous write failed
9090
fn unblock(&mut self);
@@ -652,7 +652,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
652652
break;
653653
}
654654

655-
outbound_queue.try_flush_one(descriptor);
655+
outbound_queue.try_flush(descriptor);
656656
}
657657
}
658658

lightning/src/ln/peers/outbound_queue.rs

Lines changed: 55 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -31,32 +31,37 @@ impl PayloadQueuer for OutboundQueue {
3131
}
3232

3333
impl SocketDescriptorFlusher for OutboundQueue {
34-
fn try_flush_one(&mut self, descriptor: &mut impl SocketDescriptor) -> bool {
34+
fn try_flush(&mut self, descriptor: &mut impl SocketDescriptor) -> bool {
3535
// Exit early if a previous full write failed and haven't heard that there may be more
3636
// room available
3737
if self.blocked {
3838
return false;
3939
}
4040

41-
let full_write_succeeded = match self.buffer.front() {
42-
None => true,
43-
Some(next_buff) => {
44-
let should_be_reading = self.buffer.len() < self.soft_limit;
45-
let pending = &next_buff[self.buffer_first_msg_offset..];
46-
let data_sent = descriptor.send_data(pending, should_be_reading);
47-
self.buffer_first_msg_offset += data_sent;
48-
self.buffer_first_msg_offset == next_buff.len()
41+
loop {
42+
if self.buffer.is_empty() {
43+
return true;
4944
}
50-
};
5145

52-
if full_write_succeeded {
53-
self.buffer_first_msg_offset = 0;
54-
self.buffer.pop_front();
55-
} else {
56-
self.blocked = true;
46+
let full_write_succeeded = match self.buffer.front() {
47+
None => true,
48+
Some(next_buff) => {
49+
let should_be_reading = self.buffer.len() < self.soft_limit;
50+
let pending = &next_buff[self.buffer_first_msg_offset..];
51+
let data_sent = descriptor.send_data(pending, should_be_reading);
52+
self.buffer_first_msg_offset += data_sent;
53+
self.buffer_first_msg_offset == next_buff.len()
54+
}
55+
};
56+
57+
if full_write_succeeded {
58+
self.buffer_first_msg_offset = 0;
59+
self.buffer.pop_front();
60+
} else {
61+
self.blocked = true;
62+
return false;
63+
}
5764
}
58-
59-
full_write_succeeded
6065
}
6166

6267
fn unblock(&mut self) {
@@ -87,40 +92,40 @@ mod tests {
8792
use super::*;
8893
use ln::peers::test_util::*;
8994

90-
// Test that a try_flush_one() call with no queued data doesn't write anything
95+
// Test that a try_flush() call with no queued data doesn't write anything
9196
#[test]
9297
fn empty_does_not_write() {
9398
let mut descriptor = SocketDescriptorMock::new();
9499
let mut empty = OutboundQueue::new(10);
95100

96-
assert!(empty.try_flush_one(&mut descriptor));
101+
assert!(empty.try_flush(&mut descriptor));
97102
descriptor.assert_called_with(vec![]);
98103

99104
}
100105

101-
// Test that try_flush_one() sends the push_back
106+
// Test that try_flush() sends the push_back
102107
#[test]
103108
fn push_back_drain() {
104109
let mut descriptor = SocketDescriptorMock::new();
105110
let mut queue = OutboundQueue::new(10);
106111

107112
queue.push_back(vec![1]);
108-
assert!(queue.try_flush_one(&mut descriptor));
113+
assert!(queue.try_flush(&mut descriptor));
109114

110115
descriptor.assert_called_with(vec![(vec![1], true)]);
111116
}
112117

113-
// Test that try_flush_one() sends just first push_back
118+
// Test that try_flush() sends all
114119
#[test]
115120
fn push_back_push_back_drain_drain() {
116121
let mut descriptor = SocketDescriptorMock::new();
117122
let mut queue = OutboundQueue::new(10);
118123

119124
queue.push_back(vec![1]);
120125
queue.push_back(vec![2]);
121-
assert!(queue.try_flush_one(&mut descriptor));
126+
assert!(queue.try_flush(&mut descriptor));
122127

123-
descriptor.assert_called_with(vec![(vec![1], true)]);
128+
descriptor.assert_called_with(vec![(vec![1], true), (vec![2], true)]);
124129
}
125130

126131
// Test that descriptor that can't write all bytes returns valid response
@@ -130,27 +135,40 @@ mod tests {
130135
let mut queue = OutboundQueue::new(10);
131136

132137
queue.push_back(vec![1, 2, 3]);
133-
assert!(!queue.try_flush_one(&mut descriptor));
138+
assert!(!queue.try_flush(&mut descriptor));
134139

135140
descriptor.assert_called_with(vec![(vec![1, 2, 3], true)]);
136141
}
137142

143+
// Test that descriptor that can't write all bytes (in second pushed item) returns valid response
144+
#[test]
145+
fn push_back_drain_partial_multiple_push() {
146+
let mut descriptor = SocketDescriptorMock::with_fixed_size(2);
147+
let mut queue = OutboundQueue::new(10);
148+
149+
queue.push_back(vec![1]);
150+
queue.push_back(vec![2, 3]);
151+
assert!(!queue.try_flush(&mut descriptor));
152+
153+
descriptor.assert_called_with(vec![(vec![1], true), (vec![2, 3], true)]);
154+
}
155+
138156
// Test the bookkeeping for multiple partial writes
139157
#[test]
140-
fn push_back_drain_partial_drain_partial_try_flush_one() {
158+
fn push_back_drain_partial_drain_partial_try_flush() {
141159
let mut descriptor = SocketDescriptorMock::with_fixed_size(1);
142160
let mut queue = OutboundQueue::new(10);
143161

144162
queue.push_back(vec![1, 2, 3]);
145-
assert!(!queue.try_flush_one(&mut descriptor));
163+
assert!(!queue.try_flush(&mut descriptor));
146164

147165
descriptor.make_room(1);
148166
queue.unblock();
149-
assert!(!queue.try_flush_one(&mut descriptor));
167+
assert!(!queue.try_flush(&mut descriptor));
150168

151169
descriptor.make_room(1);
152170
queue.unblock();
153-
assert!(queue.try_flush_one(&mut descriptor));
171+
assert!(queue.try_flush(&mut descriptor));
154172

155173
descriptor.assert_called_with(vec![(vec![1, 2, 3], true), (vec![2, 3], true), (vec![3], true)]);
156174
}
@@ -162,27 +180,27 @@ mod tests {
162180

163181
// Fail write and move to blocked state
164182
queue.push_back(vec![1, 2]);
165-
assert!(!queue.try_flush_one(&mut descriptor));
183+
assert!(!queue.try_flush(&mut descriptor));
166184
descriptor.assert_called_with(vec![(vec![1, 2], true)]);
167185

168186
// Make room but don't signal
169187
descriptor.make_room(1);
170-
assert!(!queue.try_flush_one(&mut descriptor));
188+
assert!(!queue.try_flush(&mut descriptor));
171189
assert!(queue.is_blocked());
172190
descriptor.assert_called_with(vec![(vec![1, 2], true)]);
173191

174192
// Unblock and try again
175193
queue.unblock();
176194

177195
// Partial write will succeed, but still move to blocked
178-
assert!(!queue.try_flush_one(&mut descriptor));
196+
assert!(!queue.try_flush(&mut descriptor));
179197
assert!(queue.is_blocked());
180198
descriptor.assert_called_with(vec![(vec![1, 2], true), (vec![1, 2], true)]);
181199

182200
// Make room and signal which will succeed in writing the final piece
183201
descriptor.make_room(1);
184202
queue.unblock();
185-
assert!(queue.try_flush_one(&mut descriptor));
203+
assert!(queue.try_flush(&mut descriptor));
186204
assert!(!queue.is_blocked());
187205
descriptor.assert_called_with(vec![(vec![1, 2], true), (vec![1, 2], true), (vec![2], true)]);
188206
}
@@ -194,7 +212,7 @@ mod tests {
194212
let mut queue = OutboundQueue::new(1);
195213

196214
queue.push_back(vec![1]);
197-
assert!(queue.try_flush_one(&mut descriptor));
215+
assert!(queue.try_flush(&mut descriptor));
198216
descriptor.assert_called_with(vec![(vec![1], false)]);
199217
}
200218

@@ -207,9 +225,7 @@ mod tests {
207225
queue.push_back(vec![1]);
208226
queue.push_back(vec![2]);
209227
queue.push_back(vec![3]);
210-
assert!(queue.try_flush_one(&mut descriptor));
211-
assert!(queue.try_flush_one(&mut descriptor));
212-
assert!(queue.try_flush_one(&mut descriptor));
228+
assert!(queue.try_flush(&mut descriptor));
213229
descriptor.assert_called_with(vec![(vec![1], false), (vec![2], false), (vec![3], true)]);
214230
}
215231

@@ -223,7 +239,7 @@ mod tests {
223239
queue.push_back(vec![1]);
224240
assert!(!queue.is_empty());
225241

226-
assert!(queue.try_flush_one(&mut descriptor));
242+
assert!(queue.try_flush(&mut descriptor));
227243
assert!(queue.is_empty());
228244
}
229245

@@ -244,12 +260,8 @@ mod tests {
244260
queue.push_back(vec![2]);
245261
assert_eq!(queue.queue_space(), 0);
246262

247-
// at soft limit
248-
assert!(queue.try_flush_one(&mut descriptor));
249-
assert_eq!(queue.queue_space(), 0);
250-
251263
// below soft limt
252-
assert!(queue.try_flush_one(&mut descriptor));
264+
assert!(queue.try_flush(&mut descriptor));
253265
assert_eq!(queue.queue_space(), 1);
254266
}
255267
}

0 commit comments

Comments
 (0)