Skip to content

Commit a1e57da

Browse files
committed
Refactor EventsProvider to take an EventHandler
1 parent c7e198e commit a1e57da

File tree

12 files changed

+136
-49
lines changed

12 files changed

+136
-49
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use lightning::util::errors::APIError;
4646
use lightning::util::events;
4747
use lightning::util::logger::Logger;
4848
use lightning::util::config::UserConfig;
49-
use lightning::util::events::{EventsProvider, MessageSendEventsProvider};
49+
use lightning::util::events::MessageSendEventsProvider;
5050
use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
5151
use lightning::util::test_utils::OnlyReadsKeysInterface;
5252
use lightning::routing::router::{Route, RouteHop};

fuzz/src/full_stack.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use lightning::ln::msgs::DecodeError;
3939
use lightning::routing::router::get_route;
4040
use lightning::routing::network_graph::NetGraphMsgHandler;
4141
use lightning::util::config::UserConfig;
42-
use lightning::util::events::{EventsProvider,Event};
42+
use lightning::util::events::Event;
4343
use lightning::util::enforcing_trait_impls::EnforcingSigner;
4444
use lightning::util::logger::Logger;
4545
use lightning::util::ser::Readable;

lightning-background-processor/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ mod tests {
173173
use lightning::ln::msgs::ChannelMessageHandler;
174174
use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
175175
use lightning::util::config::UserConfig;
176-
use lightning::util::events::{Event, EventsProvider, MessageSendEventsProvider, MessageSendEvent};
176+
use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
177177
use lightning::util::logger::Logger;
178178
use lightning::util::ser::Writeable;
179179
use lightning::util::test_utils;

lightning-net-tokio/src/lib.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
//! use tokio::sync::mpsc;
2727
//! use std::net::TcpStream;
2828
//! use bitcoin::secp256k1::key::PublicKey;
29-
//! use lightning::util::events::EventsProvider;
29+
//! use lightning::util::events::{Event, EventHandler, EventsProvider};
3030
//! use std::net::SocketAddr;
3131
//! use std::sync::Arc;
3232
//!
@@ -47,12 +47,12 @@
4747
//! lightning_net_tokio::connect_outbound(peer_manager, sender, their_node_id, addr).await;
4848
//! loop {
4949
//! receiver.recv().await;
50-
//! for _event in channel_manager.get_and_clear_pending_events().drain(..) {
51-
//! // Handle the event!
52-
//! }
53-
//! for _event in chain_monitor.get_and_clear_pending_events().drain(..) {
54-
//! // Handle the event!
55-
//! }
50+
//! channel_manager.process_pending_events(&|event| {
51+
//! // Handle the event!
52+
//! });
53+
//! chain_monitor.process_pending_events(&|event| {
54+
//! // Handle the event!
55+
//! });
5656
//! }
5757
//! }
5858
//!
@@ -62,12 +62,12 @@
6262
//! lightning_net_tokio::setup_inbound(peer_manager, sender, socket);
6363
//! loop {
6464
//! receiver.recv().await;
65-
//! for _event in channel_manager.get_and_clear_pending_events().drain(..) {
66-
//! // Handle the event!
67-
//! }
68-
//! for _event in chain_monitor.get_and_clear_pending_events().drain(..) {
69-
//! // Handle the event!
70-
//! }
65+
//! channel_manager.process_pending_events(&|event| {
66+
//! // Handle the event!
67+
//! });
68+
//! chain_monitor.process_pending_events(&|event| {
69+
//! // Handle the event!
70+
//! });
7171
//! }
7272
//! }
7373
//! ```

lightning/src/chain/chainmonitor.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use chain::transaction::{OutPoint, TransactionData};
3535
use chain::keysinterface::Sign;
3636
use util::logger::Logger;
3737
use util::events;
38-
use util::events::Event;
38+
use util::events::EventHandler;
3939

4040
use std::collections::{HashMap, hash_map};
4141
use std::sync::RwLock;
@@ -139,6 +139,15 @@ where C::Target: chain::Filter,
139139
persister,
140140
}
141141
}
142+
143+
#[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
144+
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
145+
use util::events::EventsProvider;
146+
let events = std::cell::RefCell::new(Vec::new());
147+
let event_handler = |event| events.borrow_mut().push(event);
148+
self.process_pending_events(&event_handler);
149+
events.into_inner()
150+
}
142151
}
143152

