Skip to content

Commit 2dd8c2b

Browse files
committed
Add Notifier to OnionMessenger
1 parent 9ce3dd5 commit 2dd8c2b

File tree

3 files changed

+94
-15
lines changed

3 files changed

+94
-15
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 62 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -492,23 +492,28 @@ pub(crate) mod futures_util {
492492
pub(crate) struct Selector<
493493
A: Future<Output = ()> + Unpin,
494494
B: Future<Output = ()> + Unpin,
495-
C: Future<Output = bool> + Unpin,
495+
C: Future<Output = ()> + Unpin,
496+
D: Future<Output = bool> + Unpin,
496497
> {
497498
pub a: A,
498499
pub b: B,
499500
pub c: C,
501+
pub d: D,
500502
}
503+
501504
pub(crate) enum SelectorOutput {
502505
A,
503506
B,
504-
C(bool),
507+
C,
508+
D(bool),
505509
}
506510

507511
impl<
508512
A: Future<Output = ()> + Unpin,
509513
B: Future<Output = ()> + Unpin,
510-
C: Future<Output = bool> + Unpin,
511-
> Future for Selector<A, B, C>
514+
C: Future<Output = ()> + Unpin,
515+
D: Future<Output = bool> + Unpin,
516+
> Future for Selector<A, B, C, D>
512517
{
513518
type Output = SelectorOutput;
514519
fn poll(
@@ -527,15 +532,43 @@ pub(crate) mod futures_util {
527532
Poll::Pending => {},
528533
}
529534
match Pin::new(&mut self.c).poll(ctx) {
535+
Poll::Ready(()) => {
536+
return Poll::Ready(SelectorOutput::C);
537+
},
538+
Poll::Pending => {},
539+
}
540+
match Pin::new(&mut self.d).poll(ctx) {
530541
Poll::Ready(res) => {
531-
return Poll::Ready(SelectorOutput::C(res));
542+
return Poll::Ready(SelectorOutput::D(res));
532543
},
533544
Poll::Pending => {},
534545
}
535546
Poll::Pending
536547
}
537548
}
538549

550+
/// A selector that takes a future wrapped in an option that will be polled if it is `Some` and
551+
/// will always be pending otherwise.
552+
pub(crate) struct OptionalSelector<F: Future<Output = ()> + Unpin> {
553+
pub optional_future: Option<F>,
554+
}
555+
556+
impl<F: Future<Output = ()> + Unpin> Future for OptionalSelector<F> {
557+
type Output = ();
558+
fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
559+
match self.optional_future.as_mut() {
560+
Some(f) => match Pin::new(f).poll(ctx) {
561+
Poll::Ready(()) => {
562+
self.optional_future.take();
563+
Poll::Ready(())
564+
},
565+
Poll::Pending => Poll::Pending,
566+
},
567+
None => Poll::Pending,
568+
}
569+
}
570+
}
571+
539572
// If we want to poll a future without an async context to figure out if it has completed or
540573
// not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
541574
// but sadly there's a good bit of boilerplate here.
@@ -557,7 +590,7 @@ pub(crate) mod futures_util {
557590
#[cfg(feature = "futures")]
558591
use core::task;
559592
#[cfg(feature = "futures")]
560-
use futures_util::{dummy_waker, Selector, SelectorOutput};
593+
use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
561594

562595
/// Processes background events in a future.
563596
///
@@ -782,18 +815,25 @@ where
782815
scorer,
783816
should_break,
784817
{
818+
let om_fut = if let Some(om) = onion_messenger.as_ref() {
819+
let fut = om.get_om().get_update_future();
820+
OptionalSelector { optional_future: Some(fut) }
821+
} else {
822+
OptionalSelector { optional_future: None }
823+
};
785824
let fut = Selector {
786825
a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
787826
b: chain_monitor.get_update_future(),
788-
c: sleeper(if mobile_interruptable_platform {
827+
c: om_fut,
828+
d: sleeper(if mobile_interruptable_platform {
789829
Duration::from_millis(100)
790830
} else {
791831
Duration::from_secs(FASTEST_TIMER)
792832
}),
793833
};
794834
match fut.await {
795-
SelectorOutput::A | SelectorOutput::B => {},
796-
SelectorOutput::C(exit) => {
835+
SelectorOutput::A | SelectorOutput::B | SelectorOutput::C => {},
836+
SelectorOutput::D(exit) => {
797837
should_break = exit;
798838
},
799839
}
@@ -938,11 +978,19 @@ impl BackgroundProcessor {
938978
scorer,
939979
stop_thread.load(Ordering::Acquire),
940980
{
941-
Sleeper::from_two_futures(
942-
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
943-
&chain_monitor.get_update_future(),
944-
)
945-
.wait_timeout(Duration::from_millis(100));
981+
let sleeper = if let Some(om) = onion_messenger.as_ref() {
982+
Sleeper::from_three_futures(
983+
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
984+
&chain_monitor.get_update_future(),
985+
&om.get_om().get_update_future(),
986+
)
987+
} else {
988+
Sleeper::from_two_futures(
989+
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
990+
&chain_monitor.get_update_future(),
991+
)
992+
};
993+
sleeper.wait_timeout(Duration::from_millis(100));
946994
},
947995
|_| Instant::now(),
948996
|time: &Instant, dur| time.elapsed().as_secs() > dur,

lightning/src/onion_message/messenger.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload
3434
use crate::util::async_poll::{MultiResultFuturePoller, ResultFuture};
3535
use crate::util::logger::{Logger, WithContext};
3636
use crate::util::ser::Writeable;
37+
use crate::util::wakers::{Future, Notifier};
3738

3839
use core::fmt;
3940
use core::ops::Deref;
@@ -266,6 +267,9 @@ pub struct OnionMessenger<
266267
pending_intercepted_msgs_events: Mutex<Vec<Event>>,
267268
pending_peer_connected_events: Mutex<Vec<Event>>,
268269
pending_events_processor: AtomicBool,
270+
/// A [`Notifier`] used to wake up the background processor in case we have any [`Event`]s for
271+
/// it to give to users.
272+
event_notifier: Notifier,
269273
}
270274

271275
/// [`OnionMessage`]s buffered to be sent.
@@ -1037,6 +1041,7 @@ macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset:
10371041
if $res.iter().any(|r| r.is_err()) {
10381042
// We failed handling some events. Return to have them eventually replayed.
10391043
$self.pending_events_processor.store(false, Ordering::Release);
1044+
$self.event_notifier.notify();
10401045
return;
10411046
}
10421047
}
@@ -1119,6 +1124,7 @@ where
11191124
pending_intercepted_msgs_events: Mutex::new(Vec::new()),
11201125
pending_peer_connected_events: Mutex::new(Vec::new()),
11211126
pending_events_processor: AtomicBool::new(false),
1127+
event_notifier: Notifier::new(),
11221128
}
11231129
}
11241130

@@ -1230,6 +1236,7 @@ where
12301236
Some(addresses) => {
12311237
e.insert(OnionMessageRecipient::pending_connection(addresses))
12321238
.enqueue_message(onion_message);
1239+
self.event_notifier.notify();
12331240
Ok(SendSuccess::BufferedAwaitingConnection(first_node_id))
12341241
},
12351242
},
@@ -1345,6 +1352,18 @@ where
13451352
return
13461353
}
13471354
pending_intercepted_msgs_events.push(event);
1355+
self.event_notifier.notify();
1356+
}
1357+
1358+
/// Gets a [`Future`] that completes when an event is available via
1359+
/// [`EventsProvider::process_pending_events`] or [`Self::process_pending_events_async`].
1360+
///
1361+
/// Note that callbacks registered on the [`Future`] MUST NOT call back into this
1362+
/// [`OnionMessenger`] and should instead register actions to be taken later.
1363+
///
1364+
/// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events
1365+
pub fn get_update_future(&self) -> Future {
1366+
self.event_notifier.get_future()
13481367
}
13491368

