@@ -123,8 +123,8 @@ pub enum SendError {
123
123
TooFewBlindedHops ,
124
124
/// Our next-hop peer was offline or does not support onion message forwarding.
125
125
InvalidFirstHop ,
126
- /// Our next-hop peer's buffer was full.
127
- PeerBufferFull ,
126
+ /// Our next-hop peer's buffer was full or our total outbound buffer was full .
127
+ BufferFull ,
128
128
}
129
129
130
130
impl < Signer : Sign , K : Deref , L : Deref > OnionMessenger < Signer , K , L >
@@ -172,10 +172,10 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
172
172
packet_payloads, packet_keys, prng_seed) . map_err ( |( ) | SendError :: TooBigPacket ) ?;
173
173
174
174
let mut pending_per_peer_msgs = self . pending_messages . lock ( ) . unwrap ( ) ;
175
+ if outbound_buffer_full ( & introduction_node_id, & pending_per_peer_msgs) { return Err ( SendError :: BufferFull ) }
175
176
match pending_per_peer_msgs. entry ( introduction_node_id) {
176
177
hash_map:: Entry :: Vacant ( _) => Err ( SendError :: InvalidFirstHop ) ,
177
178
hash_map:: Entry :: Occupied ( mut e) => {
178
- if peer_buffer_full ( e. get ( ) ) { return Err ( SendError :: PeerBufferFull ) }
179
179
e. get_mut ( ) . push_back ( msgs:: OnionMessage { blinding_point, onion_routing_packet } ) ;
180
180
Ok ( ( ) )
181
181
}
@@ -196,13 +196,24 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
196
196
}
197
197
}
198
198
199
- fn peer_buffer_full ( peer_buf : & VecDeque < msgs:: OnionMessage > ) -> bool {
200
- const MAX_BUFFER_SIZE : usize = 1024 * 256 ;
201
- let mut buffered_bytes = 0 ;
202
- for om in peer_buf {
203
- buffered_bytes += om. serialized_length ( ) ;
204
- if buffered_bytes >= MAX_BUFFER_SIZE {
205
- return true
199
+ fn outbound_buffer_full ( peer_node_id : & PublicKey , buffer : & HashMap < PublicKey , VecDeque < msgs:: OnionMessage > > ) -> bool {
200
+ const MAX_TOTAL_BUFFER_SIZE : usize = ( 2 << 20 ) * 128 ;
201
+ const MAX_PER_PEER_BUFFER_SIZE : usize = ( 2 << 10 ) * 256 ;
202
+ let mut total_buffered_bytes = 0 ;
203
+ let mut peer_buffered_bytes = 0 ;
204
+ for ( pk, peer_buf) in buffer {
205
+ for om in peer_buf {
206
+ let om_len = om. serialized_length ( ) ;
207
+ if pk == peer_node_id {
208
+ peer_buffered_bytes += om_len;
209
+ }
210
+ total_buffered_bytes += om_len;
211
+
212
+ if total_buffered_bytes >= MAX_TOTAL_BUFFER_SIZE ||
213
+ peer_buffered_bytes >= MAX_PER_PEER_BUFFER_SIZE
214
+ {
215
+ return true
216
+ }
206
217
}
207
218
}
208
219
false
@@ -294,6 +305,10 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessageHandler for OnionMessenger<Si
294
305
} ;
295
306
296
307
let mut pending_per_peer_msgs = self . pending_messages . lock ( ) . unwrap ( ) ;
308
+ if outbound_buffer_full ( & next_node_id, & pending_per_peer_msgs) {
309
+ log_trace ! ( self . logger, "Dropping forwarded onion message to peer {:?}: outbound buffer full" , next_node_id) ;
310
+ return
311
+ }
297
312
298
313
#[ cfg( fuzzing) ]
299
314
pending_per_peer_msgs. entry ( next_node_id) . or_insert_with ( || VecDeque :: new ( ) ) ;
0 commit comments