@@ -401,10 +401,10 @@ impl Peer {
401
401
self . pending_outbound_buffer . len ( ) < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
402
402
}
403
403
404
- /// Determines if we should push additional gossip messages onto a peer's outbound buffer for
405
- /// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
406
- /// been drained.
407
- fn should_buffer_gossip_backfill ( & self ) -> bool {
404
+ /// Determines if we should push additional messages onto a peer's outbound buffer for backfilling
405
+ /// onion messages and gossip data to the peer. This is checked every time the peer's buffer may
406
+ /// have been drained.
407
+ fn should_buffer_message_backfill ( & self ) -> bool {
408
408
self . pending_outbound_buffer . is_empty ( ) &&
409
409
self . msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
410
410
}
@@ -755,44 +755,54 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
755
755
756
756
fn do_attempt_write_data ( & self , descriptor : & mut Descriptor , peer : & mut Peer ) {
757
757
while !peer. awaiting_write_event {
758
- if peer. should_buffer_gossip_backfill ( ) {
759
- match peer. sync_status {
760
- InitSyncTracker :: NoSyncRequested => { } ,
761
- InitSyncTracker :: ChannelsSyncing ( c) if c < 0xffff_ffff_ffff_ffff => {
762
- if let Some ( ( announce, update_a_option, update_b_option) ) =
763
- self . message_handler . route_handler . get_next_channel_announcement ( c)
764
- {
765
- self . enqueue_message ( peer, & announce) ;
766
- if let Some ( update_a) = update_a_option {
767
- self . enqueue_message ( peer, & update_a) ;
758
+ if peer. should_buffer_message_backfill ( ) {
759
+ let next_onion_message_opt = if let Some ( peer_node_id) = peer. their_node_id {
760
+ self . message_handler . onion_message_handler . next_onion_message_for_peer ( peer_node_id)
761
+ } else { None } ;
762
+
763
+ // For now, let onion messages starve gossip.
764
+ if let Some ( next_onion_message) = next_onion_message_opt {
765
+ self . enqueue_message ( peer, & next_onion_message) ;
766
+ } else {
767
+ match peer. sync_status {
768
+ InitSyncTracker :: NoSyncRequested => { } ,
769
+ InitSyncTracker :: ChannelsSyncing ( c) if c < 0xffff_ffff_ffff_ffff => {
770
+ if let Some ( ( announce, update_a_option, update_b_option) ) =
771
+ self . message_handler . route_handler . get_next_channel_announcement ( c)
772
+ {
773
+ self . enqueue_message ( peer, & announce) ;
774
+ if let Some ( update_a) = update_a_option {
775
+ self . enqueue_message ( peer, & update_a) ;
776
+ }
777
+ if let Some ( update_b) = update_b_option {
778
+ self . enqueue_message ( peer, & update_b) ;
779
+ }
780
+ peer. sync_status = InitSyncTracker :: ChannelsSyncing ( announce. contents . short_channel_id + 1 ) ;
781
+ } else {
782
+ peer. sync_status = InitSyncTracker :: ChannelsSyncing ( 0xffff_ffff_ffff_ffff ) ;
768
783
}
769
- if let Some ( update_b) = update_b_option {
770
- self . enqueue_message ( peer, & update_b) ;
784
+ } ,
785
+ InitSyncTracker :: ChannelsSyncing ( c) if c == 0xffff_ffff_ffff_ffff => {
786
+ if let Some ( msg) = self . message_handler . route_handler . get_next_node_announcement ( None ) {
787
+ self . enqueue_message ( peer, & msg) ;
788
+ peer. sync_status = InitSyncTracker :: NodesSyncing ( msg. contents . node_id ) ;
789
+ } else {
790
+ peer. sync_status = InitSyncTracker :: NoSyncRequested ;
771
791
}
772
- peer. sync_status = InitSyncTracker :: ChannelsSyncing ( announce. contents . short_channel_id + 1 ) ;
773
- } else {
774
- peer. sync_status = InitSyncTracker :: ChannelsSyncing ( 0xffff_ffff_ffff_ffff ) ;
775
- }
776
- } ,
777
- InitSyncTracker :: ChannelsSyncing ( c) if c == 0xffff_ffff_ffff_ffff => {
778
- if let Some ( msg) = self . message_handler . route_handler . get_next_node_announcement ( None ) {
779
- self . enqueue_message ( peer, & msg) ;
780
- peer. sync_status = InitSyncTracker :: NodesSyncing ( msg. contents . node_id ) ;
781
- } else {
782
- peer. sync_status = InitSyncTracker :: NoSyncRequested ;
783
- }
784
- } ,
785
- InitSyncTracker :: ChannelsSyncing ( _) => unreachable ! ( ) ,
786
- InitSyncTracker :: NodesSyncing ( key) => {
787
- if let Some ( msg) = self . message_handler . route_handler . get_next_node_announcement ( Some ( & key) ) {
788
- self . enqueue_message ( peer, & msg) ;
789
- peer. sync_status = InitSyncTracker :: NodesSyncing ( msg. contents . node_id ) ;
790
- } else {
791
- peer. sync_status = InitSyncTracker :: NoSyncRequested ;
792
- }
793
- } ,
792
+ } ,
793
+ InitSyncTracker :: ChannelsSyncing ( _) => unreachable ! ( ) ,
794
+ InitSyncTracker :: NodesSyncing ( key) => {
795
+ if let Some ( msg) = self . message_handler . route_handler . get_next_node_announcement ( Some ( & key) ) {
796
+ self . enqueue_message ( peer, & msg) ;
797
+ peer. sync_status = InitSyncTracker :: NodesSyncing ( msg. contents . node_id ) ;
798
+ } else {
799
+ peer. sync_status = InitSyncTracker :: NoSyncRequested ;
800
+ }
801
+ } ,
802
+ }
794
803
}
795
804
}
805
+
796
806
if peer. msgs_sent_since_pong >= BUFFER_DRAIN_MSGS_PER_TICK {
797
807
self . maybe_send_extra_ping ( peer) ;
798
808
}
0 commit comments