Skip to content

Commit 6ca8770

Browse files
committed
Handle fallible events in OnionMessenger
Previously, we would just fire-and-forget in `OnionMessenger`'s event handling. Since we now introduced the possibility of event handling failures, we here adapt the event handling logic to retain any events which we failed to handle to have them replayed upon the next invocation of `process_pending_events`/`process_pending_events_async`.
1 parent cca051a commit 6ca8770

File tree

1 file changed

+84
-34
lines changed

1 file changed

+84
-34
lines changed

lightning/src/onion_message/messenger.rs

Lines changed: 84 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use crate::util::ser::Writeable;
3636

3737
use core::fmt;
3838
use core::ops::Deref;
39+
use core::sync::atomic::{AtomicBool, Ordering};
3940
use crate::io;
4041
use crate::sync::Mutex;
4142
use crate::prelude::*;
@@ -263,6 +264,7 @@ pub struct OnionMessenger<
263264
intercept_messages_for_offline_peers: bool,
264265
pending_intercepted_msgs_events: Mutex<Vec<Event>>,
265266
pending_peer_connected_events: Mutex<Vec<Event>>,
267+
pending_events_processor: AtomicBool,
266268
}
267269

268270
/// [`OnionMessage`]s buffered to be sent.
@@ -1017,6 +1019,28 @@ where
10171019
}
10181020
}
10191021

1022+
macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset: expr, $event_queue: expr) => {
1023+
// We want ot make sure to cleanly abort upon event handling failure. To this end, we drop all
1024+
// successfully handled events from the given queue, reset the events processing flag, and
1025+
// return, to have the events eventually replayed upon next invocation.
1026+
{
1027+
let mut queue_lock = $event_queue.lock().unwrap();
1028+
1029+
// We skip `$offset` result entries to reach the ones relevant for the given `$event_queue`.
1030+
let mut res_iter = $res.iter().skip($offset);
1031+
1032+
// Keep all events which previously error'd *or* any that have been added since we dropped
1033+
// the Mutex before.
1034+
queue_lock.retain(|_| res_iter.next().map_or(true, |r| r.is_err()));
1035+
1036+
if $res.iter().any(|r| r.is_err()) {
1037+
// We failed handling some events. Return to have them eventually replayed.
1038+
$self.pending_events_processor.store(false, Ordering::Release);
1039+
return;
1040+
}
1041+
}
1042+
}}
1043+
10201044
impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, CMH: Deref>
10211045
OnionMessenger<ES, NS, L, NL, MR, OMH, APH, CMH>
10221046
where
@@ -1093,6 +1117,7 @@ where
10931117
intercept_messages_for_offline_peers,
10941118
pending_intercepted_msgs_events: Mutex::new(Vec::new()),
10951119
pending_peer_connected_events: Mutex::new(Vec::new()),
1120+
pending_events_processor: AtomicBool::new(false),
10961121
}
10971122
}
10981123

@@ -1331,42 +1356,57 @@ where
13311356
pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>> + core::marker::Unpin, H: Fn(Event) -> Future>(
13321357
&self, handler: H
13331358
) {
1334-
let mut intercepted_msgs = Vec::new();
1335-
let mut peer_connecteds = Vec::new();
1336-
{
1337-
let mut pending_intercepted_msgs_events =
1338-
self.pending_intercepted_msgs_events.lock().unwrap();
1339-
let mut pending_peer_connected_events =
1340-
self.pending_peer_connected_events.lock().unwrap();
1341-
core::mem::swap(&mut *pending_intercepted_msgs_events, &mut intercepted_msgs);
1342-
core::mem::swap(&mut *pending_peer_connected_events, &mut peer_connecteds);
1359+
if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
1360+
return;
13431361
}
13441362

1345-
let mut futures = Vec::with_capacity(intercepted_msgs.len());
1346-
for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
1347-
if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
1348-
if let Some(addresses) = addresses.take() {
1349-
futures.push(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }));
1363+
{
1364+
let intercepted_msgs = self.pending_intercepted_msgs_events.lock().unwrap().clone();
1365+
let mut futures = Vec::with_capacity(intercepted_msgs.len());
1366+
for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
1367+
if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
1368+
if let Some(addresses) = addresses.take() {
1369+
futures.push(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }));
1370+
}
13501371
}
13511372
}
1352-
}
13531373