13501369
/// Processes any events asynchronously using the given handler.
@@ -1616,6 +1635,7 @@ where
16161635
pending_peer_connected_events.push(
16171636
Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
16181637
);
1638+
self.event_notifier.notify();
16191639
}
16201640
} else {
16211641
self.message_recipients.lock().unwrap().remove(their_node_id);

lightning/src/util/wakers.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,10 +244,21 @@ impl Sleeper {
244244
Self { notifiers: vec![Arc::clone(&future.state)] }
245245
}
246246
/// Constructs a new sleeper from two futures, allowing blocking on both at once.
247-
// Note that this is the common case - a ChannelManager and ChainMonitor.
248247
pub fn from_two_futures(fut_a: &Future, fut_b: &Future) -> Self {
249248
Self { notifiers: vec![Arc::clone(&fut_a.state), Arc::clone(&fut_b.state)] }
250249
}
250+
/// Constructs a new sleeper from three futures, allowing blocking on all three at once.
251+
///
252+
// Note that this is the common case - a ChannelManager, a ChainMonitor, and an
253+
// OnionMessenger.
254+
pub fn from_three_futures(fut_a: &Future, fut_b: &Future, fut_c: &Future) -> Self {
255+
let notifiers = vec![
256+
Arc::clone(&fut_a.state),
257+
Arc::clone(&fut_b.state),
258+
Arc::clone(&fut_c.state)
259+
];
260+
Self { notifiers }
261+
}
251262
/// Constructs a new sleeper on many futures, allowing blocking on all at once.
252263
pub fn new(futures: Vec<Future>) -> Self {
253264
Self { notifiers: futures.into_iter().map(|f| Arc::clone(&f.state)).collect() }

0 commit comments

Comments
 (0)