Skip to content

Commit b3d45cc

Browse files
committed
Add Notifier to OnionMessenger
1 parent 9ce3dd5 commit b3d45cc

File tree

3 files changed

+109
-19
lines changed

3 files changed

+109
-19
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 62 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -492,23 +492,28 @@ pub(crate) mod futures_util {
492492
pub(crate) struct Selector<
493493
A: Future<Output = ()> + Unpin,
494494
B: Future<Output = ()> + Unpin,
495-
C: Future<Output = bool> + Unpin,
495+
C: Future<Output = ()> + Unpin,
496+
D: Future<Output = bool> + Unpin,
496497
> {
497498
pub a: A,
498499
pub b: B,
499500
pub c: C,
501+
pub d: D,
500502
}
503+
501504
pub(crate) enum SelectorOutput {
502505
A,
503506
B,
504-
C(bool),
507+
C,
508+
D(bool),
505509
}
506510

507511
impl<
508512
A: Future<Output = ()> + Unpin,
509513
B: Future<Output = ()> + Unpin,
510-
C: Future<Output = bool> + Unpin,
511-
> Future for Selector<A, B, C>
514+
C: Future<Output = ()> + Unpin,
515+
D: Future<Output = bool> + Unpin,
516+
> Future for Selector<A, B, C, D>
512517
{
513518
type Output = SelectorOutput;
514519
fn poll(
@@ -527,15 +532,43 @@ pub(crate) mod futures_util {
527532
Poll::Pending => {},
528533
}
529534
match Pin::new(&mut self.c).poll(ctx) {
535+
Poll::Ready(()) => {
536+
return Poll::Ready(SelectorOutput::C);
537+
},
538+
Poll::Pending => {},
539+
}
540+
match Pin::new(&mut self.d).poll(ctx) {
530541
Poll::Ready(res) => {
531-
return Poll::Ready(SelectorOutput::C(res));
542+
return Poll::Ready(SelectorOutput::D(res));
532543
},
533544
Poll::Pending => {},
534545
}
535546
Poll::Pending
536547
}
537548
}
538549

550+
/// A selector that takes a future wrapped in an option that will be polled if it is `Some` and
551+
/// will always be pending otherwise.
552+
pub(crate) struct OptionalSelector<F: Future<Output = ()> + Unpin> {
553+
pub optional_future: Option<F>,
554+
}
555+
556+
impl<F: Future<Output = ()> + Unpin> Future for OptionalSelector<F> {
557+
type Output = ();
558+
fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
559+
match self.optional_future.as_mut() {
560+
Some(f) => match Pin::new(f).poll(ctx) {
561+
Poll::Ready(()) => {
562+
self.optional_future.take();
563+
Poll::Ready(())
564+
},
565+
Poll::Pending => Poll::Pending,
566+
},
567+
None => Poll::Pending,
568+
}
569+
}
570+
}
571+
539572
// If we want to poll a future without an async context to figure out if it has completed or
540573
// not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
541574
// but sadly there's a good bit of boilerplate here.
@@ -557,7 +590,7 @@ pub(crate) mod futures_util {
557590
#[cfg(feature = "futures")]
558591
use core::task;
559592
#[cfg(feature = "futures")]
560-
use futures_util::{dummy_waker, Selector, SelectorOutput};
593+
use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
561594

562595
/// Processes background events in a future.
563596
///
@@ -782,18 +815,25 @@ where
782815
scorer,
783816
should_break,
784817
{
818+
let om_fut = if let Some(om) = onion_messenger.as_ref() {
819+
let fut = om.get_om().get_update_future();
820+
OptionalSelector { optional_future: Some(fut) }
821+
} else {
822+
OptionalSelector { optional_future: None }
823+
};
785824
let fut = Selector {
786825
a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
787826
b: chain_monitor.get_update_future(),
788-
c: sleeper(if mobile_interruptable_platform {
827+
c: om_fut,
828+
d: sleeper(if mobile_interruptable_platform {
789829
Duration::from_millis(100)
790830
} else {
791831
Duration::from_secs(FASTEST_TIMER)
792832
}),
793833
};
794834
match fut.await {
795-
SelectorOutput::A | SelectorOutput::B => {},
796-
SelectorOutput::C(exit) => {
835+
SelectorOutput::A | SelectorOutput::B | SelectorOutput::C => {},
836+
SelectorOutput::D(exit) => {
797837
should_break = exit;
798838
},
799839
}
@@ -938,11 +978,19 @@ impl BackgroundProcessor {
938978
scorer,
939979
stop_thread.load(Ordering::Acquire),
940980
{
941-
Sleeper::from_two_futures(
942-
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
943-
&chain_monitor.get_update_future(),
944-
)
945-
.wait_timeout(Duration::from_millis(100));
981+
let sleeper = if let Some(om) = onion_messenger.as_ref() {
982+
Sleeper::from_three_futures(
983+
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
984+
&chain_monitor.get_update_future(),
985+
&om.get_om().get_update_future(),
986+
)
987+
} else {
988+
Sleeper::from_two_futures(
989+
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
990+
&chain_monitor.get_update_future(),
991+
)
992+
};
993+
sleeper.wait_timeout(Duration::from_millis(100));
946994
},
947995
|_| Instant::now(),
948996
|time: &Instant, dur| time.elapsed().as_secs() > dur,

lightning/src/onion_message/messenger.rs

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload
3434
use crate::util::async_poll::{MultiResultFuturePoller, ResultFuture};
3535
use crate::util::logger::{Logger, WithContext};
3636
use crate::util::ser::Writeable;
37+
use crate::util::wakers::{Future, Notifier};
3738

3839
use core::fmt;
3940
use core::ops::Deref;
@@ -266,6 +267,9 @@ pub struct OnionMessenger<
266267
pending_intercepted_msgs_events: Mutex<Vec<Event>>,
267268
pending_peer_connected_events: Mutex<Vec<Event>>,
268269
pending_events_processor: AtomicBool,
270+
/// A [`Notifier`] used to wake up the background processor in case we have any [`Event`]s for
271+
/// it to give to users.
272+
event_notifier: Notifier,
269273
}
270274

271275
/// [`OnionMessage`]s buffered to be sent.
@@ -290,13 +294,19 @@ impl OnionMessageRecipient {
290294
}
291295
}
292296

293-
fn enqueue_message(&mut self, message: OnionMessage) {
297+
// Returns whether changes were made that are pending event processing
298+
fn enqueue_message(&mut self, message: OnionMessage) -> bool {
299+
let mut pending_event_processing = false;
294300
let pending_messages = match self {
295301
OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages,
296-
OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages,
302+
OnionMessageRecipient::PendingConnection(pending_messages, _, _) => {
303+
pending_event_processing = true;
304+
pending_messages
305+
}
297306
};
298307

299308
pending_messages.push_back(message);
309+
pending_event_processing
300310
}
301311

302312
fn dequeue_message(&mut self) -> Option<OnionMessage> {
@@ -1037,6 +1047,7 @@ macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset:
10371047
if $res.iter().any(|r| r.is_err()) {
10381048
// We failed handling some events. Return to have them eventually replayed.
10391049
$self.pending_events_processor.store(false, Ordering::Release);
1050+
$self.event_notifier.notify();
10401051
return;
10411052
}
10421053
}
@@ -1119,6 +1130,7 @@ where
11191130
pending_intercepted_msgs_events: Mutex::new(Vec::new()),
11201131
pending_peer_connected_events: Mutex::new(Vec::new()),
11211132
pending_events_processor: AtomicBool::new(false),
1133+
event_notifier: Notifier::new(),
11221134
}
11231135
}
11241136

@@ -1228,13 +1240,19 @@ where
12281240
hash_map::Entry::Vacant(e) => match addresses {
12291241
None => Err(SendError::InvalidFirstHop(first_node_id)),
12301242
Some(addresses) => {
1231-
e.insert(OnionMessageRecipient::pending_connection(addresses))
1243+
let notify = e.insert(OnionMessageRecipient::pending_connection(addresses))
12321244
.enqueue_message(onion_message);
1245+
if notify {
1246+
self.event_notifier.notify();
1247+
}
12331248
Ok(SendSuccess::BufferedAwaitingConnection(first_node_id))
12341249
},
12351250
},
12361251
hash_map::Entry::Occupied(mut e) => {
1237-
e.get_mut().enqueue_message(onion_message);
1252+
let notify = e.get_mut().enqueue_message(onion_message);
1253+
if notify {
1254+
self.event_notifier.notify();
1255+
}
12381256
if e.get().is_connected() {
12391257
Ok(SendSuccess::Buffered)
12401258
} else {
@@ -1345,6 +1363,18 @@ where
13451363
return
13461364
}
13471365
pending_intercepted_msgs_events.push(event);
1366+
self.event_notifier.notify();
1367+
}
1368+
1369+
/// Gets a [`Future`] that completes when an event is available via
1370+
/// [`EventsProvider::process_pending_events`] or [`Self::process_pending_events_async`].
1371+
///
1372+
/// Note that callbacks registered on the [`Future`] MUST NOT call back into this
1373+
/// [`OnionMessenger`] and should instead register actions to be taken later.
1374+
///
1375+
/// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events
1376+
pub fn get_update_future(&self) -> Future {
1377+
self.event_notifier.get_future()
13481378
}
13491379

13501380
/// Processes any events asynchronously using the given handler.
@@ -1616,6 +1646,7 @@ where
16161646
pending_peer_connected_events.push(
16171647
Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
16181648
);
1649+
self.event_notifier.notify();
16191650
}
16201651
} else {
16211652
self.message_recipients.lock().unwrap().remove(their_node_id);

lightning/src/util/wakers.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,10 +244,21 @@ impl Sleeper {
244244
Self { notifiers: vec![Arc::clone(&future.state)] }
245245
}
246246
/// Constructs a new sleeper from two futures, allowing blocking on both at once.
247-
// Note that this is the common case - a ChannelManager and ChainMonitor.
248247
pub fn from_two_futures(fut_a: &Future, fut_b: &Future) -> Self {
249248
Self { notifiers: vec![Arc::clone(&fut_a.state), Arc::clone(&fut_b.state)] }
250249
}
250+
/// Constructs a new sleeper from three futures, allowing blocking on all three at once.
251+
///
252+
// Note that this is the common case - a ChannelManager, a ChainMonitor, and an
253+
// OnionMessenger.
254+
pub fn from_three_futures(fut_a: &Future, fut_b: &Future, fut_c: &Future) -> Self {
255+
let notifiers = vec![
256+
Arc::clone(&fut_a.state),
257+
Arc::clone(&fut_b.state),
258+
Arc::clone(&fut_c.state)
259+
];
260+
Self { notifiers }
261+
}
251262
/// Constructs a new sleeper on many futures, allowing blocking on all at once.
252263
pub fn new(futures: Vec<Future>) -> Self {
253264
Self { notifiers: futures.into_iter().map(|f| Arc::clone(&f.state)).collect() }

0 commit comments

Comments
 (0)