1354-
for ev in intercepted_msgs {
1355-
if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
1356-
futures.push(handler(ev));
1374+
// The offset in the `futures` vec at which `intercepted_msgs` start. We don't bother
1375+
// replaying `ConnectionNeeded` events.
1376+
let intercepted_msgs_offset = futures.len();
1377+
1378+
for ev in intercepted_msgs {
1379+
if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
1380+
futures.push(handler(ev));
1381+
}
1382+
// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
1383+
let res = crate::util::async_poll::MultiResultFuturePoller::new(futures).await;
1384+
drop_handled_events_and_abort!(self, res, intercepted_msgs_offset, self.pending_intercepted_msgs_events);
13571385
}
1358-
// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
1359-
crate::util::async_poll::MultiResultFuturePoller::new(futures).await;
13601386

1361-
if peer_connecteds.len() <= 1 {
1362-
for event in peer_connecteds { handler(event).await; }
1363-
} else {
1364-
let mut futures = Vec::new();
1365-
for event in peer_connecteds {
1366-
futures.push(handler(event));
1387+
{
1388+
let peer_connecteds = self.pending_peer_connected_events.lock().unwrap().clone();
1389+
let num_peer_connecteds = peer_connecteds.len();
1390+
if num_peer_connecteds <= 1 {
1391+
for event in peer_connecteds {
1392+
if handler(event).await.is_ok() {
1393+
self.pending_peer_connected_events.lock().unwrap().drain(..num_peer_connecteds);
1394+
} else {
1395+
// We failed handling the event. Return to have it eventually replayed.
1396+
self.pending_events_processor.store(false, Ordering::Release);
1397+
return;
1398+
}
1399+
}
1400+
} else {
1401+
let mut futures = Vec::new();
1402+
for event in peer_connecteds {
1403+
futures.push(handler(event));
1404+
}
1405+
let res = crate::util::async_poll::MultiResultFuturePoller::new(futures).await;
1406+
drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
13671407
}
1368-
crate::util::async_poll::MultiResultFuturePoller::new(futures).await;
13691408
}
1409+
self.pending_events_processor.store(false, Ordering::Release);
13701410
}
13711411
}
13721412

@@ -1406,17 +1446,24 @@ where
14061446
CMH::Target: CustomOnionMessageHandler,
14071447
{
14081448
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
1449+
if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
1450+
return;
1451+
}
1452+
14091453
for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
14101454
if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
14111455
if let Some(addresses) = addresses.take() {
14121456
let _ = handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses });
14131457
}
14141458
}
14151459
}
1416-
let mut events = Vec::new();
1460+
let intercepted_msgs;
1461+
let peer_connecteds;
14171462
{
1418-
let mut pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
1463+
let pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
1464+
intercepted_msgs = pending_intercepted_msgs_events.clone();
14191465
let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
1466+
peer_connecteds = pending_peer_connected_events.clone();
14201467
#[cfg(debug_assertions)] {
14211468
for ev in pending_intercepted_msgs_events.iter() {
14221469
if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
@@ -1425,13 +1472,16 @@ where
14251472
if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
14261473
}
14271474
}
1428-
core::mem::swap(&mut *pending_intercepted_msgs_events, &mut events);
1429-
events.append(&mut pending_peer_connected_events);
14301475
pending_peer_connected_events.shrink_to(10); // Limit total heap usage
14311476
}
1432-
for ev in events {
1433-
handler.handle_event(ev);
1434-
}
1477+
1478+
let res = intercepted_msgs.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
1479+
drop_handled_events_and_abort!(self, res, 0, self.pending_intercepted_msgs_events);
1480+
1481+
let res = peer_connecteds.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
1482+
drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
1483+
1484+
self.pending_events_processor.store(false, Ordering::Release);
14351485
}
14361486
}
14371487

0 commit comments

Comments
 (0)