Skip to content

Commit 134b46b

Browse files
committed
Add Notifier to OnionMessenger
1 parent 9ce3dd5 commit 134b46b

File tree

2 files changed

+82
-13
lines changed

2 files changed

+82
-13
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -492,23 +492,28 @@ pub(crate) mod futures_util {
492492
pub(crate) struct Selector<
493493
A: Future<Output = ()> + Unpin,
494494
B: Future<Output = ()> + Unpin,
495-
C: Future<Output = bool> + Unpin,
495+
C: Future<Output = ()> + Unpin,
496+
D: Future<Output = bool> + Unpin,
496497
> {
497498
pub a: A,
498499
pub b: B,
499500
pub c: C,
501+
pub d: D,
500502
}
503+
501504
pub(crate) enum SelectorOutput {
502505
A,
503506
B,
504-
C(bool),
507+
C,
508+
D(bool),
505509
}
506510

507511
impl<
508512
A: Future<Output = ()> + Unpin,
509513
B: Future<Output = ()> + Unpin,
510-
C: Future<Output = bool> + Unpin,
511-
> Future for Selector<A, B, C>
514+
C: Future<Output = ()> + Unpin,
515+
D: Future<Output = bool> + Unpin,
516+
> Future for Selector<A, B, C, D>
512517
{
513518
type Output = SelectorOutput;
514519
fn poll(
@@ -527,15 +532,41 @@ pub(crate) mod futures_util {
527532
Poll::Pending => {},
528533
}
529534
match Pin::new(&mut self.c).poll(ctx) {
535+
Poll::Ready(()) => {
536+
return Poll::Ready(SelectorOutput::C);
537+
},
538+
Poll::Pending => {},
539+
}
540+
match Pin::new(&mut self.d).poll(ctx) {
530541
Poll::Ready(res) => {
531-
return Poll::Ready(SelectorOutput::C(res));
542+
return Poll::Ready(SelectorOutput::D(res));
532543
},
533544
Poll::Pending => {},
534545
}
535546
Poll::Pending
536547
}
537548
}
538549

550+
pub(crate) struct OptionalSelector<F: Future<Output = ()> + Unpin> {
551+
pub optional_future: Option<F>,
552+
}
553+
554+
impl<F: Future<Output = ()> + Unpin> Future for OptionalSelector<F> {
555+
type Output = ();
556+
fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
557+
match self.optional_future.as_mut() {
558+
Some(f) => match core::pin::pin!(f).poll(ctx) {
559+
Poll::Ready(()) => {
560+
self.optional_future.take();
561+
Poll::Ready(())
562+
},
563+
Poll::Pending => Poll::Pending,
564+
},
565+
None => Poll::Pending,
566+
}
567+
}
568+
}
569+
539570
// If we want to poll a future without an async context to figure out if it has completed or
540571
// not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
541572
// but sadly there's a good bit of boilerplate here.
@@ -557,7 +588,7 @@ pub(crate) mod futures_util {
557588
#[cfg(feature = "futures")]
558589
use core::task;
559590
#[cfg(feature = "futures")]
560-
use futures_util::{dummy_waker, Selector, SelectorOutput};
591+
use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
561592

562593
/// Processes background events in a future.
563594
///
@@ -782,18 +813,25 @@ where
782813
scorer,
783814
should_break,
784815
{
816+
let om_fut = if let Some(om) = onion_messenger.as_ref() {
817+
let fut = om.get_om().get_update_future();
818+
OptionalSelector { optional_future: Some(fut) }
819+
} else {
820+
OptionalSelector { optional_future: None }
821+
};
785822
let fut = Selector {
786823
a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
787824
b: chain_monitor.get_update_future(),
788-
c: sleeper(if mobile_interruptable_platform {
825+
c: om_fut,
826+
d: sleeper(if mobile_interruptable_platform {
789827
Duration::from_millis(100)
790828
} else {
791829
Duration::from_secs(FASTEST_TIMER)
792830
}),
793831
};
794832
match fut.await {
795-
SelectorOutput::A | SelectorOutput::B => {},
796-
SelectorOutput::C(exit) => {
833+
SelectorOutput::A | SelectorOutput::B | SelectorOutput::C => {},
834+
SelectorOutput::D(exit) => {
797835
should_break = exit;
798836
},
799837
}

lightning/src/onion_message/messenger.rs

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload
3434
use crate::util::async_poll::{MultiResultFuturePoller, ResultFuture};
3535
use crate::util::logger::{Logger, WithContext};
3636
use crate::util::ser::Writeable;
37+
use crate::util::wakers::{Future, Notifier};
3738

3839
use core::fmt;
3940
use core::ops::Deref;
@@ -266,6 +267,9 @@ pub struct OnionMessenger<
266267
pending_intercepted_msgs_events: Mutex<Vec<Event>>,
267268
pending_peer_connected_events: Mutex<Vec<Event>>,
268269
pending_events_processor: AtomicBool,
270+
/// A [`Notifier`] used to wake up the background processor in case we have any [`Event`]s for
271+
/// it to give to users.
272+
event_notifier: Notifier,
269273
}
270274

271275
/// [`OnionMessage`]s buffered to be sent.
@@ -290,13 +294,19 @@ impl OnionMessageRecipient {
290294
}
291295
}
292296

293-
fn enqueue_message(&mut self, message: OnionMessage) {
297+
// Returns whether changes were made that are pending event processing
298+
fn enqueue_message(&mut self, message: OnionMessage) -> bool {
299+
let mut pending_event_processing = false;
294300
let pending_messages = match self {
295301
OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages,
296-
OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages,
302+
OnionMessageRecipient::PendingConnection(pending_messages, _, _) => {
303+
pending_event_processing = true;
304+
pending_messages
305+
}
297306
};
298307

299308
pending_messages.push_back(message);
309+
pending_event_processing
300310
}
301311

302312
fn dequeue_message(&mut self) -> Option<OnionMessage> {
@@ -1037,6 +1047,7 @@ macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset:
10371047
if $res.iter().any(|r| r.is_err()) {
10381048
// We failed handling some events. Return to have them eventually replayed.
10391049
$self.pending_events_processor.store(false, Ordering::Release);
1050+
$self.event_notifier.notify();
10401051
return;
10411052
}
10421053
}
@@ -1119,6 +1130,7 @@ where
11191130
pending_intercepted_msgs_events: Mutex::new(Vec::new()),
11201131
pending_peer_connected_events: Mutex::new(Vec::new()),
11211132
pending_events_processor: AtomicBool::new(false),
1133+
event_notifier: Notifier::new(),
11221134
}
11231135
}
11241136

@@ -1228,13 +1240,19 @@ where
12281240
hash_map::Entry::Vacant(e) => match addresses {
12291241
None => Err(SendError::InvalidFirstHop(first_node_id)),
12301242
Some(addresses) => {
1231-
e.insert(OnionMessageRecipient::pending_connection(addresses))
1243+
let notify = e.insert(OnionMessageRecipient::pending_connection(addresses))
12321244
.enqueue_message(onion_message);
1245+
if notify {
1246+
self.event_notifier.notify();
1247+
}
12331248
Ok(SendSuccess::BufferedAwaitingConnection(first_node_id))
12341249
},
12351250
},
12361251
hash_map::Entry::Occupied(mut e) => {
1237-
e.get_mut().enqueue_message(onion_message);
1252+
let notify = e.get_mut().enqueue_message(onion_message);
1253+
if notify {
1254+
self.event_notifier.notify();
1255+
}
12381256
if e.get().is_connected() {
12391257
Ok(SendSuccess::Buffered)
12401258
} else {
@@ -1345,6 +1363,18 @@ where
13451363
return
13461364
}
13471365
pending_intercepted_msgs_events.push(event);
1366+
self.event_notifier.notify();
1367+
}
1368+
1369+
/// Gets a [`Future`] that completes when an event is available via
1370+
/// [`EventsProvider::process_pending_events`] or [`Self::process_pending_events_async`].
1371+
///
1372+
/// Note that callbacks registered on the [`Future`] MUST NOT call back into this
1373+
/// [`OnionMessenger`] and should instead register actions to be taken later.
1374+
///
1375+
/// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events
1376+
pub fn get_update_future(&self) -> Future {
1377+
self.event_notifier.get_future()
13481378
}
13491379

13501380
/// Processes any events asynchronously using the given handler.
@@ -1616,6 +1646,7 @@ where
16161646
pending_peer_connected_events.push(
16171647
Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
16181648
);
1649+
self.event_notifier.notify();
16191650
}
16201651
} else {
16211652
self.message_recipients.lock().unwrap().remove(their_node_id);

0 commit comments

Comments
 (0)