Skip to content

Commit 49b8c15

Browse files
Score payment paths from events in BackgroundProcessor
1 parent d2bf407 commit 49b8c15

File tree

1 file changed

+215
-14
lines changed
  • lightning-background-processor/src

1 file changed

+215
-14
lines changed

lightning-background-processor/src/lib.rs

+215-14
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
#[cfg(any(test, feature = "std"))]
1717
extern crate core;
1818

19+
#[cfg(not(feature = "std"))]
20+
extern crate alloc;
21+
1922
#[macro_use] extern crate lightning;
2023
extern crate lightning_rapid_gossip_sync;
2124

@@ -28,7 +31,7 @@ use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMes
2831
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
2932
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
3033
use lightning::routing::router::Router;
31-
use lightning::routing::scoring::WriteableScore;
34+
use lightning::routing::scoring::{Score, WriteableScore};
3235
use lightning::util::events::{Event, EventHandler, EventsProvider};
3336
use lightning::util::logger::Logger;
3437
use lightning::util::persist::Persister;
@@ -49,6 +52,8 @@ use std::time::Instant;
4952

5053
#[cfg(feature = "futures")]
5154
use futures_util::{select_biased, future::FutureExt, task};
55+
#[cfg(not(feature = "std"))]
56+
use alloc::vec::Vec;
5257

5358
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
5459
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
@@ -216,6 +221,34 @@ fn handle_network_graph_update<L: Deref>(
216221
}
217222
}
218223