144153
impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
@@ -306,12 +315,17 @@ impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> even
306315
L::Target: Logger,
307316
P::Target: channelmonitor::Persist<ChannelSigner>,
308317
{
309-
fn get_and_clear_pending_events(&self) -> Vec<Event> {
318+
/// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
319+
///
320+
/// [`SpendableOutputs`]: events::Event::SpendableOutputs
321+
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
310322
let mut pending_events = Vec::new();
311323
for monitor in self.monitors.read().unwrap().values() {
312324
pending_events.append(&mut monitor.get_and_clear_pending_events());
313325
}
314-
pending_events
326+
for event in pending_events.drain(..) {
327+
handler.handle_event(event);
328+
}
315329
}
316330
}
317331

@@ -320,7 +334,6 @@ mod tests {
320334
use ::{check_added_monitors, get_local_commitment_txn};
321335
use ln::features::InitFeatures;
322336
use ln::functional_test_utils::*;
323-
use util::events::EventsProvider;
324337
use util::events::MessageSendEventsProvider;
325338
use util::test_utils::{OnRegisterOutput, TxOutReference};
326339

lightning/src/ln/chanmon_update_fail_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use ln::msgs;
2626
use ln::msgs::{ChannelMessageHandler, ErrorAction, RoutingMessageHandler};
2727
use routing::router::get_route;
2828
use util::enforcing_trait_impls::EnforcingSigner;
29-
use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
29+
use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
3030
use util::errors::APIError;
3131
use util::ser::{ReadableArgs, Writeable};
3232

lightning/src/ln/channelmanager.rs

Lines changed: 56 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ use ln::onion_utils;
5454
use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, OptionalField};
5555
use chain::keysinterface::{Sign, KeysInterface, KeysManager, InMemorySigner};
5656
use util::config::UserConfig;
57-
use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
57+
use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
5858
use util::{byte_utils, events};
5959
use util::ser::{Readable, ReadableArgs, MaybeReadable, Writeable, Writer};
6060
use util::chacha20::{ChaCha20, ChaChaReader};
@@ -1718,6 +1718,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
17181718
/// Note that this includes RBF or similar transaction replacement strategies - lightning does
17191719
/// not currently support replacing a funding transaction on an existing channel. Instead,
17201720
/// create a new channel with a conflicting funding transaction.
1721+
///
1722+
/// [`Event::FundingGenerationReady`]: crate::util::events::Event::FundingGenerationReady
17211723
pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_transaction: Transaction) -> Result<(), APIError> {
17221724
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
17231725

@@ -3410,11 +3412,13 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
34103412
}
34113413
}
34123414

3413-
/// Process pending events from the `chain::Watch`.
3414-
fn process_pending_monitor_events(&self) {
3415+
/// Process pending events from the `chain::Watch`, returning whether any events were processed.
3416+
fn process_pending_monitor_events(&self) -> bool {
34153417
let mut failed_channels = Vec::new();
3416-
{
3417-
for monitor_event in self.chain_monitor.release_pending_monitor_events() {
3418+
let has_pending_monitor_events = {
3419+
let pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
3420+
let has_pending_monitor_events = !pending_monitor_events.is_empty();
3421+
for monitor_event in pending_monitor_events {
34183422
match monitor_event {
34193423
MonitorEvent::HTLCEvent(htlc_update) => {
34203424
if let Some(preimage) = htlc_update.payment_preimage {
@@ -3451,11 +3455,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
34513455
},
34523456
}
34533457
}
3454-
}
3458+
has_pending_monitor_events
3459+
};
34553460

34563461
for failure in failed_channels.drain(..) {
34573462
self.finish_force_close_channel(failure);
34583463
}
3464+
3465+
has_pending_monitor_events
34593466
}
34603467

34613468
/// Handle a list of channel failures during a block_connected or block_disconnected call,
@@ -3580,6 +3587,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
35803587
pub fn create_inbound_payment_for_hash(&self, payment_hash: PaymentHash, min_value_msat: Option<u64>, invoice_expiry_delta_secs: u32, user_payment_id: u64) -> Result<PaymentSecret, APIError> {
35813588
self.set_payment_hash_secret_map(payment_hash, None, min_value_msat, invoice_expiry_delta_secs, user_payment_id)
35823589
}
3590+
3591+
#[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
3592+
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
3593+
let events = std::cell::RefCell::new(Vec::new());
3594+
let event_handler = |event| events.borrow_mut().push(event);
3595+
self.process_pending_events(&event_handler);
3596+
events.into_inner()
3597+
}
35833598
}
35843599

35853600
impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSendEventsProvider for ChannelManager<Signer, M, T, K, F, L>
@@ -3602,21 +3617,42 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSend
36023617
}
36033618

