@@ -23,6 +23,7 @@ use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload
23
23
use super :: utils;
24
24
use util:: events:: OnionMessageProvider ;
25
25
use util:: logger:: Logger ;
26
+ use util:: ser:: Writeable ;
26
27
27
28
use core:: ops:: Deref ;
28
29
use sync:: { Arc , Mutex } ;
@@ -124,6 +125,8 @@ pub enum SendError {
124
125
TooFewBlindedHops ,
125
126
/// Our next-hop peer was offline or does not support onion message forwarding.
126
127
InvalidFirstHop ,
128
+ /// Our next-hop peer's buffer was full or our total outbound buffer was full.
129
+ BufferFull ,
127
130
}
128
131
129
132
impl < Signer : Sign , K : Deref , L : Deref > OnionMessenger < Signer , K , L >
@@ -171,6 +174,7 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
171
174
packet_payloads, packet_keys, prng_seed) . map_err ( |( ) | SendError :: TooBigPacket ) ?;
172
175
173
176
let mut pending_per_peer_msgs = self . pending_messages . lock ( ) . unwrap ( ) ;
177
+ if outbound_buffer_full ( & introduction_node_id, & pending_per_peer_msgs) { return Err ( SendError :: BufferFull ) }
174
178
match pending_per_peer_msgs. entry ( introduction_node_id) {
175
179
hash_map:: Entry :: Vacant ( _) => Err ( SendError :: InvalidFirstHop ) ,
176
180
hash_map:: Entry :: Occupied ( mut e) => {
@@ -193,6 +197,29 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
193
197
}
194
198
}
195
199
200
+ fn outbound_buffer_full ( peer_node_id : & PublicKey , buffer : & HashMap < PublicKey , VecDeque < msgs:: OnionMessage > > ) -> bool {
201
+ const MAX_TOTAL_BUFFER_SIZE : usize = ( 1 << 20 ) * 128 ;
202
+ const MAX_PER_PEER_BUFFER_SIZE : usize = ( 1 << 10 ) * 256 ;
203
+ let mut total_buffered_bytes = 0 ;
204
+ let mut peer_buffered_bytes = 0 ;
205
+ for ( pk, peer_buf) in buffer {
206
+ for om in peer_buf {
207
+ let om_len = om. serialized_length ( ) ;
208
+ if pk == peer_node_id {
209
+ peer_buffered_bytes += om_len;
210
+ }
211
+ total_buffered_bytes += om_len;
212
+
213
+ if total_buffered_bytes >= MAX_TOTAL_BUFFER_SIZE ||
214
+ peer_buffered_bytes >= MAX_PER_PEER_BUFFER_SIZE
215
+ {
216
+ return true
217
+ }
218
+ }
219
+ }
220
+ false
221
+ }
222
+
196
223
impl < Signer : Sign , K : Deref , L : Deref > OnionMessageHandler for OnionMessenger < Signer , K , L >
197
224
where K :: Target : KeysInterface < Signer = Signer > ,
198
225
L :: Target : Logger ,
@@ -279,6 +306,10 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessageHandler for OnionMessenger<Si
279
306
} ;
280
307
281
308
let mut pending_per_peer_msgs = self . pending_messages . lock ( ) . unwrap ( ) ;
309
+ if outbound_buffer_full ( & next_node_id, & pending_per_peer_msgs) {
310
+ log_trace ! ( self . logger, "Dropping forwarded onion message to peer {:?}: outbound buffer full" , next_node_id) ;
311
+ return
312
+ }
282
313
283
314
#[ cfg( fuzzing) ]
284
315
pending_per_peer_msgs. entry ( next_node_id) . or_insert_with ( VecDeque :: new) ;
0 commit comments