Skip to content

Commit 5a45ebe

Browse files
committed
Make event handling fallible
Previously, we would require our users to handle all events successfully inline or panic will trying to do so. If they would exit the `EventHandler` any other way we'd forget about the event and wouldn't replay them after restart. Here, we implement fallible event handling, allowing the user to return `Err(())` which signals to our event providers they should abort event processing and replay any unhandled events later (i.e., in the next invocation).
1 parent bc4a5ea commit 5a45ebe

File tree

8 files changed

+250
-180
lines changed

8 files changed

+250
-180
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 45 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -552,7 +552,7 @@ use core::task;
552552
/// # }
553553
/// # struct EventHandler {}
554554
/// # impl EventHandler {
555-
/// # async fn handle_event(&self, _: lightning::events::Event) {}
555+
/// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ()> { Ok(()) }
556556
/// # }
557557
/// # #[derive(Eq, PartialEq, Clone, Hash)]
558558
/// # struct SocketDescriptor {}
@@ -646,7 +646,7 @@ pub async fn process_events_async<
646646
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
647647
L: 'static + Deref + Send + Sync,
648648
P: 'static + Deref + Send + Sync,
649-
EventHandlerFuture: core::future::Future<Output = ()>,
649+
EventHandlerFuture: core::future::Future<Output = Result<(),()>>,
650650
EventHandler: Fn(Event) -> EventHandlerFuture,
651651
PS: 'static + Deref + Send,
652652
M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
@@ -692,12 +692,13 @@ where
692692
if update_scorer(scorer, &event, duration_since_epoch) {
693693
log_trace!(logger, "Persisting scorer after update");
694694
if let Err(e) = persister.persist_scorer(&scorer) {
695-
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
695+
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
696+
return Err(());
696697
}
697698
}
698699
}
699700
}
700-
event_handler(event).await;
701+
event_handler(event).await
701702
}
702703
};
703704
define_run_body!(
@@ -731,7 +732,7 @@ where
731732

732733
#[cfg(feature = "futures")]
733734
async fn process_onion_message_handler_events_async<
734-
EventHandlerFuture: core::future::Future<Output = ()>,
735+
EventHandlerFuture: core::future::Future<Output = Result<(), ()>>,
735736
EventHandler: Fn(Event) -> EventHandlerFuture,
736737
PM: 'static + Deref + Send + Sync,
737738
>(
@@ -741,10 +742,11 @@ where
741742
PM::Target: APeerManager + Send + Sync,
742743
{
743744
let events = core::cell::RefCell::new(Vec::new());
744-
peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e));
745+
peer_manager.onion_message_handler().process_pending_events(&|e| Ok(events.borrow_mut().push(e)));
745746

746747
for event in events.into_inner() {
747-
handler(event).await
748+
// Ignore any errors as onion messages are best effort anyways.
749+
let _ = handler(event).await;
748750
}
749751
}
750752