36043619
impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> EventsProvider for ChannelManager<Signer, M, T, K, F, L>
3605-
where M::Target: chain::Watch<Signer>,
3606-
T::Target: BroadcasterInterface,
3607-
K::Target: KeysInterface<Signer = Signer>,
3608-
F::Target: FeeEstimator,
3609-
L::Target: Logger,
3620+
where
3621+
M::Target: chain::Watch<Signer>,
3622+
T::Target: BroadcasterInterface,
3623+
K::Target: KeysInterface<Signer = Signer>,
3624+
F::Target: FeeEstimator,
3625+
L::Target: Logger,
36103626
{
3611-
fn get_and_clear_pending_events(&self) -> Vec<Event> {
3612-
//TODO: This behavior should be documented. It's non-intuitive that we query
3613-
// ChannelMonitors when clearing other events.
3614-
self.process_pending_monitor_events();
3627+
/// Processes events that must be periodically handled.
3628+
///
3629+
/// An [`EventHandler`] may safely call back to the provider in order to handle an event.
3630+
/// However, it must not call [`Writeable::write`] as doing so would result in a deadlock.
3631+
///
3632+
/// Pending events are persisted as part of [`ChannelManager`]. While these events are cleared
3633+
/// when processed, an [`EventHandler`] must be able to handle previously seen events when
3634+
/// restarting from an old state.
3635+
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
3636+
PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || {
3637+
let mut result = NotifyOption::SkipPersist;
36153638

3616-
let mut ret = Vec::new();
3617-
let mut pending_events = self.pending_events.lock().unwrap();
3618-
mem::swap(&mut ret, &mut *pending_events);
3619-
ret
3639+
// TODO: This behavior should be documented. It's unintuitive that we query
3640+
// ChannelMonitors when clearing other events.
3641+
if self.process_pending_monitor_events() {
3642+
result = NotifyOption::DoPersist;
3643+
}
3644+
3645+
let mut pending_events = std::mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
3646+
if !pending_events.is_empty() {
3647+
result = NotifyOption::DoPersist;
3648+
}
3649+
3650+
for event in pending_events.drain(..) {
3651+
handler.handle_event(event);
3652+
}
3653+
3654+
result
3655+
});
36203656
}
36213657
}
36223658

@@ -4878,7 +4914,7 @@ pub mod bench {
48784914
use routing::router::get_route;
48794915
use util::test_utils;
48804916
use util::config::UserConfig;
4881-
use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
4917+
use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
48824918

48834919
use bitcoin::hashes::Hash;
48844920
use bitcoin::hashes::sha256::Hash as Sha256;

lightning/src/ln/functional_test_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler};
2323
use util::enforcing_trait_impls::EnforcingSigner;
2424
use util::test_utils;
2525
use util::test_utils::TestChainMonitor;
26-
use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
26+
use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
2727
use util::errors::APIError;
2828
use util::config::UserConfig;
2929
use util::ser::{ReadableArgs, Writeable, Readable};

