Skip to content

Commit b06cfc3

Browse files
committed
Make event handling fallible
Previously, we would require our users to handle all events successfully inline or panic will trying to do so. If they would exit the `EventHandler` any other way we'd forget about the event and wouldn't replay them after restart. Here, we implement fallible event handling, allowing the user to return `Err(())` which signals to our event providers they should abort event processing and replay any unhandled events later (i.e., in the next invocation).
1 parent e5b7402 commit b06cfc3

File tree

8 files changed

+262
-175
lines changed

8 files changed

+262
-175
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 61 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,7 @@ use core::task;
556556
/// # }
557557
/// # struct EventHandler {}
558558
/// # impl EventHandler {
559-
/// # async fn handle_event(&self, _: lightning::events::Event) {}
559+
/// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ()> { Ok(()) }
560560
/// # }
561561
/// # #[derive(Eq, PartialEq, Clone, Hash)]
562562
/// # struct SocketDescriptor {}
@@ -654,7 +654,7 @@ pub async fn process_events_async<
654654
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
655655
L: 'static + Deref + Send + Sync,
656656
P: 'static + Deref + Send + Sync,
657-
EventHandlerFuture: core::future::Future<Output = ()>,
657+
EventHandlerFuture: core::future::Future<Output = Result<(),()>>,
658658
EventHandler: Fn(Event) -> EventHandlerFuture,
659659
PS: 'static + Deref + Send,
660660
M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
@@ -703,12 +703,13 @@ where
703703
if update_scorer(scorer, &event, duration_since_epoch) {
704704
log_trace!(logger, "Persisting scorer after update");
705705
if let Err(e) = persister.persist_scorer(&scorer) {
706-
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
706+
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
707+
return Err(());
707708
}
708709
}
709710
}
710711
}
711-
event_handler(event).await;
712+
event_handler(event).await
712713
})
713714
};
714715
define_run_body!(
@@ -740,6 +741,26 @@ where
740741
)
741742
}
742743