@@ -846,7 +848,7 @@ impl BackgroundProcessor {
846848
}
847849
}
848850
}
849-
event_handler.handle_event(event);
851+
event_handler.handle_event(event)
850852
};
851853
define_run_body!(
852854
persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
@@ -1380,7 +1382,7 @@ mod tests {
13801382
// Initiate the background processors to watch each node.
13811383
let data_dir = nodes[0].kv_store.get_data_dir();
13821384
let persister = Arc::new(Persister::new(data_dir));
1383-
let event_handler = |_: _| {};
1385+
let event_handler = |_: _| { Ok(()) };
13841386
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
13851387

13861388
macro_rules! check_persisted_data {
@@ -1447,7 +1449,7 @@ mod tests {
14471449
let (_, nodes) = create_nodes(1, "test_timer_tick_called");
14481450
let data_dir = nodes[0].kv_store.get_data_dir();
14491451
let persister = Arc::new(Persister::new(data_dir));
1450-
let event_handler = |_: _| {};
1452+
let event_handler = |_: _| { Ok(()) };
14511453
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
14521454
loop {
14531455
let log_entries = nodes[0].logger.lines.lock().unwrap();
@@ -1476,7 +1478,7 @@ mod tests {
14761478

14771479
let data_dir = nodes[0].kv_store.get_data_dir();
14781480
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1479-
let event_handler = |_: _| {};
1481+
let event_handler = |_: _| { Ok(()) };
14801482
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
14811483
match bg_processor.join() {
14821484
Ok(_) => panic!("Expected error persisting manager"),
@@ -1498,7 +1500,7 @@ mod tests {
14981500
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
14991501

15001502
let bp_future = super::process_events_async(
1501-
persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
1503+
persister, |_: _| {async { Ok(()) }}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
15021504
nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
15031505
Some(nodes[0].scorer.clone()), move |dur: Duration| {
15041506
Box::pin(async move {
@@ -1522,7 +1524,7 @@ mod tests {
15221524
let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");
15231525
let data_dir = nodes[0].kv_store.get_data_dir();
15241526
let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1525-
let event_handler = |_: _| {};
1527+
let event_handler = |_: _| { Ok(()) };
15261528
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
15271529

15281530
match bg_processor.stop() {
@@ -1540,7 +1542,7 @@ mod tests {
15401542
let (_, nodes) = create_nodes(2, "test_persist_scorer_error");
15411543
let data_dir = nodes[0].kv_store.get_data_dir();
15421544
let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1543-
let event_handler = |_: _| {};
1545+
let event_handler = |_: _| { Ok(()) };
15441546
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
15451547

15461548
match bg_processor.stop() {
@@ -1562,11 +1564,14 @@ mod tests {
15621564
// Set up a background event handler for FundingGenerationReady events.
15631565
let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
15641566
let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
1565-
let event_handler = move |event: Event| match event {
1566-
Event::FundingGenerationReady { .. } => funding_generation_send.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1567-
Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
1568-
Event::ChannelReady { .. } => {},
1569-
_ => panic!("Unexpected event: {:?}", event),
1567+
let event_handler = move |event: Event| {
1568+
match event {
1569+
Event::FundingGenerationReady { .. } => funding_generation_send.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1570+
Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
1571+
Event::ChannelReady { .. } => {},
1572+
_ => panic!("Unexpected event: {:?}", event),
1573+
}
1574+
Ok(())
15701575
};
15711576

15721577
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1599,11 +1604,14 @@ mod tests {
15991604

16001605
// Set up a background event handler for SpendableOutputs events.
16011606
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1602-
let event_handler = move |event: Event| match event {
1603-
Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
1604-
Event::ChannelReady { .. } => {},
1605-
Event::ChannelClosed { .. } => {},
1606-
_ => panic!("Unexpected event: {:?}", event),
1607+
let event_handler = move |event: Event| {
1608+
match event {
1609+
Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
1610+
Event::ChannelReady { .. } => {},
1611+
Event::ChannelClosed { .. } => {},
1612+
_ => panic!("Unexpected event: {:?}", event),
1613+
}
1614+
Ok(())
16071615
};
16081616
let persister = Arc::new(Persister::new(data_dir));
16091617
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1631,7 +1639,7 @@ mod tests {
16311639
let (_, nodes) = create_nodes(2, "test_scorer_persistence");
16321640
let data_dir = nodes[0].kv_store.get_data_dir();
16331641
let persister = Arc::new(Persister::new(data_dir));
1634-
let event_handler = |_: _| {};
1642+
let event_handler = |_: _| { Ok(()) };
16351643
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
16361644

16371645
loop {
@@ -1704,7 +1712,7 @@ mod tests {
17041712
let data_dir = nodes[0].kv_store.get_data_dir();
17051713
let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
17061714

1707-
let event_handler = |_: _| {};
1715+
let event_handler = |_: _| { Ok(()) };
17081716
let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
17091717

17101718
do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes,
@@ -1725,7 +1733,7 @@ mod tests {
17251733

17261734
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
17271735
let bp_future = super::process_events_async(
1728-
persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
1736+
persister, |_: _| {async { Ok(()) }}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
17291737
nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
17301738
Some(nodes[0].scorer.clone()), move |dur: Duration| {
17311739
let mut exit_receiver = exit_receiver.clone();
@@ -1852,12 +1860,15 @@ mod tests {
18521860
#[test]
18531861
fn test_payment_path_scoring() {
18541862
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1855-
let event_handler = move |event: Event| match event {
1856-
Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
1857-
Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
1858-
Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
1859-
Event::ProbeFailed { .. } => sender.send(event).unwrap(),
1860-
_ => panic!("Unexpected event: {:?}", event),
1863+
let event_handler = move |event: Event| {
1864+
match event {
1865+
Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
1866+
Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
1867+
Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
1868+
Event::ProbeFailed { .. } => sender.send(event).unwrap(),
1869+
_ => panic!("Unexpected event: {:?}", event),
1870+
}
1871+
Ok(())
18611872
};
18621873

18631874
let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
@@ -1890,6 +1901,7 @@ mod tests {
18901901
Event::ProbeFailed { .. } => { sender_ref.send(event).await.unwrap() },
18911902
_ => panic!("Unexpected event: {:?}", event),
18921903
}
1904+
Ok(())
18931905
}
18941906
};
18951907

lightning-invoice/src/utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1390,6 +1390,7 @@ mod test {
13901390
} else {
13911391
other_events.borrow_mut().push(event);
13921392
}
1393+
Ok(())
13931394
};
13941395
nodes[fwd_idx].node.process_pending_events(&forward_event_handler);
13951396
nodes[fwd_idx].node.process_pending_events(&forward_event_handler);

lightning/src/chain/chainmonitor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -586,7 +586,7 @@ where C::Target: chain::Filter,
586586
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
587587
use crate::events::EventsProvider;
588588
let events = core::cell::RefCell::new(Vec::new());
589-
let event_handler = |event: events::Event| events.borrow_mut().push(event);
589+
let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event));
590590
self.process_pending_events(&event_handler);
591591
events.into_inner()
592592
}
@@ -597,7 +597,7 @@ where C::Target: chain::Filter,
597597
/// See the trait-level documentation of [`EventsProvider`] for requirements.
598598
///
599599
/// [`EventsProvider`]: crate::events::EventsProvider
600-
pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
600+
pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(),()>>, H: Fn(Event) -> Future>(
601601
&self, handler: H
602602
) {
603603
// Sadly we can't hold the monitors read lock through an async call. Thus we have to do a

lightning/src/chain/channelmonitor.rs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,19 +1165,36 @@ macro_rules! _process_events_body {
11651165
pending_events = inner.pending_events.clone();
11661166
repeated_events = inner.get_repeated_events();
11671167
} else { break; }
1168-
let num_events = pending_events.len();
11691168

1170-
for event in pending_events.into_iter().chain(repeated_events.into_iter()) {
1169+
let mut num_handled_events = 0;
1170+
let mut handling_failed = false;
1171+
for event in pending_events.into_iter() {
11711172
$event_to_handle = event;
1172-
$handle_event;
1173+
match $handle_event {
1174+
Ok(()) => num_handled_events += 1,
1175+
Err(()) => {
1176+
// If we encounter an error we stop handling events and make sure to replay
1177+
// any unhandled events on the next invocation.
1178+
handling_failed = true;
1179+
break;
1180+
}
1181+
}
1182+
}
1183+
1184+
for event in repeated_events.into_iter() {
1185+
// For repeated events we ignore any errors as they will be replayed eventually
1186+
// anyways.
1187+
$event_to_handle = event;
1188+
$handle_event.ok();
11731189
}
11741190

11751191
if let Some(us) = $self_opt {
11761192
let mut inner = us.inner.lock().unwrap();
1177-
inner.pending_events.drain(..num_events);
1193+
inner.pending_events.drain(..num_handled_events);
11781194
inner.is_processing_pending_events = false;
1179-
if !inner.pending_events.is_empty() {
1180-
// If there's more events to process, go ahead and do so.
1195+
if !handling_failed && !inner.pending_events.is_empty() {
1196+
// If there's more events to process and we didn't fail so far, go ahead and do
1197+
// so.
11811198
continue;
11821199
}
11831200
}
@@ -1500,7 +1517,7 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
15001517
/// Processes any events asynchronously.
15011518
///
15021519
/// See [`Self::process_pending_events`] for more information.
1503-
pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
1520+
pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ()>>, H: Fn(Event) -> Future>(
15041521
&self, handler: &H
15051522
) {
15061523
let mut ev;

lightning/src/events/mod.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2016,8 +2016,10 @@ pub trait MessageSendEventsProvider {
20162016
///
20172017
/// In order to ensure no [`Event`]s are lost, implementors of this trait will persist [`Event`]s
20182018
/// and replay any unhandled events on startup. An [`Event`] is considered handled when
2019-
/// [`process_pending_events`] returns, thus handlers MUST fully handle [`Event`]s and persist any
2020-
/// relevant changes to disk *before* returning.
2019+
/// [`process_pending_events`] returns `Ok(())`, thus handlers MUST fully handle [`Event`]s and
2020+
/// persist any relevant changes to disk *before* returning `Ok(())`. In case of a (e.g.,
2021+
/// persistence failure) implementors should return `Err(())`, signalling to the [`EventsProvider`]
2022+
/// to replay unhandled events on the next invocation.
20212023
///
20222024
/// Further, because an application may crash between an [`Event`] being handled and the
20232025
/// implementor of this trait being re-serialized, [`Event`] handling must be idempotent - in
@@ -2048,22 +2050,22 @@ pub trait EventsProvider {
20482050
///
20492051
/// An async variation also exists for implementations of [`EventsProvider`] that support async
20502052
/// event handling. The async event handler should satisfy the generic bounds: `F:
2051-
/// core::future::Future, H: Fn(Event) -> F`.
2053+
/// core::future::Future<Output = Result<(), ()>>, H: Fn(Event) -> F`.
20522054
pub trait EventHandler {
20532055
/// Handles the given [`Event`].
20542056
///
20552057
/// See [`EventsProvider`] for details that must be considered when implementing this method.
2056-
fn handle_event(&self, event: Event);
2058+
fn handle_event(&self, event: Event) -> Result<(), ()>;
20572059
}
20582060

2059-
impl<F> EventHandler for F where F: Fn(Event) {
2060-
fn handle_event(&self, event: Event) {
2061+
impl<F> EventHandler for F where F: Fn(Event) -> Result<(), ()> {
2062+
fn handle_event(&self, event: Event) -> Result<(), ()> {
20612063
self(event)
20622064
}
20632065
}
20642066

20652067
impl<T: EventHandler> EventHandler for Arc<T> {
2066-
fn handle_event(&self, event: Event) {
2068+
fn handle_event(&self, event: Event) -> Result<(), ()> {
20672069
self.deref().handle_event(event)
20682070
}
20692071
}

0 commit comments

Comments
 (0)