lightning/src/ln/functional_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use ln::msgs;
2929
use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler,HTLCFailChannelUpdate, ErrorAction};
3030
use util::enforcing_trait_impls::EnforcingSigner;
3131
use util::{byte_utils, test_utils};
32-
use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
32+
use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
3333
use util::errors::APIError;
3434
use util::ser::{Writeable, ReadableArgs};
3535
use util::config::UserConfig;

lightning/src/ln/onion_route_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use ln::features::{InitFeatures, InvoiceFeatures};
2020
use ln::msgs;
2121
use ln::msgs::{ChannelMessageHandler, HTLCFailChannelUpdate, OptionalField};
2222
use util::test_utils;
23-
use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
23+
use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
2424
use util::ser::{Writeable, Writer};
2525
use util::config::UserConfig;
2626

lightning/src/ln/reorg_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use ln::channelmanager::{ChannelManager, ChannelManagerReadArgs};
1515
use ln::features::InitFeatures;
1616
use ln::msgs::{ChannelMessageHandler, ErrorAction, HTLCFailChannelUpdate};
1717
use util::enforcing_trait_impls::EnforcingSigner;
18-
use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
18+
use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
1919
use util::test_utils;
2020
use util::ser::{ReadableArgs, Writeable};
2121

lightning/src/util/events.rs

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use bitcoin::blockdata::script::Script;
2424
use bitcoin::secp256k1::key::PublicKey;
2525

2626
use core::time::Duration;
27+
use std::ops::Deref;
2728

2829
/// An Event which you should probably take some action in response to.
2930
///
@@ -376,9 +377,46 @@ pub trait MessageSendEventsProvider {
376377
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent>;
377378
}
378379

379-
/// A trait indicating an object may generate events
380+
/// A trait indicating an object may generate events.
381+
///
382+
/// Events are processed by passing an [`EventHandler`] to [`process_pending_events`].
383+
///
384+
/// # Requirements
385+
///
386+
/// See [`process_pending_events`] for requirements around event processing.
387+
///
388+
/// When using this trait, [`process_pending_events`] will call [`handle_event`] for each pending
389+
/// event since the last invocation. The handler must either act upon the event immediately
390+
/// or preserve it for later handling.
391+
///
392+
/// Note, handlers may call back into the provider and thus deadlocking must be avoided. Be sure to
393+
/// consult the provider's documentation on the implication of processing events and how a handler
394+
/// may safely use the provider (e.g., see [`ChannelManager::process_pending_events`] and
395+
/// [`ChainMonitor::process_pending_events`]).
396+
///
397+
/// [`process_pending_events`]: Self::process_pending_events
398+
/// [`handle_event`]: EventHandler::handle_event
399+
/// [`ChannelManager::process_pending_events`]: crate::ln::channelmanager::ChannelManager#method.process_pending_events
400+
/// [`ChainMonitor::process_pending_events`]: crate::chain::chainmonitor::ChainMonitor#method.process_pending_events
380401
pub trait EventsProvider {
381-
/// Gets the list of pending events which were generated by previous actions, clearing the list
382-
/// in the process.
383-
fn get_and_clear_pending_events(&self) -> Vec<Event>;
402+
/// Processes any events generated since the last call using the given event handler.
403+
///
404+
/// Subsequent calls must only process new events. However, handlers must be capable of handling
405+
/// duplicate events across process restarts. This may occur if the provider was recovered from
406+
/// an old state (i.e., it hadn't been successfully persisted after processing pending events).
407+
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler;
408+
}
409+
410+
/// A trait implemented for objects handling events from [`EventsProvider`].
411+
pub trait EventHandler {
412+
/// Handles the given [`Event`].
413+
///
414+
/// See [`EventsProvider`] for details that must be considered when implementing this method.
415+
fn handle_event(&self, event: Event);
416+
}
417+
418+
impl<F> EventHandler for F where F: Fn(Event) {
419+
fn handle_event(&self, event: Event) {
420+
self(event)
421+
}
384422
}

0 commit comments

Comments
 (0)