Skip to content

Commit d71c174

Browse files
committed
Rename MultiFuturePoller and let it return concrete results
1 parent 9a1ea25 commit d71c174

File tree

3 files changed

+58
-22
lines changed

3 files changed

+58
-22
lines changed

lightning/src/events/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2334,6 +2334,7 @@ pub trait EventsProvider {
23342334
/// currently succeed (e.g., due to a persistence failure).
23352335
///
23362336
/// LDK will ensure the event is persisted and will eventually be replayed.
2337+
#[derive(Clone, Copy, Debug)]
23372338
pub struct ReplayEvent();
23382339

23392340
/// A trait implemented for objects handling events from [`EventsProvider`].

lightning/src/onion_message/messenger.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey};
1818
use crate::blinded_path::{BlindedPath, IntroductionNode, NextMessageHop, NodeIdLookUp};
1919
use crate::blinded_path::message::{advance_path_by_one, ForwardNode, ForwardTlvs, MessageContext, OffersContext, ReceiveTlvs};
2020
use crate::blinded_path::utils;
21-
use crate::events::{Event, EventHandler, EventsProvider};
21+
use crate::events::{Event, EventHandler, EventsProvider, ReplayEvent};
2222
use crate::sign::{EntropySource, NodeSigner, Recipient};
2323
use crate::ln::features::{InitFeatures, NodeFeatures};
2424
use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler, SocketAddress};
@@ -1328,7 +1328,7 @@ where
13281328
/// have an ordering requirement.
13291329
///
13301330
/// See the trait-level documentation of [`EventsProvider`] for requirements.
1331-
pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ()>> + core::marker::Unpin, H: Fn(Event) -> Future>(
1331+
pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>> + core::marker::Unpin, H: Fn(Event) -> Future>(
13321332
&self, handler: H
13331333
) {
13341334
let mut intercepted_msgs = Vec::new();
@@ -1346,26 +1346,26 @@ where
13461346
for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
13471347
if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
13481348
if let Some(addresses) = addresses.take() {
1349-
futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })));
1349+
futures.push(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }));
13501350
}
13511351
}
13521352
}
13531353

13541354
for ev in intercepted_msgs {
13551355
if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
1356-
futures.push(Some(handler(ev)));
1356+
futures.push(handler(ev));
13571357
}
13581358
// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
1359-
crate::util::async_poll::MultiFuturePoller(futures).await;
1359+
crate::util::async_poll::MultiResultFuturePoller::new(futures).await;
13601360

13611361
if peer_connecteds.len() <= 1 {
13621362
for event in peer_connecteds { handler(event).await; }
13631363
} else {
13641364
let mut futures = Vec::new();
13651365
for event in peer_connecteds {
1366-
futures.push(Some(handler(event)));
1366+
futures.push(handler(event));
13671367
}
1368-
crate::util::async_poll::MultiFuturePoller(futures).await;
1368+
crate::util::async_poll::MultiResultFuturePoller::new(futures).await;
13691369
}
13701370
}
13711371
}

lightning/src/util/async_poll.rs

Lines changed: 50 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,29 +15,64 @@ use core::marker::Unpin;
1515
use core::pin::Pin;
1616
use core::task::{Context, Poll};
1717

18-
pub(crate) struct MultiFuturePoller<F: Future<Output = ()> + Unpin>(pub Vec<Option<F>>);
18+
enum ResultFuture<F: Future<Output = Result<(), E>>, E: Copy + Unpin> {
19+
Pending(F),
20+
Ready(Result<(), E>),
21+
}
22+
23+
pub(crate) struct MultiResultFuturePoller<
24+
F: Future<Output = Result<(), E>> + Unpin,
25+
E: Copy + Unpin,
26+
> {
27+
futures_state: Vec<ResultFuture<F, E>>,
28+
}
1929

20-
impl<F: Future<Output = ()> + Unpin> Future for MultiFuturePoller<F> {
21-
type Output = ();
22-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
30+
impl<F: Future<Output = Result<(), E>> + Unpin, E: Copy + Unpin> MultiResultFuturePoller<F, E> {
31+
pub fn new(futures: Vec<F>) -> Self {
32+
let futures_state = futures.into_iter().map(|f| ResultFuture::Pending(f)).collect();
33+
Self { futures_state }
34+
}
35+
}
36+
37+
impl<F: Future<Output = Result<(), E>> + Unpin, E: Copy + Unpin> Future
38+
for MultiResultFuturePoller<F, E>
39+
{
40+
type Output = Vec<Result<(), E>>;
41+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<Result<(), E>>> {
2342
let mut have_pending_futures = false;
24-
for fut_option in self.get_mut().0.iter_mut() {
25-
let mut fut = match fut_option.take() {
26-
None => continue,
27-
Some(fut) => fut,
28-
};
29-
match Pin::new(&mut fut).poll(cx) {
30-
Poll::Ready(()) => {},
31-
Poll::Pending => {
32-
have_pending_futures = true;
33-
*fut_option = Some(fut);
43+
let futures_state = &mut self.get_mut().futures_state;
44+
for state in futures_state.iter_mut() {
45+
match state {
46+
ResultFuture::Pending(ref mut fut) => match Pin::new(fut).poll(cx) {
47+
Poll::Ready(res) => {
48+
*state = ResultFuture::Ready(res);
49+
},
50+
Poll::Pending => {
51+
have_pending_futures = true;
52+
},
3453
},
54+
ResultFuture::Ready(_) => continue,
3555
}
3656
}
57+
3758
if have_pending_futures {
3859
Poll::Pending
3960
} else {
40-
Poll::Ready(())
61+
let results = futures_state
62+
.iter()
63+
.filter_map(|e| match e {
64+
ResultFuture::Ready(res) => Some(res),
65+
ResultFuture::Pending(_) => {
66+
debug_assert!(
67+
false,
68+
"All futures are expected to be ready if none are pending"
69+
);
70+
None
71+
},
72+
})
73+
.copied()
74+
.collect();
75+
Poll::Ready(results)
4176
}
4277
}
4378
}

0 commit comments

Comments
 (0)