@@ -368,11 +368,6 @@ impl PendingChecks {
368
368
if latest_announce. is_none ( ) ||
369
369
latest_announce. as_ref ( ) . unwrap ( ) . timestamp ( ) < msg. timestamp
370
370
{
371
- // If the messages we got has a higher timestamp, just blindly
372
- // assume the signatures on the new message are correct and drop
373
- // the old message. This may cause us to end up dropping valid
374
- // `node_announcement`s if a peer is malicious, but we should get
375
- // the correct ones when the node updates them.
376
371
* latest_announce = Some (
377
372
if let Some ( msg) = full_msg { NodeAnnouncement :: Full ( msg. clone ( ) ) }
378
373
else { NodeAnnouncement :: Unsigned ( msg. clone ( ) ) } ) ;
@@ -543,7 +538,7 @@ impl PendingChecks {
543
538
let mut pending_checks = self . internal . lock ( ) . unwrap ( ) ;
544
539
if pending_checks. channels . len ( ) > Self :: MAX_PENDING_LOOKUPS {
545
540
// If we have many channel checks pending, ensure we don't have any dangling checks
546
- // (i.e. checks where the user told us they'd call back but drop'd the `AccessFuture `
541
+ // (i.e. checks where the user told us they'd call back but drop'd the `UtxoFuture `
547
542
// instead) before we commit to applying backpressure.
548
543
pending_checks. channels . retain ( |_, chan| {
549
544
Weak :: upgrade ( & chan) . is_some ( )
@@ -558,3 +553,309 @@ impl PendingChecks {
558
553
}
559
554
}
560
555
}
556
+
557
+ #[ cfg( test) ]
558
+ mod tests {
559
+ use super :: * ;
560
+ use crate :: routing:: gossip:: tests:: * ;
561
+ use crate :: util:: test_utils:: { TestChainSource , TestLogger } ;
562
+ use crate :: ln:: msgs;
563
+
564
+ use bitcoin:: blockdata:: constants:: genesis_block;
565
+ use bitcoin:: secp256k1:: { Secp256k1 , SecretKey } ;
566
+
567
+ use core:: sync:: atomic:: Ordering ;
568
+
569
+ fn get_network ( ) -> ( TestChainSource , NetworkGraph < Box < TestLogger > > ) {
570
+ let logger = Box :: new ( TestLogger :: new ( ) ) ;
571
+ let genesis_hash = genesis_block ( bitcoin:: Network :: Testnet ) . header . block_hash ( ) ;
572
+ let chain_source = TestChainSource :: new ( bitcoin:: Network :: Testnet ) ;
573
+ let network_graph = NetworkGraph :: new ( genesis_hash, logger) ;
574
+
575
+ ( chain_source, network_graph)
576
+ }
577
+
578
+ fn get_test_objects ( ) -> ( msgs:: ChannelAnnouncement , TestChainSource ,
579
+ NetworkGraph < Box < TestLogger > > , bitcoin:: Script , msgs:: NodeAnnouncement ,
580
+ msgs:: NodeAnnouncement , msgs:: ChannelUpdate , msgs:: ChannelUpdate , msgs:: ChannelUpdate )
581
+ {
582
+ let secp_ctx = Secp256k1 :: new ( ) ;
583
+
584
+ let ( chain_source, network_graph) = get_network ( ) ;
585
+
586
+ let good_script = get_channel_script ( & secp_ctx) ;
587
+ let node_1_privkey = & SecretKey :: from_slice ( & [ 42 ; 32 ] ) . unwrap ( ) ;
588
+ let node_2_privkey = & SecretKey :: from_slice ( & [ 41 ; 32 ] ) . unwrap ( ) ;
589
+ let valid_announcement = get_signed_channel_announcement ( |_| { } , node_1_privkey, node_2_privkey, & secp_ctx) ;
590
+
591
+ let node_a_announce = get_signed_node_announcement ( |_| { } , node_1_privkey, & secp_ctx) ;
592
+ let node_b_announce = get_signed_node_announcement ( |_| { } , node_2_privkey, & secp_ctx) ;
593
+
594
+ // Note that we have to set the "direction" flag correctly on both messages
595
+ let chan_update_a = get_signed_channel_update ( |msg| msg. flags = 0 , node_1_privkey, & secp_ctx) ;
596
+ let chan_update_b = get_signed_channel_update ( |msg| msg. flags = 1 , node_2_privkey, & secp_ctx) ;
597
+ let chan_update_c = get_signed_channel_update ( |msg| {
598
+ msg. flags = 1 ; msg. timestamp += 1 ; } , node_2_privkey, & secp_ctx) ;
599
+
600
+ ( valid_announcement, chain_source, network_graph, good_script, node_a_announce,
601
+ node_b_announce, chan_update_a, chan_update_b, chan_update_c)
602
+ }
603
+
604
+ #[ test]
605
+ fn test_fast_async_lookup ( ) {
606
+ // Check that async lookups which resolve quicker than the future is returned to the
607
+ // `get_utxo` call can read it still resolve properly.
608
+ let ( valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects ( ) ;
609
+
610
+ let future = UtxoFuture :: new ( ) ;
611
+ future. resolve_without_forwarding ( & network_graph,
612
+ Ok ( TxOut { value : 1_000_000 , script_pubkey : good_script } ) ) ;
613
+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = UtxoResult :: Async ( future. clone ( ) ) ;
614
+
615
+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap ( ) ;
616
+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_some( ) ) ;
617
+ }
618
+
619
+ #[ test]
620
+ fn test_async_lookup ( ) {
621
+ // Test a simple async lookup
622
+ let ( valid_announcement, chain_source, network_graph, good_script,
623
+ node_a_announce, node_b_announce, ..) = get_test_objects ( ) ;
624
+
625
+ let future = UtxoFuture :: new ( ) ;
626
+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = UtxoResult :: Async ( future. clone ( ) ) ;
627
+
628
+ assert_eq ! (
629
+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
630
+ "Channel being checked async" ) ;
631
+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
632
+
633
+ future. resolve_without_forwarding ( & network_graph,
634
+ Ok ( TxOut { value : 0 , script_pubkey : good_script } ) ) ;
635
+ network_graph. read_only ( ) . channels ( ) . get ( & valid_announcement. contents . short_channel_id ) . unwrap ( ) ;
636
+ network_graph. read_only ( ) . channels ( ) . get ( & valid_announcement. contents . short_channel_id ) . unwrap ( ) ;
637
+
638
+ assert ! ( network_graph. read_only( ) . nodes( ) . get( & valid_announcement. contents. node_id_1)
639
+ . unwrap( ) . announcement_info. is_none( ) ) ;
640
+
641
+ network_graph. update_node_from_announcement ( & node_a_announce) . unwrap ( ) ;
642
+ network_graph. update_node_from_announcement ( & node_b_announce) . unwrap ( ) ;
643
+
644
+ assert ! ( network_graph. read_only( ) . nodes( ) . get( & valid_announcement. contents. node_id_1)
645
+ . unwrap( ) . announcement_info. is_some( ) ) ;
646
+ }
647
+
648
+ #[ test]
649
+ fn test_invalid_async_lookup ( ) {
650
+ // Test an async lookup which returns an incorrect script
651
+ let ( valid_announcement, chain_source, network_graph, ..) = get_test_objects ( ) ;
652
+
653
+ let future = UtxoFuture :: new ( ) ;
654
+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = UtxoResult :: Async ( future. clone ( ) ) ;
655
+
656
+ assert_eq ! (
657
+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
658
+ "Channel being checked async" ) ;
659
+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
660
+
661
+ future. resolve_without_forwarding ( & network_graph,
662
+ Ok ( TxOut { value : 1_000_000 , script_pubkey : bitcoin:: Script :: new ( ) } ) ) ;
663
+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
664
+ }
665
+
666
+ #[ test]
667
+ fn test_failing_async_lookup ( ) {
668
+ // Test an async lookup which returns an error
669
+ let ( valid_announcement, chain_source, network_graph, ..) = get_test_objects ( ) ;
670
+
671
+ let future = UtxoFuture :: new ( ) ;
672
+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = UtxoResult :: Async ( future. clone ( ) ) ;
673
+
674
+ assert_eq ! (
675
+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
676
+ "Channel being checked async" ) ;
677
+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
678
+
679
+ future. resolve_without_forwarding ( & network_graph, Err ( UtxoLookupError :: UnknownTx ) ) ;
680
+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
681
+ }
682
+
683
+ #[ test]
684
+ fn test_updates_async_lookup ( ) {
685
+ // Test async lookups will process pending channel_update/node_announcements once they
686
+ // complete.
687
+ let ( valid_announcement, chain_source, network_graph, good_script, node_a_announce,
688
+ node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects ( ) ;
689
+
690
+ let future = UtxoFuture :: new ( ) ;
691
+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = UtxoResult :: Async ( future. clone ( ) ) ;
692
+
693
+ assert_eq ! (
694
+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
695
+ "Channel being checked async" ) ;
696
+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
697
+
698
+ assert_eq ! (
699
+ network_graph. update_node_from_announcement( & node_a_announce) . unwrap_err( ) . err,
700
+ "Awaiting channel_announcement validation to accept node_announcement" ) ;
701
+ assert_eq ! (
702
+ network_graph. update_node_from_announcement( & node_b_announce) . unwrap_err( ) . err,
703
+ "Awaiting channel_announcement validation to accept node_announcement" ) ;
704
+
705
+ assert_eq ! ( network_graph. update_channel( & chan_update_a) . unwrap_err( ) . err,
706
+ "Awaiting channel_announcement validation to accept channel_update" ) ;
707
+ assert_eq ! ( network_graph. update_channel( & chan_update_b) . unwrap_err( ) . err,
708
+ "Awaiting channel_announcement validation to accept channel_update" ) ;
709
+
710
+ future. resolve_without_forwarding ( & network_graph,
711
+ Ok ( TxOut { value : 1_000_000 , script_pubkey : good_script } ) ) ;
712
+
713
+ assert ! ( network_graph. read_only( ) . channels( )
714
+ . get( & valid_announcement. contents. short_channel_id) . unwrap( ) . one_to_two. is_some( ) ) ;
715
+ assert ! ( network_graph. read_only( ) . channels( )
716
+ . get( & valid_announcement. contents. short_channel_id) . unwrap( ) . two_to_one. is_some( ) ) ;
717
+
718
+ assert ! ( network_graph. read_only( ) . nodes( ) . get( & valid_announcement. contents. node_id_1)
719
+ . unwrap( ) . announcement_info. is_some( ) ) ;
720
+ assert ! ( network_graph. read_only( ) . nodes( ) . get( & valid_announcement. contents. node_id_2)
721
+ . unwrap( ) . announcement_info. is_some( ) ) ;
722
+ }
723
+
724
+ #[ test]
725
+ fn test_latest_update_async_lookup ( ) {
726
+ // Test async lookups will process the latest channel_update if two are received while
727
+ // awaiting an async UTXO lookup.
728
+ let ( valid_announcement, chain_source, network_graph, good_script, _,
729
+ _, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects ( ) ;
730
+
731
+ let future = UtxoFuture :: new ( ) ;
732
+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = UtxoResult :: Async ( future. clone ( ) ) ;
733
+
734
+ assert_eq ! (
735
+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
736
+ "Channel being checked async" ) ;
737
+ assert ! ( network_graph. read_only( ) . channels( ) . get( & valid_announcement. contents. short_channel_id) . is_none( ) ) ;
738
+
739
+ assert_eq ! ( network_graph. update_channel( & chan_update_a) . unwrap_err( ) . err,
740
+ "Awaiting channel_announcement validation to accept channel_update" ) ;
741
+ assert_eq ! ( network_graph. update_channel( & chan_update_b) . unwrap_err( ) . err,
742
+ "Awaiting channel_announcement validation to accept channel_update" ) ;
743
+ assert_eq ! ( network_graph. update_channel( & chan_update_c) . unwrap_err( ) . err,
744
+ "Awaiting channel_announcement validation to accept channel_update" ) ;
745
+
746
+ future. resolve_without_forwarding ( & network_graph,
747
+ Ok ( TxOut { value : 1_000_000 , script_pubkey : good_script } ) ) ;
748
+
749
+ assert_eq ! ( chan_update_a. contents. timestamp, chan_update_b. contents. timestamp) ;
750
+ assert ! ( network_graph. read_only( ) . channels( )
751
+ . get( & valid_announcement. contents. short_channel_id) . as_ref( ) . unwrap( )
752
+ . one_to_two. as_ref( ) . unwrap( ) . last_update !=
753
+ network_graph. read_only( ) . channels( )
754
+ . get( & valid_announcement. contents. short_channel_id) . as_ref( ) . unwrap( )
755
+ . two_to_one. as_ref( ) . unwrap( ) . last_update) ;
756
+ }
757
+
758
+ #[ test]
759
+ fn test_no_double_lookups ( ) {
760
+ // Test that a pending async lookup will prevent a second async lookup from flying, but
761
+ // only if the channel_announcement message is identical.
762
+ let ( valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects ( ) ;
763
+
764
+ let future = UtxoFuture :: new ( ) ;
765
+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = UtxoResult :: Async ( future. clone ( ) ) ;
766
+
767
+ assert_eq ! (
768
+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
769
+ "Channel being checked async" ) ;
770
+ assert_eq ! ( chain_source. get_utxo_call_count. load( Ordering :: Relaxed ) , 1 ) ;
771
+
772
+ // If we make a second request with the same message, the call count doesn't increase...
773
+ let future_b = UtxoFuture :: new ( ) ;
774
+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = UtxoResult :: Async ( future_b. clone ( ) ) ;
775
+ assert_eq ! (
776
+ network_graph. update_channel_from_announcement( & valid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
777
+ "Channel announcement is already being checked" ) ;
778
+ assert_eq ! ( chain_source. get_utxo_call_count. load( Ordering :: Relaxed ) , 1 ) ;
779
+
780
+ // But if we make a third request with a tweaked message, we should get a second call
781
+ // against our new future...
782
+ let secp_ctx = Secp256k1 :: new ( ) ;
783
+ let replacement_pk_1 = & SecretKey :: from_slice ( & [ 99 ; 32 ] ) . unwrap ( ) ;
784
+ let replacement_pk_2 = & SecretKey :: from_slice ( & [ 98 ; 32 ] ) . unwrap ( ) ;
785
+ let invalid_announcement = get_signed_channel_announcement ( |_| { } , replacement_pk_1, replacement_pk_2, & secp_ctx) ;
786
+ assert_eq ! (
787
+ network_graph. update_channel_from_announcement( & invalid_announcement, & Some ( & chain_source) ) . unwrap_err( ) . err,
788
+ "Channel being checked async" ) ;
789
+ assert_eq ! ( chain_source. get_utxo_call_count. load( Ordering :: Relaxed ) , 2 ) ;
790
+
791
+ // Still, if we resolve the original future, the original channel will be accepted.
792
+ future. resolve_without_forwarding ( & network_graph,
793
+ Ok ( TxOut { value : 1_000_000 , script_pubkey : good_script } ) ) ;
794
+ assert ! ( !network_graph. read_only( ) . channels( )
795
+ . get( & valid_announcement. contents. short_channel_id) . unwrap( )
796
+ . announcement_message. as_ref( ) . unwrap( )
797
+ . contents. features. supports_unknown_test_feature( ) ) ;
798
+ }
799
+
800
+ #[ test]
801
+ fn test_checks_backpressure ( ) {
802
+ // Test that too_many_checks_pending returns true when there are many checks pending, and
803
+ // returns false once they complete.
804
+ let secp_ctx = Secp256k1 :: new ( ) ;
805
+ let ( chain_source, network_graph) = get_network ( ) ;
806
+
807
+ // We cheat and use a single future for all the lookups to complete them all at once.
808
+ let future = UtxoFuture :: new ( ) ;
809
+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = UtxoResult :: Async ( future. clone ( ) ) ;
810
+
811
+ let node_1_privkey = & SecretKey :: from_slice ( & [ 42 ; 32 ] ) . unwrap ( ) ;
812
+ let node_2_privkey = & SecretKey :: from_slice ( & [ 41 ; 32 ] ) . unwrap ( ) ;
813
+
814
+ for i in 0 ..PendingChecks :: MAX_PENDING_LOOKUPS {
815
+ let valid_announcement = get_signed_channel_announcement (
816
+ |msg| msg. short_channel_id += 1 + i as u64 , node_1_privkey, node_2_privkey, & secp_ctx) ;
817
+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap_err ( ) ;
818
+ assert ! ( !network_graph. pending_checks. too_many_checks_pending( ) ) ;
819
+ }
820
+
821
+ let valid_announcement = get_signed_channel_announcement (
822
+ |_| { } , node_1_privkey, node_2_privkey, & secp_ctx) ;
823
+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap_err ( ) ;
824
+ assert ! ( network_graph. pending_checks. too_many_checks_pending( ) ) ;
825
+
826
+ // Once the future completes the "too many checks" flag should reset.
827
+ future. resolve_without_forwarding ( & network_graph, Err ( UtxoLookupError :: UnknownTx ) ) ;
828
+ assert ! ( !network_graph. pending_checks. too_many_checks_pending( ) ) ;
829
+ }
830
+
831
+ #[ test]
832
+ fn test_checks_backpressure_drop ( ) {
833
+ // Test that too_many_checks_pending returns true when there are many checks pending, and
834
+ // returns false if we drop some of the futures without completion.
835
+ let secp_ctx = Secp256k1 :: new ( ) ;
836
+ let ( chain_source, network_graph) = get_network ( ) ;
837
+
838
+ // We cheat and use a single future for all the lookups to complete them all at once.
839
+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = UtxoResult :: Async ( UtxoFuture :: new ( ) ) ;
840
+
841
+ let node_1_privkey = & SecretKey :: from_slice ( & [ 42 ; 32 ] ) . unwrap ( ) ;
842
+ let node_2_privkey = & SecretKey :: from_slice ( & [ 41 ; 32 ] ) . unwrap ( ) ;
843
+
844
+ for i in 0 ..PendingChecks :: MAX_PENDING_LOOKUPS {
845
+ let valid_announcement = get_signed_channel_announcement (
846
+ |msg| msg. short_channel_id += 1 + i as u64 , node_1_privkey, node_2_privkey, & secp_ctx) ;
847
+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap_err ( ) ;
848
+ assert ! ( !network_graph. pending_checks. too_many_checks_pending( ) ) ;
849
+ }
850
+
851
+ let valid_announcement = get_signed_channel_announcement (
852
+ |_| { } , node_1_privkey, node_2_privkey, & secp_ctx) ;
853
+ network_graph. update_channel_from_announcement ( & valid_announcement, & Some ( & chain_source) ) . unwrap_err ( ) ;
854
+ assert ! ( network_graph. pending_checks. too_many_checks_pending( ) ) ;
855
+
856
+ // Once the future is drop'd (by resetting the `utxo_ret` value) the "too many checks" flag
857
+ // should reset to false.
858
+ * chain_source. utxo_ret . lock ( ) . unwrap ( ) = UtxoResult :: Sync ( Err ( UtxoLookupError :: UnknownTx ) ) ;
859
+ assert ! ( !network_graph. pending_checks. too_many_checks_pending( ) ) ;
860
+ }
861
+ }
0 commit comments