744+
#[cfg(feature = "futures")]
745+
async fn process_onion_message_handler_events_async<
746+
EventHandlerFuture: core::future::Future<Output = Result<(), ()>>,
747+
EventHandler: Fn(Event) -> EventHandlerFuture,
748+
PM: 'static + Deref + Send + Sync,
749+
>(
750+
peer_manager: &PM, handler: EventHandler
751+
)
752+
where
753+
PM::Target: APeerManager + Send + Sync,
754+
{
755+
let events = core::cell::RefCell::new(Vec::new());
756+
peer_manager.onion_message_handler().process_pending_events(&|e| Ok(events.borrow_mut().push(e)));
757+
758+
for event in events.into_inner() {
759+
// Ignore any errors as onion messages are best effort anyways.
760+
let _ = handler(event).await;
761+
}
762+
}
763+
743764
#[cfg(feature = "std")]
744765
impl BackgroundProcessor {
745766
/// Start a background thread that takes care of responsibilities enumerated in the [top-level
@@ -841,7 +862,7 @@ impl BackgroundProcessor {
841862
}
842863
}
843864
}
844-
event_handler.handle_event(event);
865+
event_handler.handle_event(event)
845866
};
846867
define_run_body!(
847868
persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
@@ -1425,7 +1446,7 @@ mod tests {
14251446
// Initiate the background processors to watch each node.
14261447
let data_dir = nodes[0].kv_store.get_data_dir();
14271448
let persister = Arc::new(Persister::new(data_dir));
1428-
let event_handler = |_: _| {};
1449+
let event_handler = |_: _| { Ok(()) };
14291450
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
14301451

14311452
macro_rules! check_persisted_data {
@@ -1493,7 +1514,7 @@ mod tests {
14931514
let (_, nodes) = create_nodes(1, "test_timer_tick_called");
14941515
let data_dir = nodes[0].kv_store.get_data_dir();
14951516
let persister = Arc::new(Persister::new(data_dir));
1496-
let event_handler = |_: _| {};
1517+
let event_handler = |_: _| { Ok(()) };
14971518
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
14981519
loop {
14991520
let log_entries = nodes[0].logger.lines.lock().unwrap();
@@ -1522,7 +1543,7 @@ mod tests {
15221543

15231544
let data_dir = nodes[0].kv_store.get_data_dir();
15241545
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1525-
let event_handler = |_: _| {};
1546+
let event_handler = |_: _| { Ok(()) };
15261547
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
15271548
match bg_processor.join() {
15281549
Ok(_) => panic!("Expected error persisting manager"),
@@ -1544,7 +1565,7 @@ mod tests {
15441565
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
15451566

15461567
let bp_future = super::process_events_async(
1547-
persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()),
1568+
persister, |_: _| {async { Ok(()) }}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()),
15481569
nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
15491570
Some(nodes[0].scorer.clone()), move |dur: Duration| {
15501571
Box::pin(async move {
@@ -1568,7 +1589,7 @@ mod tests {
15681589
let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");
15691590
let data_dir = nodes[0].kv_store.get_data_dir();
15701591
let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1571-
let event_handler = |_: _| {};
1592+
let event_handler = |_: _| { Ok(()) };
15721593
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
15731594

15741595
match bg_processor.stop() {
@@ -1586,7 +1607,7 @@ mod tests {
15861607
let (_, nodes) = create_nodes(2, "test_persist_scorer_error");
15871608
let data_dir = nodes[0].kv_store.get_data_dir();
15881609
let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1589-
let event_handler = |_: _| {};
1610+
let event_handler = |_: _| { Ok(()) };
15901611
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
15911612

15921613
match bg_processor.stop() {
@@ -1608,11 +1629,14 @@ mod tests {
16081629
// Set up a background event handler for FundingGenerationReady events.
16091630
let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
16101631
let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
1611-
let event_handler = move |event: Event| match event {
1612-
Event::FundingGenerationReady { .. } => funding_generation_send.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1613-
Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
1614-
Event::ChannelReady { .. } => {},
1615-
_ => panic!("Unexpected event: {:?}", event),
1632+
let event_handler = move |event: Event| {
1633+
match event {
1634+
Event::FundingGenerationReady { .. } => funding_generation_send.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1635+
Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
1636+
Event::ChannelReady { .. } => {},
1637+
_ => panic!("Unexpected event: {:?}", event),
1638+
}
1639+
Ok(())
16161640
};
16171641

16181642
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1648,11 +1672,14 @@ mod tests {
16481672

16491673
// Set up a background event handler for SpendableOutputs events.
16501674
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1651-
let event_handler = move |event: Event| match event {
1652-
Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
1653-
Event::ChannelReady { .. } => {},
1654-
Event::ChannelClosed { .. } => {},
1655-
_ => panic!("Unexpected event: {:?}", event),
1675+
let event_handler = move |event: Event| {
1676+
match event {
1677+
Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
1678+
Event::ChannelReady { .. } => {},
1679+
Event::ChannelClosed { .. } => {},
1680+
_ => panic!("Unexpected event: {:?}", event),
1681+
}
1682+
Ok(())
16561683
};
16571684
let persister = Arc::new(Persister::new(data_dir));
16581685
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1766,7 +1793,7 @@ mod tests {
17661793
let (_, nodes) = create_nodes(2, "test_scorer_persistence");
17671794
let data_dir = nodes[0].kv_store.get_data_dir();
17681795
let persister = Arc::new(Persister::new(data_dir));
1769-
let event_handler = |_: _| {};
1796+
let event_handler = |_: _| { Ok(()) };
17701797
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
17711798

17721799
loop {
@@ -1839,7 +1866,7 @@ mod tests {
18391866
let data_dir = nodes[0].kv_store.get_data_dir();
18401867
let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
18411868

1842-
let event_handler = |_: _| {};
1869+
let event_handler = |_: _| { Ok(()) };
18431870
let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
18441871

18451872
do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes,
@@ -1860,7 +1887,7 @@ mod tests {
18601887

18611888
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
18621889
let bp_future = super::process_events_async(
1863-
persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()),
1890+
persister, |_: _| {async { Ok(()) }}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()),
18641891
nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
18651892
Some(nodes[0].scorer.clone()), move |dur: Duration| {
18661893
let mut exit_receiver = exit_receiver.clone();
@@ -1987,12 +2014,15 @@ mod tests {
19872014
#[test]
19882015
fn test_payment_path_scoring() {
19892016
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1990-
let event_handler = move |event: Event| match event {
1991-
Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
1992-
Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
1993-
Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
1994-
Event::ProbeFailed { .. } => sender.send(event).unwrap(),
1995-
_ => panic!("Unexpected event: {:?}", event),
2017+
let event_handler = move |event: Event| {
2018+
match event {
2019+
Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
2020+
Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
2021+
Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
2022+
Event::ProbeFailed { .. } => sender.send(event).unwrap(),
2023+
_ => panic!("Unexpected event: {:?}", event),
2024+
}
2025+
Ok(())
19962026
};
19972027

19982028
let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
@@ -2025,6 +2055,7 @@ mod tests {
20252055
Event::ProbeFailed { .. } => { sender_ref.send(event).await.unwrap() },
20262056
_ => panic!("Unexpected event: {:?}", event),
20272057
}
2058+
Ok(())
20282059
}
20292060
};
20302061

lightning-invoice/src/utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1391,6 +1391,7 @@ mod test {
13911391
} else {
13921392
other_events.borrow_mut().push(event);
13931393
}
1394+
Ok(())
13941395
};
13951396
nodes[fwd_idx].node.process_pending_events(&forward_event_handler);
13961397
nodes[fwd_idx].node.process_pending_events(&forward_event_handler);

lightning/src/chain/chainmonitor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ where C::Target: chain::Filter,
516516
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
517517
use crate::events::EventsProvider;
518518
let events = core::cell::RefCell::new(Vec::new());
519-
let event_handler = |event: events::Event| events.borrow_mut().push(event);
519+
let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event));
520520
self.process_pending_events(&event_handler);
521521
events.into_inner()
522522
}
@@ -527,7 +527,7 @@ where C::Target: chain::Filter,
527527
/// See the trait-level documentation of [`EventsProvider`] for requirements.
528528
///
529529
/// [`EventsProvider`]: crate::events::EventsProvider
530-
pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
530+
pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(),()>>, H: Fn(Event) -> Future>(
531531
&self, handler: H
532532
) {
533533
// Sadly we can't hold the monitors read lock through an async call. Thus we have to do a

lightning/src/chain/channelmonitor.rs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,19 +1170,36 @@ macro_rules! _process_events_body {
11701170
pending_events = inner.pending_events.clone();
11711171
repeated_events = inner.get_repeated_events();
11721172
} else { break; }
1173-
let num_events = pending_events.len();
11741173

1175-
for event in pending_events.into_iter().chain(repeated_events.into_iter()) {
1174+
let mut num_handled_events = 0;
1175+
let mut handling_failed = false;
1176+
for event in pending_events.into_iter() {
11761177
$event_to_handle = event;
1177-
$handle_event;
1178+
match $handle_event {
1179+
Ok(()) => num_handled_events += 1,
1180+
Err(()) => {
1181+
// If we encounter an error we stop handling events and make sure to replay
1182+
// any unhandled events on the next invocation.
1183+
handling_failed = true;
1184+
break;
1185+
}
1186+
}
1187+
}
1188+
1189+
for event in repeated_events.into_iter() {
1190+
// For repeated events we ignore any errors as they will be replayed eventually
1191+
// anyways.
1192+
$event_to_handle = event;
1193+
$handle_event.ok();
11781194
}
11791195

11801196
if let Some(us) = $self_opt {
11811197
let mut inner = us.inner.lock().unwrap();
1182-
inner.pending_events.drain(..num_events);
1198+
inner.pending_events.drain(..num_handled_events);
11831199
inner.is_processing_pending_events = false;
1184-
if !inner.pending_events.is_empty() {
1185-
// If there's more events to process, go ahead and do so.
1200+
if !handling_failed && !inner.pending_events.is_empty() {
1201+
// If there's more events to process and we didn't fail so far, go ahead and do
1202+
// so.
11861203
continue;
11871204
}
11881205
}
@@ -1508,7 +1525,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
15081525
/// Processes any events asynchronously.
15091526
///
15101527
/// See [`Self::process_pending_events`] for more information.
1511-
pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
1528+
pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ()>>, H: Fn(Event) -> Future>(
15121529
&self, handler: &H
15131530
) {
15141531
let mut ev;

lightning/src/events/mod.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2203,8 +2203,10 @@ pub trait MessageSendEventsProvider {
22032203
///
22042204
/// In order to ensure no [`Event`]s are lost, implementors of this trait will persist [`Event`]s
22052205
/// and replay any unhandled events on startup. An [`Event`] is considered handled when
2206-
/// [`process_pending_events`] returns, thus handlers MUST fully handle [`Event`]s and persist any
2207-
/// relevant changes to disk *before* returning.
2206+
/// [`process_pending_events`] returns `Ok(())`, thus handlers MUST fully handle [`Event`]s and
2207+
/// persist any relevant changes to disk *before* returning `Ok(())`. In case of a (e.g.,
2208+
/// persistence failure) implementors should return `Err(())`, signalling to the [`EventsProvider`]
2209+
/// to replay unhandled events on the next invocation.
22082210
///
22092211
/// Further, because an application may crash between an [`Event`] being handled and the
22102212
/// implementor of this trait being re-serialized, [`Event`] handling must be idempotent - in
@@ -2235,22 +2237,22 @@ pub trait EventsProvider {
22352237
///
22362238
/// An async variation also exists for implementations of [`EventsProvider`] that support async
22372239
/// event handling. The async event handler should satisfy the generic bounds: `F:
2238-
/// core::future::Future, H: Fn(Event) -> F`.
2240+
/// core::future::Future<Output = Result<(), ()>>, H: Fn(Event) -> F`.
22392241
pub trait EventHandler {
22402242
/// Handles the given [`Event`].
22412243
///
22422244
/// See [`EventsProvider`] for details that must be considered when implementing this method.
2243-
fn handle_event(&self, event: Event);
2245+
fn handle_event(&self, event: Event) -> Result<(), ()>;
22442246
}
22452247

2246-
impl<F> EventHandler for F where F: Fn(Event) {
2247-
fn handle_event(&self, event: Event) {
2248+
impl<F> EventHandler for F where F: Fn(Event) -> Result<(), ()> {
2249+
fn handle_event(&self, event: Event) -> Result<(), ()> {
22482250
self(event)
22492251
}
22502252
}
22512253

22522254
impl<T: EventHandler> EventHandler for Arc<T> {
2253-
fn handle_event(&self, event: Event) {
2255+
fn handle_event(&self, event: Event) -> Result<(), ()> {
22542256
self.deref().handle_event(event)
22552257
}
22562258
}

0 commit comments

Comments
 (0)