@@ -234,6 +234,13 @@ enum InitSyncTracker{
234
234
NodesSyncing ( PublicKey ) ,
235
235
}
236
236
237
+ /// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
238
+ /// we manage to send messages until we reach this limit.
239
+ const OUTBOUND_BUFFER_LIMIT_READ_PAUSE : usize = 10 ;
240
+ /// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
241
+ /// the peer.
242
+ const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP : usize = 20 ;
243
+
237
244
struct Peer {
238
245
channel_encryptor : PeerChannelEncryptor ,
239
246
their_node_id : Option < PublicKey > ,
@@ -532,13 +539,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
532
539
}
533
540
}
534
541
}
535
- const MSG_BUFF_SIZE : usize = 10 ;
536
542
while !peer. awaiting_write_event {
537
- if peer. pending_outbound_buffer . len ( ) < MSG_BUFF_SIZE {
543
+ if peer. pending_outbound_buffer . len ( ) < OUTBOUND_BUFFER_LIMIT_READ_PAUSE {
538
544
match peer. sync_status {
539
545
InitSyncTracker :: NoSyncRequested => { } ,
540
546
InitSyncTracker :: ChannelsSyncing ( c) if c < 0xffff_ffff_ffff_ffff => {
541
- let steps = ( ( MSG_BUFF_SIZE - peer. pending_outbound_buffer . len ( ) + 2 ) / 3 ) as u8 ;
547
+ let steps = ( ( OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer. pending_outbound_buffer . len ( ) + 2 ) / 3 ) as u8 ;
542
548
let all_messages = self . message_handler . route_handler . get_next_channel_announcements ( c, steps) ;
543
549
for & ( ref announce, ref update_a_option, ref update_b_option) in all_messages. iter ( ) {
544
550
encode_and_send_msg ! ( announce) ;
@@ -555,7 +561,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
555
561
}
556
562
} ,
557
563
InitSyncTracker :: ChannelsSyncing ( c) if c == 0xffff_ffff_ffff_ffff => {
558
- let steps = ( MSG_BUFF_SIZE - peer. pending_outbound_buffer . len ( ) ) as u8 ;
564
+ let steps = ( OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer. pending_outbound_buffer . len ( ) ) as u8 ;
559
565
let all_messages = self . message_handler . route_handler . get_next_node_announcements ( None , steps) ;
560
566
for msg in all_messages. iter ( ) {
561
567
encode_and_send_msg ! ( msg) ;
@@ -567,7 +573,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
567
573
} ,
568
574
InitSyncTracker :: ChannelsSyncing ( _) => unreachable ! ( ) ,
569
575
InitSyncTracker :: NodesSyncing ( key) => {
570
- let steps = ( MSG_BUFF_SIZE - peer. pending_outbound_buffer . len ( ) ) as u8 ;
576
+ let steps = ( OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer. pending_outbound_buffer . len ( ) ) as u8 ;
571
577
let all_messages = self . message_handler . route_handler . get_next_node_announcements ( Some ( & key) , steps) ;
572
578
for msg in all_messages. iter ( ) {
573
579
encode_and_send_msg ! ( msg) ;
@@ -586,7 +592,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
586
592
Some ( buff) => buff,
587
593
} ;
588
594
589
- let should_be_reading = peer. pending_outbound_buffer . len ( ) < MSG_BUFF_SIZE ;
595
+ let should_be_reading = peer. pending_outbound_buffer . len ( ) < OUTBOUND_BUFFER_LIMIT_READ_PAUSE ;
590
596
let pending = & next_buff[ peer. pending_outbound_buffer_first_msg_offset ..] ;
591
597
let data_sent = descriptor. send_data ( pending, should_be_reading) ;
592
598
peer. pending_outbound_buffer_first_msg_offset += data_sent;
@@ -815,7 +821,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
815
821
}
816
822
}
817
823
818
- peer. pending_outbound_buffer . len ( ) > 10 // pause_read
824
+ peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_READ_PAUSE // pause_read
819
825
}
820
826
} ;
821
827
@@ -1028,6 +1034,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1028
1034
!peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
1029
1035
continue
1030
1036
}
1037
+ if peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
1038
+ continue ;
1039
+ }
1031
1040
if peer. their_node_id . as_ref ( ) == Some ( & msg. contents . node_id_1 ) ||
1032
1041
peer. their_node_id . as_ref ( ) == Some ( & msg. contents . node_id_2 ) {
1033
1042
continue ;
@@ -1047,6 +1056,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1047
1056
!peer. should_forward_node_announcement ( msg. contents . node_id ) {
1048
1057
continue
1049
1058
}
1059
+ if peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
1060
+ continue ;
1061
+ }
1050
1062
if peer. their_node_id . as_ref ( ) == Some ( & msg. contents . node_id ) {
1051
1063
continue ;
1052
1064
}
@@ -1065,6 +1077,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1065
1077
!peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
1066
1078
continue
1067
1079
}
1080
+ if peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
1081
+ continue ;
1082
+ }
1068
1083
if except_node. is_some ( ) && peer. their_node_id . as_ref ( ) == except_node {
1069
1084
continue ;
1070
1085
}
0 commit comments