Skip to content

Commit 3a4a6eb

Browse files
Limit OnionMessenger outbound buffer size
Drop OMs if they push us over the max OnionMessenger outbound buffer size
1 parent 6286ec2 commit 3a4a6eb

File tree

2 files changed

+41
-0
lines changed

2 files changed

+41
-0
lines changed

lightning/src/onion_message/functional_tests.rs

+10
Original file line numberDiff line numberDiff line change
@@ -170,3 +170,13 @@ fn reply_path() {
170170
"lightning::onion_message::messenger".to_string(),
171171
format!("Received an onion message with path_id: None and reply_path").to_string(), 2);
172172
}
173+
174+
#[test]
175+
fn peer_buffer_full() {
176+
let nodes = create_nodes(2);
177+
for _ in 0..188 {
178+
nodes[0].messenger.send_onion_message(&[], Destination::Node(nodes[1].get_node_pk()), None).unwrap();
179+
}
180+
let err = nodes[0].messenger.send_onion_message(&[], Destination::Node(nodes[1].get_node_pk()), None).unwrap_err();
181+
assert_eq!(err, SendError::BufferFull);
182+
}

lightning/src/onion_message/messenger.rs

+31
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload
2323
use super::utils;
2424
use util::events::OnionMessageProvider;
2525
use util::logger::Logger;
26+
use util::ser::Writeable;
2627

2728
use core::ops::Deref;
2829
use sync::{Arc, Mutex};
@@ -122,6 +123,8 @@ pub enum SendError {
122123
TooFewBlindedHops,
123124
/// Our next-hop peer was offline or does not support onion message forwarding.
124125
InvalidFirstHop,
126+
/// Our next-hop peer's buffer was full or our total outbound buffer was full.
127+
BufferFull,
125128
}
126129

127130
impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
@@ -169,6 +172,7 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
169172
packet_payloads, packet_keys, prng_seed).map_err(|()| SendError::TooBigPacket)?;
170173

171174
let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
175+
if outbound_buffer_full(&introduction_node_id, &pending_per_peer_msgs) { return Err(SendError::BufferFull) }
172176
match pending_per_peer_msgs.entry(introduction_node_id) {
173177
hash_map::Entry::Vacant(_) => Err(SendError::InvalidFirstHop),
174178
hash_map::Entry::Occupied(mut e) => {
@@ -192,6 +196,29 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
192196
}
193197
}
194198

199+
fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, VecDeque<msgs::OnionMessage>>) -> bool {
200+
const MAX_TOTAL_BUFFER_SIZE: usize = (2 << 20) * 128;
201+
const MAX_PER_PEER_BUFFER_SIZE: usize = (2 << 10) * 256;
202+
let mut total_buffered_bytes = 0;
203+
let mut peer_buffered_bytes = 0;
204+
for (pk, peer_buf) in buffer {
205+
for om in peer_buf {
206+
let om_len = om.serialized_length();
207+
if pk == peer_node_id {
208+
peer_buffered_bytes += om_len;
209+
}
210+
total_buffered_bytes += om_len;
211+
212+
if total_buffered_bytes >= MAX_TOTAL_BUFFER_SIZE ||
213+
peer_buffered_bytes >= MAX_PER_PEER_BUFFER_SIZE
214+
{
215+
return true
216+
}
217+
}
218+
}
219+
false
220+
}
221+
195222
impl<Signer: Sign, K: Deref, L: Deref> OnionMessageHandler for OnionMessenger<Signer, K, L>
196223
where K::Target: KeysInterface<Signer = Signer>,
197224
L::Target: Logger,
@@ -278,6 +305,10 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessageHandler for OnionMessenger<Si
278305
};
279306

280307
let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
308+
if outbound_buffer_full(&next_node_id, &pending_per_peer_msgs) {
309+
log_trace!(self.logger, "Dropping forwarded onion message to peer {:?}: outbound buffer full", next_node_id);
310+
return
311+
}
281312

282313
#[cfg(fuzzing)]
283314
pending_per_peer_msgs.entry(next_node_id).or_insert_with(|| VecDeque::new());

0 commit comments

Comments
 (0)