Skip to content

Commit bf8bc3a

Browse files
committed
Replace WatchEventProvider with chain::Notify
WatchEventProvider served as a means for replacing ChainWatchInterface. However, it requires users to explicitly fetch WatchEvents, even if not interested in them. Replace WatchEventProvider by chain::Notify, which is an optional member of ChainMonitor. If set, interesting transactions and output spends are registered such that blocks containing them can be retrieved from a chain source in an efficient manner. This is useful when the chain source is not a full node. For Electrum, it allows for pre-filtered blocks. For BIP157/158, it serves as a means to match against compact filters.
1 parent a2f0154 commit bf8bc3a

File tree

9 files changed

+125
-104
lines changed

9 files changed

+125
-104
lines changed

ARCH.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ At a high level, some of the common interfaces fit together as follows:
5454
| ----------------- \ _---------------- / /
5555
| | chain::Access | \ / | ChainMonitor |---------------
5656
| ----------------- \ / ----------------
57-
| | \ /
58-
(as RoutingMessageHandler) v v
59-
\ -------------------- ---------
60-
-----------------> | NetGraphMsgHandler | | Event |
61-
-------------------- ---------
57+
| | \ / |
58+
(as RoutingMessageHandler) v v v
59+
\ -------------------- --------- -----------------
60+
-----------------> | NetGraphMsgHandler | | Event | | chain::Notify |
61+
-------------------- --------- -----------------
6262
```

fuzz/src/chanmon_consistency.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ impl Writer for VecWriter {
8484

8585
struct TestChainMonitor {
8686
pub logger: Arc<dyn Logger>,
87-
pub chain_monitor: Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
87+
pub chain_monitor: Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<dyn chain::Notify>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
8888
pub update_ret: Mutex<Result<(), channelmonitor::ChannelMonitorUpdateErr>>,
8989
// If we reload a node with an old copy of ChannelMonitors, the ChannelManager deserialization
9090
// logic will automatically force-close our channels for us (as we don't have an up-to-date
@@ -97,7 +97,7 @@ struct TestChainMonitor {
9797
impl TestChainMonitor {
9898
pub fn new(broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>) -> Self {
9999
Self {
100-
chain_monitor: Arc::new(channelmonitor::ChainMonitor::new(broadcaster, logger.clone(), feeest)),
100+
chain_monitor: Arc::new(channelmonitor::ChainMonitor::new(None, broadcaster, logger.clone(), feeest)),
101101
logger,
102102
update_ret: Mutex::new(Ok(())),
103103
latest_monitors: Mutex::new(HashMap::new()),

fuzz/src/full_stack.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,13 +146,13 @@ impl<'a> std::hash::Hash for Peer<'a> {
146146

147147
type ChannelMan = ChannelManager<
148148
EnforcingChannelKeys,
149-
Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
149+
Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<dyn chain::Notify>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
150150
Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>;
151151
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<NetGraphMsgHandler<Arc<dyn chain::Access>, Arc<dyn Logger>>>, Arc<dyn Logger>>;
152152

153153
struct MoneyLossDetector<'a> {
154154
manager: Arc<ChannelMan>,
155-
monitor: Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
155+
monitor: Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<dyn chain::Notify>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
156156
handler: PeerMan<'a>,
157157

158158
peers: &'a RefCell<[bool; 256]>,
@@ -166,7 +166,7 @@ struct MoneyLossDetector<'a> {
166166
impl<'a> MoneyLossDetector<'a> {
167167
pub fn new(peers: &'a RefCell<[bool; 256]>,
168168
manager: Arc<ChannelMan>,
169-
monitor: Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
169+
monitor: Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<dyn chain::Notify>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
170170
handler: PeerMan<'a>) -> Self {
171171
MoneyLossDetector {
172172
manager,
@@ -334,7 +334,7 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
334334
};
335335

336336
let broadcast = Arc::new(TestBroadcaster{});
337-
let monitor = Arc::new(channelmonitor::ChainMonitor::new(broadcast.clone(), Arc::clone(&logger), fee_est.clone()));
337+
let monitor = Arc::new(channelmonitor::ChainMonitor::new(None, broadcast.clone(), Arc::clone(&logger), fee_est.clone()));
338338

339339
let keys_manager = Arc::new(KeyProvider { node_secret: our_network_key.clone(), counter: AtomicU64::new(0) });
340340
let mut config = UserConfig::default();

lightning-net-tokio/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
//! type FeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator;
3636
//! type Logger = dyn lightning::util::logger::Logger;
3737
//! type ChainAccess = dyn lightning::chain::Access;
38-
//! type ChainMonitor = lightning::ln::channelmonitor::ChainMonitor<lightning::chain::keysinterface::InMemoryChannelKeys, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>>;
38+
//! type ChainNotify = dyn lightning::chain::Notify;
39+
//! type ChainMonitor = lightning::ln::channelmonitor::ChainMonitor<lightning::chain::keysinterface::InMemoryChannelKeys, Arc<ChainNotify>, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>>;
3940
//! type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor, TxBroadcaster, FeeEstimator, Logger>;
4041
//! type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<lightning_net_tokio::SocketDescriptor, ChainMonitor, TxBroadcaster, FeeEstimator, ChainAccess, Logger>;
4142
//!

lightning/src/chain/mod.rs

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -88,32 +88,22 @@ pub trait Watch: Send + Sync {
8888
fn release_pending_monitor_events(&self) -> Vec<MonitorEvent>;
8989
}
9090

91-
/// An interface for providing [`WatchEvent`]s.
91+
/// The `Notify` trait defines behavior for indicating chain activity of interest pertaining to
92+
/// channels.
9293
///
93-
/// [`WatchEvent`]: enum.WatchEvent.html
94-
pub trait WatchEventProvider {
95-
/// Releases events produced since the last call. Subsequent calls must only return new events.
96-
fn release_pending_watch_events(&self) -> Vec<WatchEvent>;
97-
}
98-
99-
/// An event indicating on-chain activity to watch for pertaining to a channel.
100-
pub enum WatchEvent {
101-
/// Watch for a transaction with `txid` and having an output with `script_pubkey` as a spending
102-
/// condition.
103-
WatchTransaction {
104-
/// Identifier of the transaction.
105-
txid: Txid,
106-
107-
/// Spending condition for an output of the transaction.
108-
script_pubkey: Script,
109-
},
110-
/// Watch for spends of a transaction output identified by `outpoint` having `script_pubkey` as
111-
/// the spending condition.
112-
WatchOutput {
113-
/// Identifier for the output.
114-
outpoint: OutPoint,
94+
/// This is useful in order to have a [`Watch`] implementation convey to a chain source which
95+
/// transactions to be notified of. This may take the form of pre-filtering blocks or, in the case
96+
/// of [BIP 157]/[BIP 158], only fetching a block if the compact filter matches.
97+
///
98+
/// [`Watch`]: trait.Watch.html
99+
/// [BIP 157]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki
100+
/// [BIP 158]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki
101+
pub trait Notify: Send + Sync {
102+
/// Registers interest in a transaction with `txid` and having an output with `script_pubkey` as
103+
/// a spending condition.
104+
fn register_tx(&self, txid: Txid, script_pubkey: Script);
115105

116-
/// Spending condition for the output.
117-
script_pubkey: Script,
118-
}
106+
/// Registers interest in spends of a transaction output identified by `outpoint` having
107+
/// `script_pubkey` as the spending condition.
108+
fn register_output(&self, outpoint: OutPoint, script_pubkey: Script);
119109
}

lightning/src/ln/channelmonitor.rs

Lines changed: 77 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use ln::chan_utils::{CounterpartyCommitmentSecrets, HTLCOutputInCommitment, Loca
4444
use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash};
4545
use ln::onchaintx::{OnchainTxHandler, InputDescriptors};
4646
use chain;
47+
use chain::Notify;
4748
use chain::chaininterface::{ChainWatchedUtil, BroadcasterInterface, FeeEstimator};
4849
use chain::transaction::OutPoint;
4950
use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys};
@@ -181,26 +182,49 @@ impl_writeable!(HTLCUpdate, 0, { payment_hash, payment_preimage, source });
181182
/// independently to monitor channels remotely.
182183
///
183184
/// [`chain::Watch`]: ../../chain/trait.Watch.html
184-
/// [`ChannelManager`]: ../channelmanager/struct.ChannelManager.html
185-
pub struct ChainMonitor<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref>
186-
where T::Target: BroadcasterInterface,
185+
pub struct ChainMonitor<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref>
186+
where C::Target: chain::Notify,
187+
T::Target: BroadcasterInterface,
187188
F::Target: FeeEstimator,
188189
L::Target: Logger,
189190
{
190191
/// The monitors
191192
pub monitors: Mutex<HashMap<OutPoint, ChannelMonitor<ChanSigner>>>,
192-
watch_events: Mutex<WatchEventQueue>,
193+
watch_events: Mutex<WatchEventCache>,
194+
chain_source: Option<C>,
193195
broadcaster: T,
194196
logger: L,
195197
fee_estimator: F
196198
}
197199

198-
struct WatchEventQueue {
200+
struct WatchEventCache {
199201
watched: ChainWatchedUtil,
200-
events: Vec<chain::WatchEvent>,
202+
events: Vec<WatchEvent>,
201203
}
202204

203-
impl WatchEventQueue {
205+
/// An event indicating on-chain activity to watch for pertaining to a channel.
206+
enum WatchEvent {
207+
/// Watch for a transaction with `txid` and having an output with `script_pubkey` as a spending
208+
/// condition.
209+
WatchTransaction {
210+
/// Identifier of the transaction.
211+
txid: Txid,
212+
213+
/// Spending condition for an output of the transaction.
214+
script_pubkey: Script,
215+
},
216+
/// Watch for spends of a transaction output identified by `outpoint` having `script_pubkey` as
217+
/// the spending condition.
218+
WatchOutput {
219+
/// Identifier for the output.
220+
outpoint: OutPoint,
221+
222+
/// Spending condition for the output.
223+
script_pubkey: Script,
224+
}
225+
}
226+
227+
impl WatchEventCache {
204228
fn new() -> Self {
205229
Self {
206230
watched: ChainWatchedUtil::new(),
@@ -210,7 +234,7 @@ impl WatchEventQueue {
210234

211235
fn watch_tx(&mut self, txid: &Txid, script_pubkey: &Script) {
212236
if self.watched.register_tx(txid, script_pubkey) {
213-
self.events.push(chain::WatchEvent::WatchTransaction {
237+
self.events.push(WatchEvent::WatchTransaction {
214238
txid: *txid,
215239
script_pubkey: script_pubkey.clone()
216240
});
@@ -220,7 +244,7 @@ impl WatchEventQueue {
220244
fn watch_output(&mut self, outpoint: (&Txid, usize), script_pubkey: &Script) {
221245
let (txid, index) = outpoint;
222246
if self.watched.register_outpoint((*txid, index as u32), script_pubkey) {
223-
self.events.push(chain::WatchEvent::WatchOutput {
247+
self.events.push(WatchEvent::WatchOutput {
224248
outpoint: OutPoint {
225249
txid: *txid,
226250
index: index as u16,
@@ -230,15 +254,30 @@ impl WatchEventQueue {
230254
}
231255
}
232256

233-
fn dequeue_events(&mut self) -> Vec<chain::WatchEvent> {
234-
let mut pending_events = Vec::with_capacity(self.events.len());
235-
pending_events.append(&mut self.events);
236-
pending_events
257+
fn flush_events<C: Deref>(&mut self, chain_source: &Option<C>) -> bool where C::Target: chain::Notify {
258+
let num_events = self.events.len();
259+
match chain_source {
260+
&None => self.events.clear(),
261+
&Some(ref chain_source) => {
262+
for event in self.events.drain(..) {
263+
match event {
264+
WatchEvent::WatchTransaction { txid, script_pubkey } => {
265+
chain_source.register_tx(txid, script_pubkey)
266+
},
267+
WatchEvent::WatchOutput { outpoint, script_pubkey } => {
268+
chain_source.register_output(outpoint, script_pubkey)
269+
},
270+
}
271+
}
272+
}
273+
}
274+
num_events > 0
237275
}
238276
}
239277

240-
impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> ChainMonitor<Key, ChanSigner, T, F, L>
241-
where T::Target: BroadcasterInterface,
278+
impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSigner, C, T, F, L>
279+
where C::Target: chain::Notify,
280+
T::Target: BroadcasterInterface,
242281
F::Target: FeeEstimator,
243282
L::Target: Logger,
244283
{
@@ -247,9 +286,13 @@ impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> ChainMonitor<Key, Ch
247286
/// [`ChannelMonitor::block_connected`] for details. Any HTLCs that were resolved on chain will
248287
/// be retuned by [`chain::Watch::release_pending_htlc_updates`].
249288
///
289+
/// Calls back to [`chain::Notify`] if any monitor indicated new outputs to watch, returning
290+
/// `true` if so.
291+
///
250292
/// [`ChannelMonitor::block_connected`]: struct.ChannelMonitor.html#method.block_connected
251293
/// [`chain::Watch::release_pending_htlc_updates`]: ../../chain/trait.Watch.html#tymethod.release_pending_htlc_updates
252-
pub fn block_connected(&self, header: &BlockHeader, txdata: &[(usize, &Transaction)], height: u32) {
294+
/// [`chain::Notify`]: ../../chain/trait.Notify.html
295+
pub fn block_connected(&self, header: &BlockHeader, txdata: &[(usize, &Transaction)], height: u32) -> bool {
253296
let mut watch_events = self.watch_events.lock().unwrap();
254297
let matched_txn: Vec<_> = txdata.iter().filter(|&&(_, tx)| watch_events.watched.does_match_tx(tx)).map(|e| *e).collect();
255298
{
@@ -264,6 +307,7 @@ impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> ChainMonitor<Key, Ch
264307
}
265308
}
266309
}
310+
watch_events.flush_events(&self.chain_source)
267311
}
268312

269313
/// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
@@ -279,24 +323,30 @@ impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> ChainMonitor<Key, Ch
279323
}
280324
}
281325

282-
impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSigner, T, F, L>
283-
where T::Target: BroadcasterInterface,
326+
impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSigner, C, T, F, L>
327+
where C::Target: chain::Notify,
328+
T::Target: BroadcasterInterface,
284329
F::Target: FeeEstimator,
285330
L::Target: Logger,
286331
{
287332
/// Creates a new object which can be used to monitor several channels given the chain
288333
/// interface with which to register to receive notifications.
289-
pub fn new(broadcaster: T, logger: L, feeest: F) -> Self {
334+
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F) -> Self {
290335
Self {
291336
monitors: Mutex::new(HashMap::new()),
292-
watch_events: Mutex::new(WatchEventQueue::new()),
337+
watch_events: Mutex::new(WatchEventCache::new()),
338+
chain_source,
293339
broadcaster,
294340
logger,
295341
fee_estimator: feeest,
296342
}
297343
}
298344

299345
/// Adds or updates the monitor which monitors the channel referred to by the given outpoint.
346+
///
347+
/// Calls back to [`chain::Notify`] with the funding transaction and outputs to watch.
348+
///
349+
/// [`chain::Notify`]: ../../chain/trait.Notify.html
300350
pub fn add_monitor(&self, outpoint: OutPoint, monitor: ChannelMonitor<ChanSigner>) -> Result<(), MonitorUpdateError> {
301351
let mut watch_events = self.watch_events.lock().unwrap();
302352
let mut monitors = self.monitors.lock().unwrap();
@@ -316,6 +366,7 @@ impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSig
316366
}
317367
}
318368
entry.insert(monitor);
369+
watch_events.flush_events(&self.chain_source);
319370
Ok(())
320371
}
321372

@@ -332,8 +383,9 @@ impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSig
332383
}
333384
}
334385

335-
impl<ChanSigner: ChannelKeys, T: Deref + Sync + Send, F: Deref + Sync + Send, L: Deref + Sync + Send> chain::Watch for ChainMonitor<ChanSigner, T, F, L>
336-
where T::Target: BroadcasterInterface,
386+
impl<ChanSigner: ChannelKeys, C: Deref + Sync + Send, T: Deref + Sync + Send, F: Deref + Sync + Send, L: Deref + Sync + Send> chain::Watch for ChainMonitor<ChanSigner, C, T, F, L>
387+
where C::Target: chain::Notify,
388+
T::Target: BroadcasterInterface,
337389
F::Target: FeeEstimator,
338390
L::Target: Logger,
339391
{
@@ -362,8 +414,9 @@ impl<ChanSigner: ChannelKeys, T: Deref + Sync + Send, F: Deref + Sync + Send, L:
362414
}
363415
}
364416

365-
impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> events::EventsProvider for ChainMonitor<ChanSigner, T, F, L>
366-
where T::Target: BroadcasterInterface,
417+
impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> events::EventsProvider for ChainMonitor<ChanSigner, C, T, F, L>
418+
where C::Target: chain::Notify,
419+
T::Target: BroadcasterInterface,
367420
F::Target: FeeEstimator,
368421
L::Target: Logger,
369422
{
@@ -376,16 +429,6 @@ impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> events::EventsProvid
376429
}
377430
}
378431

379-
impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> chain::WatchEventProvider for ChainMonitor<ChanSigner, T, F, L>
380-
where T::Target: BroadcasterInterface,
381-
F::Target: FeeEstimator,
382-
L::Target: Logger,
383-
{
384-
fn release_pending_watch_events(&self) -> Vec<chain::WatchEvent> {
385-
self.watch_events.lock().unwrap().dequeue_events()
386-
}
387-
}
388-
389432
/// If an HTLC expires within this many blocks, don't try to claim it in a shared transaction,
390433
/// instead claiming it in its own individual transaction.
391434
pub(crate) const CLTV_SHARED_CLAIM_BUFFER: u32 = 12;

0 commit comments

Comments
 (0)