@@ -23,6 +23,7 @@ use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload
23
23
use super :: utils;
24
24
use util:: events:: OnionMessageProvider ;
25
25
use util:: logger:: Logger ;
26
+ use util:: ser:: Writeable ;
26
27
27
28
use core:: ops:: Deref ;
28
29
use sync:: { Arc , Mutex } ;
@@ -122,6 +123,8 @@ pub enum SendError {
122
123
TooFewBlindedHops ,
123
124
/// Our next-hop peer was offline or does not support onion message forwarding.
124
125
InvalidFirstHop ,
126
+ /// Our next-hop peer's buffer was full.
127
+ PeerBufferFull ,
125
128
}
126
129
127
130
impl < Signer : Sign , K : Deref , L : Deref > OnionMessenger < Signer , K , L >
@@ -169,6 +172,7 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
169
172
packet_payloads, packet_keys, prng_seed) . map_err ( |( ) | SendError :: TooBigPacket ) ?;
170
173
171
174
let mut pending_per_peer_msgs = self . pending_messages . lock ( ) . unwrap ( ) ;
175
+ if outbound_buffer_full ( & pending_per_peer_msgs) { return Err ( SendError :: PeerBufferFull ) }
172
176
match pending_per_peer_msgs. entry ( introduction_node_id) {
173
177
hash_map:: Entry :: Vacant ( _) => Err ( SendError :: InvalidFirstHop ) ,
174
178
hash_map:: Entry :: Occupied ( mut e) => {
@@ -177,7 +181,6 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
177
181
}
178
182
}
179
183
}
180
-
181
184
#[ cfg( test) ]
182
185
pub ( super ) fn release_pending_msgs ( & self ) -> HashMap < PublicKey , VecDeque < msgs:: OnionMessage > > {
183
186
let mut pending_msgs = self . pending_messages . lock ( ) . unwrap ( ) ;
@@ -192,6 +195,20 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
192
195
}
193
196
}
194
197
198
+ fn outbound_buffer_full ( buffer : & HashMap < PublicKey , VecDeque < msgs:: OnionMessage > > ) -> bool {
199
+ const MAX_BUFFER_SIZE : usize = 262144 ; // 256KiB
200
+ let mut buffered_bytes = 0 ;
201
+ for peer_buf in buffer. values ( ) {
202
+ for om in peer_buf {
203
+ buffered_bytes += om. serialized_length ( ) ;
204
+ if buffered_bytes >= MAX_BUFFER_SIZE {
205
+ return true
206
+ }
207
+ }
208
+ }
209
+ false
210
+ }
211
+
195
212
impl < Signer : Sign , K : Deref , L : Deref > OnionMessageHandler for OnionMessenger < Signer , K , L >
196
213
where K :: Target : KeysInterface < Signer = Signer > ,
197
214
L :: Target : Logger ,
@@ -256,6 +273,10 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessageHandler for OnionMessenger<Si
256
273
} ;
257
274
258
275
let mut pending_per_peer_msgs = self . pending_messages . lock ( ) . unwrap ( ) ;
276
+ if outbound_buffer_full ( & pending_per_peer_msgs) {
277
+ log_trace ! ( self . logger, "Dropping forwarded onion message to peer {:?}: outbound buffer full" , next_node_id) ;
278
+ return
279
+ }
259
280
match pending_per_peer_msgs. entry ( next_node_id) {
260
281
hash_map:: Entry :: Vacant ( _) => {
261
282
log_trace ! ( self . logger, "Dropping forwarded onion message to disconnected peer {:?}" , next_node_id) ;
0 commit comments