Skip to content

Commit 018908f

Browse files
committed
Make event handling fallible
Previously, we would require our users to handle all events successfully inline or panic will trying to do so. If they would exit the `EventHandler` any other way we'd forget about the event and wouldn't replay them after restart. Here, we implement fallible event handling, allowing the user to return `Err(())` which signals to our event providers they should abort event processing and replay any unhandled events later (i.e., in the next invocation).
1 parent b5b57f1 commit 018908f

File tree

9 files changed

+346
-213
lines changed

9 files changed

+346
-213
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 49 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ use lightning::chain::chainmonitor::{ChainMonitor, Persist};
2626
use lightning::events::EventHandler;
2727
#[cfg(feature = "std")]
2828
use lightning::events::EventsProvider;
29+
#[cfg(feature = "futures")]
30+
use lightning::events::ReplayEvent;
2931
use lightning::events::{Event, PathFailure};
3032

3133
use lightning::ln::channelmanager::AChannelManager;
@@ -583,6 +585,7 @@ use futures_util::{dummy_waker, Selector, SelectorOutput};
583585
/// could setup `process_events_async` like this:
584586
/// ```
585587
/// # use lightning::io;
588+
/// # use lightning::events::ReplayEvent;
586589
/// # use std::sync::{Arc, RwLock};
587590
/// # use std::sync::atomic::{AtomicBool, Ordering};
588591
/// # use std::time::SystemTime;
@@ -600,7 +603,7 @@ use futures_util::{dummy_waker, Selector, SelectorOutput};
600603
/// # }
601604
/// # struct EventHandler {}
602605
/// # impl EventHandler {
603-
/// # async fn handle_event(&self, _: lightning::events::Event) {}
606+
/// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
604607
/// # }
605608
/// # #[derive(Eq, PartialEq, Clone, Hash)]
606609
/// # struct SocketDescriptor {}
@@ -698,7 +701,7 @@ pub async fn process_events_async<
698701
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
699702
L: 'static + Deref + Send + Sync,
700703
P: 'static + Deref + Send + Sync,
701-
EventHandlerFuture: core::future::Future<Output = ()>,
704+
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
702705
EventHandler: Fn(Event) -> EventHandlerFuture,
703706
PS: 'static + Deref + Send,
704707
M: 'static
@@ -751,12 +754,16 @@ where
751754
if update_scorer(scorer, &event, duration_since_epoch) {
752755
log_trace!(logger, "Persisting scorer after update");
753756
if let Err(e) = persister.persist_scorer(&scorer) {
754-
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
757+
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
758+
// We opt not to abort early on persistence failure here as persisting
759+
// the scorer is non-critical and we still hope that it will have
760+
// resolved itself when it is potentially critical in event handling
761+
// below.
755762
}
756763
}
757764
}
758765
}
759-
event_handler(event).await;
766+
event_handler(event).await
760767
})
761768
};
762769
define_run_body!(
@@ -913,7 +920,7 @@ impl BackgroundProcessor {
913920
}
914921
}
915922
}
916-
event_handler.handle_event(event);
923+
event_handler.handle_event(event)
917924
};
918925
define_run_body!(
919926
persister,
@@ -1757,7 +1764,7 @@ mod tests {
17571764
// Initiate the background processors to watch each node.
17581765
let data_dir = nodes[0].kv_store.get_data_dir();
17591766
let persister = Arc::new(Persister::new(data_dir));
1760-
let event_handler = |_: _| {};
1767+
let event_handler = |_: _| Ok(());
17611768
let bg_processor = BackgroundProcessor::start(
17621769
persister,
17631770
event_handler,
@@ -1847,7 +1854,7 @@ mod tests {
18471854
let (_, nodes) = create_nodes(1, "test_timer_tick_called");
18481855
let data_dir = nodes[0].kv_store.get_data_dir();
18491856
let persister = Arc::new(Persister::new(data_dir));
1850-
let event_handler = |_: _| {};
1857+
let event_handler = |_: _| Ok(());
18511858
let bg_processor = BackgroundProcessor::start(
18521859
persister,
18531860
event_handler,
@@ -1889,7 +1896,7 @@ mod tests {
18891896
let persister = Arc::new(
18901897
Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
18911898
);
1892-
let event_handler = |_: _| {};
1899+
let event_handler = |_: _| Ok(());
18931900
let bg_processor = BackgroundProcessor::start(
18941901
persister,
18951902
event_handler,
@@ -1924,7 +1931,7 @@ mod tests {
19241931

19251932
let bp_future = super::process_events_async(
19261933
persister,
1927-
|_: _| async {},
1934+
|_: _| async { Ok(()) },
19281935
nodes[0].chain_monitor.clone(),
19291936
nodes[0].node.clone(),
19301937
Some(nodes[0].messenger.clone()),
@@ -1957,7 +1964,7 @@ mod tests {
19571964
let data_dir = nodes[0].kv_store.get_data_dir();
19581965
let persister =
19591966
Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1960-
let event_handler = |_: _| {};
1967+
let event_handler = |_: _| Ok(());
19611968
let bg_processor = BackgroundProcessor::start(
19621969
persister,
19631970
event_handler,
@@ -1986,7 +1993,7 @@ mod tests {
19861993
let data_dir = nodes[0].kv_store.get_data_dir();
19871994
let persister =
19881995
Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1989-
let event_handler = |_: _| {};
1996+
let event_handler = |_: _| Ok(());
19901997
let bg_processor = BackgroundProcessor::start(
19911998
persister,
19921999
event_handler,
@@ -2021,13 +2028,16 @@ mod tests {
20212028
// Set up a background event handler for FundingGenerationReady events.
20222029
let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
20232030
let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
2024-
let event_handler = move |event: Event| match event {
2025-
Event::FundingGenerationReady { .. } => funding_generation_send
2026-
.send(handle_funding_generation_ready!(event, channel_value))
2027-
.unwrap(),
2028-
Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
2029-
Event::ChannelReady { .. } => {},
2030-
_ => panic!("Unexpected event: {:?}", event),
2031+
let event_handler = move |event: Event| {
2032+
match event {
2033+
Event::FundingGenerationReady { .. } => funding_generation_send
2034+
.send(handle_funding_generation_ready!(event, channel_value))
2035+
.unwrap(),
2036+
Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
2037+
Event::ChannelReady { .. } => {},
2038+
_ => panic!("Unexpected event: {:?}", event),
2039+
}
2040+
Ok(())
20312041
};
20322042

20332043
let bg_processor = BackgroundProcessor::start(
@@ -2082,11 +2092,14 @@ mod tests {
20822092

20832093
// Set up a background event handler for SpendableOutputs events.
20842094
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
2085-
let event_handler = move |event: Event| match event {
2086-
Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
2087-
Event::ChannelReady { .. } => {},
2088-
Event::ChannelClosed { .. } => {},
2089-
_ => panic!("Unexpected event: {:?}", event),
2095+
let event_handler = move |event: Event| {
2096+
match event {
2097+
Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
2098+
Event::ChannelReady { .. } => {},
2099+
Event::ChannelClosed { .. } => {},
2100+
_ => panic!("Unexpected event: {:?}", event),
2101+
}
2102+
Ok(())
20902103
};
20912104
let persister = Arc::new(Persister::new(data_dir));
20922105
let bg_processor = BackgroundProcessor::start(
@@ -2220,7 +2233,7 @@ mod tests {
22202233
let (_, nodes) = create_nodes(2, "test_scorer_persistence");
22212234
let data_dir = nodes[0].kv_store.get_data_dir();
22222235
let persister = Arc::new(Persister::new(data_dir));
2223-
let event_handler = |_: _| {};
2236+
let event_handler = |_: _| Ok(());
22242237
let bg_processor = BackgroundProcessor::start(
22252238
persister,
22262239
event_handler,
@@ -2315,7 +2328,7 @@ mod tests {
23152328
let data_dir = nodes[0].kv_store.get_data_dir();
23162329
let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
23172330

2318-
let event_handler = |_: _| {};
2331+
let event_handler = |_: _| Ok(());
23192332
let background_processor = BackgroundProcessor::start(
23202333
persister,
23212334
event_handler,
@@ -2350,7 +2363,7 @@ mod tests {
23502363
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
23512364
let bp_future = super::process_events_async(
23522365
persister,
2353-
|_: _| async {},
2366+
|_: _| async { Ok(()) },
23542367
nodes[0].chain_monitor.clone(),
23552368
nodes[0].node.clone(),
23562369
Some(nodes[0].messenger.clone()),
@@ -2492,12 +2505,15 @@ mod tests {
24922505
#[test]
24932506
fn test_payment_path_scoring() {
24942507
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
2495-
let event_handler = move |event: Event| match event {
2496-
Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
2497-
Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
2498-
Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
2499-
Event::ProbeFailed { .. } => sender.send(event).unwrap(),
2500-
_ => panic!("Unexpected event: {:?}", event),
2508+
let event_handler = move |event: Event| {
2509+
match event {
2510+
Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
2511+
Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
2512+
Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
2513+
Event::ProbeFailed { .. } => sender.send(event).unwrap(),
2514+
_ => panic!("Unexpected event: {:?}", event),
2515+
}
2516+
Ok(())
25012517
};
25022518

25032519
let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
@@ -2543,6 +2559,7 @@ mod tests {
25432559
Event::ProbeFailed { .. } => sender_ref.send(event).await.unwrap(),
25442560
_ => panic!("Unexpected event: {:?}", event),
25452561
}
2562+
Ok(())
25462563
}
25472564
};
25482565

lightning-invoice/src/utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1391,6 +1391,7 @@ mod test {
13911391
} else {
13921392
other_events.borrow_mut().push(event);
13931393
}
1394+
Ok(())
13941395
};
13951396
nodes[fwd_idx].node.process_pending_events(&forward_event_handler);
13961397
nodes[fwd_idx].node.process_pending_events(&forward_event_handler);

lightning/src/chain/chainmonitor.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@ use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance
3333
use crate::chain::transaction::{OutPoint, TransactionData};
3434
use crate::ln::types::ChannelId;
3535
use crate::sign::ecdsa::EcdsaChannelSigner;
36-
use crate::events;
37-
use crate::events::{Event, EventHandler};
36+
use crate::events::{self, Event, EventHandler, ReplayEvent};
3837
use crate::util::logger::{Logger, WithContext};
3938
use crate::util::errors::APIError;
4039
use crate::util::wakers::{Future, Notifier};
@@ -533,7 +532,7 @@ where C::Target: chain::Filter,
533532
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
534533
use crate::events::EventsProvider;
535534
let events = core::cell::RefCell::new(Vec::new());
536-
let event_handler = |event: events::Event| events.borrow_mut().push(event);
535+
let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event));
537536
self.process_pending_events(&event_handler);
538537
events.into_inner()
539538
}
@@ -544,16 +543,21 @@ where C::Target: chain::Filter,
544543
/// See the trait-level documentation of [`EventsProvider`] for requirements.
545544
///
546545
/// [`EventsProvider`]: crate::events::EventsProvider
547-
pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
546+
pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future>(
548547
&self, handler: H
549548
) {
550549
// Sadly we can't hold the monitors read lock through an async call. Thus we have to do a
551550
// crazy dance to process a monitor's events then only remove them once we've done so.
552551
let mons_to_process = self.monitors.read().unwrap().keys().cloned().collect::<Vec<_>>();
553552
for funding_txo in mons_to_process {
554553
let mut ev;
555-
super::channelmonitor::process_events_body!(
556-
self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await);
554+
match super::channelmonitor::process_events_body!(
555+
self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await) {
556+
Ok(()) => {},
557+
Err(ReplayEvent ()) => {
558+
self.event_notifier.notify();
559+
}
560+
}
557561
}
558562
}
559563

@@ -880,7 +884,12 @@ impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref,
880884
/// [`BumpTransaction`]: events::Event::BumpTransaction
881885
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
882886
for monitor_state in self.monitors.read().unwrap().values() {
883-
monitor_state.monitor.process_pending_events(&handler);
887+
match monitor_state.monitor.process_pending_events(&handler) {
888+
Ok(()) => {},
889+
Err(ReplayEvent ()) => {
890+
self.event_notifier.notify();
891+
}
892+
}
884893
}
885894
}
886895
}

lightning/src/chain/channelmonitor.rs

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ use crate::chain::Filter;
5151
use crate::util::logger::{Logger, Record};
5252
use crate::util::ser::{Readable, ReadableArgs, RequiredWrapper, MaybeReadable, UpgradableRequired, Writer, Writeable, U48};
5353
use crate::util::byte_utils;
54-
use crate::events::{ClosureReason, Event, EventHandler};
54+
use crate::events::{ClosureReason, Event, EventHandler, ReplayEvent};
5555
use crate::events::bump_transaction::{AnchorDescriptor, BumpTransactionEvent};
5656

5757
#[allow(unused_imports)]
@@ -1159,34 +1159,53 @@ impl<Signer: EcdsaChannelSigner> Writeable for ChannelMonitorImpl<Signer> {
11591159
macro_rules! _process_events_body {
11601160
($self_opt: expr, $event_to_handle: expr, $handle_event: expr) => {
11611161
loop {
1162+
let mut handling_res = Ok(());
11621163
let (pending_events, repeated_events);
11631164
if let Some(us) = $self_opt {
11641165
let mut inner = us.inner.lock().unwrap();
11651166
if inner.is_processing_pending_events {
1166-
break;
1167+
break handling_res;
11671168
}
11681169
inner.is_processing_pending_events = true;
11691170

11701171
pending_events = inner.pending_events.clone();
11711172
repeated_events = inner.get_repeated_events();
1172-
} else { break; }
1173-
let num_events = pending_events.len();
1173+
} else { break handling_res; }
11741174

1175-
for event in pending_events.into_iter().chain(repeated_events.into_iter()) {
1175+
let mut num_handled_events = 0;
1176+
for event in pending_events {
11761177
$event_to_handle = event;
1177-
$handle_event;
1178+
match $handle_event {
1179+
Ok(()) => num_handled_events += 1,
1180+
Err(e) => {
1181+
// If we encounter an error we stop handling events and make sure to replay
1182+
// any unhandled events on the next invocation.
1183+
handling_res = Err(e);
1184+
break;
1185+
}
1186+
}
1187+
}
1188+
1189+
if handling_res.is_ok() {
1190+
for event in repeated_events {
1191+
// For repeated events we ignore any errors as they will be replayed eventually
1192+
// anyways.
1193+
$event_to_handle = event;
1194+
let _ = $handle_event;
1195+
}
11781196
}
11791197

11801198
if let Some(us) = $self_opt {
11811199
let mut inner = us.inner.lock().unwrap();
1182-
inner.pending_events.drain(..num_events);
1200+
inner.pending_events.drain(..num_handled_events);
11831201
inner.is_processing_pending_events = false;
1184-
if !inner.pending_events.is_empty() {
1185-
// If there's more events to process, go ahead and do so.
1202+
if handling_res.is_ok() && !inner.pending_events.is_empty() {
1203+
// If there's more events to process and we didn't fail so far, go ahead and do
1204+
// so.
11861205
continue;
11871206
}
11881207
}
1189-
break;
1208+
break handling_res;
11901209
}
11911210
}
11921211
}
@@ -1498,21 +1517,23 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
14981517
/// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
14991518
/// order to handle these events.
15001519
///
1520+
/// Will return a [`ReplayEvent`] error if event handling failed and should eventually be retried.
1521+
///
15011522
/// [`SpendableOutputs`]: crate::events::Event::SpendableOutputs
15021523
/// [`BumpTransaction`]: crate::events::Event::BumpTransaction
1503-
pub fn process_pending_events<H: Deref>(&self, handler: &H) where H::Target: EventHandler {
1524+
pub fn process_pending_events<H: Deref>(&self, handler: &H) -> Result<(), ReplayEvent> where H::Target: EventHandler {
15041525
let mut ev;
1505-
process_events_body!(Some(self), ev, handler.handle_event(ev));
1526+
process_events_body!(Some(self), ev, handler.handle_event(ev))
15061527
}
15071528

15081529
/// Processes any events asynchronously.
15091530
///
15101531
/// See [`Self::process_pending_events`] for more information.
1511-
pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
1532+
pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future>(
15121533
&self, handler: &H
1513-
) {
1534+
) -> Result<(), ReplayEvent> {
15141535
let mut ev;
1515-
process_events_body!(Some(self), ev, { handler(ev).await });
1536+
process_events_body!(Some(self), ev, { handler(ev).await })
15161537
}
15171538

15181539
#[cfg(test)]

0 commit comments

Comments
 (0)