224+
fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
225+
scorer: &'a S, event: &Event
226+
) {
227+
match event {
228+
Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
229+
let path = path.iter().collect::<Vec<_>>();
230+
let mut score = scorer.lock();
231+
score.payment_path_failed(&path, *scid);
232+
},
233+
Event::PaymentPathSuccessful { path, .. } => {
234+
let path = path.iter().collect::<Vec<_>>();
235+
let mut score = scorer.lock();
236+
score.payment_path_successful(&path);
237+
},
238+
Event::ProbeSuccessful { path, .. } => {
239+
let path = path.iter().collect::<Vec<_>>();
240+
let mut score = scorer.lock();
241+
score.probe_successful(&path);
242+
},
243+
Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
244+
let path = path.iter().collect::<Vec<_>>();
245+
let mut score = scorer.lock();
246+
score.probe_failed(&path, *scid);
247+
},
248+
_ => {},
249+
}
250+
}
251+
219252
macro_rules! define_run_body {
220253
($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
221254
$channel_manager: ident, $process_channel_manager_events: expr,
@@ -387,7 +420,7 @@ pub async fn process_events_async<
387420
UMH: 'static + Deref + Send + Sync,
388421
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
389422
S: 'static + Deref<Target = SC> + Send + Sync,
390-
SC: WriteableScore<'a>,
423+
SC: for<'b> WriteableScore<'b>,
391424
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
392425
Sleeper: Fn(Duration) -> SleepFuture
393426
>(
@@ -417,10 +450,14 @@ where
417450
let async_event_handler = |event| {
418451
let network_graph = gossip_sync.network_graph();
419452
let event_handler = &event_handler;
453+
let scorer = &scorer;
420454
async move {
421455
if let Some(network_graph) = network_graph {
422456
handle_network_graph_update(network_graph, &event)
423457
}
458+
if let Some(ref scorer) = scorer {
459+
update_scorer(scorer, &event);
460+
}
424461
event_handler(event).await;
425462
}
426463
};
@@ -516,7 +553,7 @@ impl BackgroundProcessor {
516553
UMH: 'static + Deref + Send + Sync,
517554
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
518555
S: 'static + Deref<Target = SC> + Send + Sync,
519-
SC: WriteableScore<'a>,
556+
SC: for <'b> WriteableScore<'b>,
520557
>(
521558
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
522559
gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
@@ -547,6 +584,9 @@ impl BackgroundProcessor {
547584
if let Some(network_graph) = network_graph {
548585
handle_network_graph_update(network_graph, &event)
549586
}
587+
if let Some(ref scorer) = scorer {
588+
update_scorer(scorer, &event);
589+
}
550590
event_handler.handle_event(event);
551591
};
552592
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
@@ -618,15 +658,16 @@ mod tests {
618658
use lightning::chain::keysinterface::{InMemorySigner, EntropySource, KeysManager};
619659
use lightning::chain::transaction::OutPoint;
620660
use lightning::get_event_msg;
621-
use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
661+
use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, PaymentId, Retry as ChannelManagerRetry, SimpleArcChannelManager};
662+
use lightning::ln::functional_test_utils::*;
622663
use lightning::ln::features::ChannelFeatures;
623-
use lightning::ln::msgs::{ChannelMessageHandler, Init};
664+
use lightning::ln::msgs::{ChannelMessageHandler, Init, RoutingMessageHandler};
624665
use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
625-
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
626-
use lightning::routing::router::DefaultRouter;
666+
use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
667+
use lightning::routing::router::{DefaultRouter, PaymentParameters, RouteParameters};
627668
use lightning::routing::scoring::{ProbabilisticScoringParameters, ProbabilisticScorer};
628669
use lightning::util::config::UserConfig;
629-
use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
670+
use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent, PaymentPurpose};
630671
use lightning::util::ser::Writeable;
631672
use lightning::util::test_utils;
632673
use lightning::util::persist::KVStorePersister;
@@ -782,8 +823,8 @@ mod tests {
782823
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
783824
let best_block = BestBlock::from_genesis(network);
784825
let params = ChainParameters { network, best_block };
785-
let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
786-
let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
826+
let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), test_default_channel_config(), params));
827+
let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), None, logger.clone()));
787828
let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
788829
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
789830
let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
@@ -803,7 +844,7 @@ mod tests {
803844

804845
macro_rules! open_channel {
805846
($node_a: expr, $node_b: expr, $channel_value: expr) => {{
806-
begin_open_channel!($node_a, $node_b, $channel_value);
847+
begin_open_channel!($node_a, $node_b, $channel_value, None);
807848
let events = $node_a.node.get_and_clear_pending_events();
808849
assert_eq!(events.len(), 1);
809850
let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
@@ -813,8 +854,8 @@ mod tests {
813854
}
814855

815856
macro_rules! begin_open_channel {
816-
($node_a: expr, $node_b: expr, $channel_value: expr) => {{
817-
$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
857+
($node_a: expr, $node_b: expr, $channel_value: expr, $override_cfg: expr) => {{
858+
$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, $override_cfg).unwrap();
818859
$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
819860
$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
820861
}}
@@ -1034,7 +1075,7 @@ mod tests {
10341075
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
10351076

10361077
// Open a channel and check that the FundingGenerationReady event was handled.
1037-
begin_open_channel!(nodes[0], nodes[1], channel_value);
1078+
begin_open_channel!(nodes[0], nodes[1], channel_value, Some(UserConfig::default()));
10381079
let (temporary_channel_id, funding_tx) = receiver
10391080
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
10401081
.expect("FundingGenerationReady not handled within deadline");
@@ -1171,4 +1212,164 @@ mod tests {
11711212
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
11721213
assert!(bg_processor.stop().is_ok());
11731214
}
1215+
1216+
#[test]
1217+
fn test_payment_path_scoring() {
1218+
// Ensure that we update the scorer when relevant events are processed. In this case, we ensure
1219+
// that we update the scorer upon a payment path succeeding (note that the channel must be
1220+
// public or else we won't score it).
1221+
let mut nodes = create_nodes(2, "test_payment_path_scoring".to_string());
1222+
let channel_value = 100000;
1223+
let data_dir_0 = nodes[0].persister.get_data_dir();
1224+
let persister_0 = Arc::new(Persister::new(data_dir_0.clone()));
1225+
1226+
// Set up a background event handler for FundingGenerationReady events.
1227+
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1228+
let event_handler_0 = move |event: Event| match event {
1229+
Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1230+
Event::ChannelReady { .. } => {},
1231+
_ => panic!("Unexpected event: {:?}", event),
1232+
};
1233+
1234+
let bg_processor_0 = BackgroundProcessor::start(persister_0, event_handler_0, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1235+
1236+
// Open a channel and check that the FundingGenerationReady event was handled.
1237+
begin_open_channel!(nodes[0], nodes[1], channel_value, None);
1238+
let (temporary_channel_id, funding_tx) = receiver
1239+
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1240+
.expect("FundingGenerationReady not handled within deadline");
1241+
end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1242+
1243+
// Confirm the funding transaction.
1244+
confirm_transaction(&mut nodes[0], &funding_tx);
1245+
let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1246+
confirm_transaction(&mut nodes[1], &funding_tx);
1247+
let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
1248+
nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1249+
let as_ann_sigs = get_event_msg!(nodes[0], MessageSendEvent::SendAnnouncementSignatures, nodes[1].node.get_our_node_id());
1250+
nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1251+
let bs_ann_sigs = get_event_msg!(nodes[1], MessageSendEvent::SendAnnouncementSignatures, nodes[0].node.get_our_node_id());
1252+
nodes[1].node.handle_announcement_signatures(&nodes[0].node.get_our_node_id(), &as_ann_sigs);
1253+
let events = nodes[1].node.get_and_clear_pending_msg_events();
1254+
assert_eq!(events.len(), 1);
1255+
let (ann, bs_update) = match events[0] {
1256+
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
1257+
(msg, update_msg)
1258+
},
1259+
_ => panic!("Unexpected event"),
1260+
};
1261+
let scid = bs_update.contents.short_channel_id;
1262+
nodes[0].node.handle_announcement_signatures(&nodes[1].node.get_our_node_id(), &bs_ann_sigs);
1263+
let events = nodes[0].node.get_and_clear_pending_msg_events();
1264+
assert_eq!(events.len(), 1);
1265+
let as_update = match events[0] {
1266+
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
1267+
assert!(*ann == *msg);
1268+
assert_eq!(update_msg.contents.short_channel_id, ann.contents.short_channel_id);
1269+
assert_eq!(update_msg.contents.short_channel_id, bs_update.contents.short_channel_id);
1270+
update_msg
1271+
},
1272+
_ => panic!("Unexpected event"),
1273+
};
1274+
1275+
for node in nodes.iter() {
1276+
assert!(node.p2p_gossip_sync.handle_channel_announcement(ann).unwrap());
1277+
node.p2p_gossip_sync.handle_channel_update(as_update).unwrap();
1278+
node.p2p_gossip_sync.handle_channel_update(bs_update).unwrap();
1279+
1280+
node.node.handle_channel_update(&nodes[0].node.get_our_node_id(), &as_update);
1281+
node.node.handle_channel_update(&nodes[1].node.get_our_node_id(), &bs_update);
1282+
}
1283+
1284+
assert!(bg_processor_0.stop().is_ok());
1285+
1286+
let (sender_0, receiver_0) = std::sync::mpsc::sync_channel(1);
1287+
let event_handler_0 = move |event: Event| match event {
1288+
Event::ChannelReady { .. } => {},
1289+
Event::PendingHTLCsForwardable { .. } => {},
1290+
Event::PaymentSent { .. } => {},
1291+
Event::PaymentPathSuccessful { .. } => sender_0.send(event).unwrap(),
1292+
_ => panic!("Unexpected event: {:?}", event),
1293+
};
1294+
let persister_0 = Arc::new(Persister::new(data_dir_0));
1295+
let _bg_processor_0 = BackgroundProcessor::start(persister_0, event_handler_0, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1296+
1297+
let amt_msat = 40000;
1298+
let payment_params = PaymentParameters::from_node_id(nodes[1].node.get_our_node_id())
1299+
.with_features(nodes[1].node.invoice_features());
1300+
let route_params = RouteParameters {
1301+
payment_params,
1302+
final_value_msat: amt_msat,
1303+
final_cltv_expiry_delta: 70,
1304+
};
1305+
let (payment_hash, payment_secret) = nodes[1].node.create_inbound_payment(Some(amt_msat), 60*60, None).unwrap();
1306+
1307+
let (sender_1, receiver_1) = std::sync::mpsc::sync_channel(1);
1308+
let event_handler_1 = move |event: Event| match event {
1309+
Event::ChannelReady { .. } => {},
1310+
Event::PendingHTLCsForwardable { .. } => {},
1311+
Event::PaymentClaimable { .. } => sender_1.send(event).unwrap(),
1312+
Event::PaymentClaimed { .. } => {},
1313+
_ => panic!("Unexpected event: {:?}", event),
1314+
};
1315+
let data_dir_1 = nodes[1].persister.get_data_dir();
1316+
let persister_1 = Arc::new(Persister::new(data_dir_1.clone()));
1317+
let _bg_processor_1 = BackgroundProcessor::start(persister_1, event_handler_1, nodes[1].chain_monitor.clone(), nodes[1].node.clone(), nodes[1].no_gossip_sync(), nodes[1].peer_manager.clone(), nodes[1].logger.clone(), Some(nodes[1].scorer.clone()));
1318+
1319+
nodes[0].node.send_payment_with_retry(payment_hash, &Some(payment_secret), PaymentId(payment_hash.0), route_params, ChannelManagerRetry::Attempts(0)).unwrap();
1320+
1321+
let mut msg_events = nodes[0].node.get_and_clear_pending_msg_events();
1322+
1323+
assert_eq!(msg_events.len(), 1);
1324+
let payment_event = SendEvent::from_event(msg_events.remove(0));
1325+
1326+
nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &payment_event.msgs[0]);
1327+
nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &payment_event.commitment_msg);
1328+
let (bs_first_raa, bs_first_cs) = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id());
1329+
1330+
nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_first_raa);
1331+
nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &bs_first_cs);
1332+
let as_first_raa = get_event_msg!(nodes[0], MessageSendEvent::SendRevokeAndACK, nodes[1].node.get_our_node_id());
1333+
1334+
nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &as_first_raa);
1335+
nodes[1].node.process_pending_htlc_forwards();
1336+
1337+
let event = receiver_1
1338+
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1339+
.expect("Events not handled within deadline");
1340+
match event {
1341+
Event::PaymentClaimable { purpose: PaymentPurpose::InvoicePayment { payment_preimage: Some(payment_preimage), .. }, .. } => nodes[1].node.claim_funds(payment_preimage),
1342+
Event::ChannelReady { .. } => {},
1343+
_ => panic!("Unexpected event: {:?}", event),
1344+
}
1345+
1346+
nodes[1].node.process_pending_htlc_forwards();
1347+
let update_1 = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
1348+
assert!(update_1.update_fulfill_htlcs.len() == 1);
1349+
let fulfill_msg = update_1.update_fulfill_htlcs[0].clone();
1350+
1351+
nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &fulfill_msg);
1352+
nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &update_1.commitment_signed);
1353+
let (as_first_raa, as_first_cs) = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id());
1354+
1355+
nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &as_first_raa);
1356+
nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &as_first_cs);
1357+
let bs_first_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, nodes[0].node.get_our_node_id());
1358+
1359+
// Ensure that nodes[0] has no capacity data for its channel before the PaymentPathSuccess event
1360+
// is processed.
1361+
assert_eq!(nodes[0].scorer.lock().unwrap().estimated_channel_liquidity_range(scid, &NodeId::from_pubkey(&nodes[0].node.get_our_node_id())), None);
1362+
1363+
nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_first_raa);
1364+
1365+
let event = receiver_0
1366+
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1367+
.expect("Events not handled within deadline");
1368+
match event {
1369+
Event::PaymentPathSuccessful { .. } => {
1370+
assert_eq!(nodes[0].scorer.lock().unwrap().estimated_channel_liquidity_range(scid, &NodeId::from_pubkey(&nodes[0].node.get_our_node_id())), Some((40000, 10000000)));
1371+
},
1372+
_ => panic!("Unexpected event: {:?}", event),
1373+
}
1374+
}
11741375
}

0 commit comments

Comments
 (0)