Skip to content

Commit c349707

Browse files
committed
Make ChainMonitor::monitors private and expose monitor via getter
Exposing a `RwLock<HashMap<>>` directly was always a bit strange, and in upcoming changes we'd like to change the internal datastructure in `ChainMonitor`. Further, the use of `RwLock` and `HashMap` meant we weren't able to expose the ChannelMonitors themselves to users in bindings, leaving a bindings/rust API gap. Thus, we take this opportunity go expose ChannelMonitors directly via a wrapper, hiding the internals of `ChainMonitor` behind getters. We also update tests to use the new API.
1 parent 4d5cf4b commit c349707

7 files changed

+157
-112
lines changed

lightning/src/chain/chainmonitor.rs

Lines changed: 61 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use util::events::EventHandler;
3838
use ln::channelmanager::ChannelDetails;
3939

4040
use prelude::*;
41-
use sync::RwLock;
41+
use sync::{RwLock, RwLockReadGuard};
4242
use core::ops::Deref;
4343

4444
/// `Persist` defines behavior for persisting channel monitors: this could mean
@@ -92,6 +92,26 @@ pub trait Persist<ChannelSigner: Sign> {
9292
fn update_persisted_channel(&self, id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr>;
9393
}
9494

95+
struct MonitorHolder<ChannelSigner: Sign> {
96+
monitor: ChannelMonitor<ChannelSigner>,
97+
}
98+
99+
/// A read-only reference to a current ChannelMonitor.
100+
///
101+
/// Note that this holds a mutex in [`ChainMonitor`] and may block other events until it is
102+
/// released.
103+
pub struct LockedChannelMonitor<'a, ChannelSigner: Sign> {
104+
lock: RwLockReadGuard<'a, HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
105+
funding_txo: OutPoint,
106+
}
107+
108+
impl<ChannelSigner: Sign> Deref for LockedChannelMonitor<'_, ChannelSigner> {
109+
type Target = ChannelMonitor<ChannelSigner>;
110+
fn deref(&self) -> &ChannelMonitor<ChannelSigner> {
111+
&self.lock.get(&self.funding_txo).expect("Checked at construction").monitor
112+
}
113+
}
114+
95115
/// An implementation of [`chain::Watch`] for monitoring channels.
96116
///
97117
/// Connected and disconnected blocks must be provided to `ChainMonitor` as documented by
@@ -108,8 +128,7 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
108128
L::Target: Logger,
109129
P::Target: Persist<ChannelSigner>,
110130
{
111-
/// The monitors
112-
pub monitors: RwLock<HashMap<OutPoint, ChannelMonitor<ChannelSigner>>>,
131+
monitors: RwLock<HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
113132
chain_source: Option<C>,
114133
broadcaster: T,
115134
logger: L,
@@ -139,8 +158,8 @@ where C::Target: chain::Filter,
139158
{
140159
let mut dependent_txdata = Vec::new();
141160
let monitors = self.monitors.read().unwrap();
142-
for monitor in monitors.values() {
143-
let mut txn_outputs = process(monitor, txdata);
161+
for monitor_state in monitors.values() {
162+
let mut txn_outputs = process(&monitor_state.monitor, txdata);
144163

145164
// Register any new outputs with the chain source for filtering, storing any dependent
146165
// transactions from within the block that previously had not been included in txdata.
@@ -203,19 +222,41 @@ where C::Target: chain::Filter,
203222
pub fn get_claimable_balances(&self, ignored_channels: &[&ChannelDetails]) -> Vec<Balance> {
204223
let mut ret = Vec::new();
205224
let monitors = self.monitors.read().unwrap();
206-
for (_, monitor) in monitors.iter().filter(|(funding_outpoint, _)| {
225+
for (_, monitor_state) in monitors.iter().filter(|(funding_outpoint, _)| {
207226
for chan in ignored_channels {
208227
if chan.funding_txo.as_ref() == Some(funding_outpoint) {
209228
return false;
210229
}
211230
}
212231
true
213232
}) {
214-
ret.append(&mut monitor.get_claimable_balances());
233+
ret.append(&mut monitor_state.monitor.get_claimable_balances());
215234
}
216235
ret
217236
}
218237

238+
/// Gets the [`LockedChannelMonitor`] for a given funding outpoint, returning an `Err` if no
239+
/// such [`ChannelMonitor`] is currently being monitored for.
240+
pub fn get_current_monitor(&self, funding_txo: OutPoint) -> Result<LockedChannelMonitor<'_, ChannelSigner>, ()> {
241+
let lock = self.monitors.read().unwrap();
242+
if lock.get(&funding_txo).is_some() {
243+
Ok(LockedChannelMonitor { lock, funding_txo })
244+
} else {
245+
Err(())
246+
}
247+
}
248+
249+
/// Gets the current list of [`ChannelMonitor`]s being monitored for, by their funding
250+
/// outpoint.
251+
pub fn list_current_monitors(&self) -> Vec<OutPoint> {
252+
self.monitors.read().unwrap().keys().map(|outpoint| *outpoint).collect()
253+
}
254+
255+
#[cfg(test)]
256+
pub fn remove_monitor(&self, funding_txo: &OutPoint) -> ChannelMonitor<ChannelSigner> {
257+
self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor
258+
}
259+
219260
#[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
220261
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
221262
use util::events::EventsProvider;
@@ -248,8 +289,8 @@ where
248289
fn block_disconnected(&self, header: &BlockHeader, height: u32) {
249290
let monitors = self.monitors.read().unwrap();
250291
log_debug!(self.logger, "Latest block {} at height {} removed via block_disconnected", header.block_hash(), height);
251-
for monitor in monitors.values() {
252-
monitor.block_disconnected(
292+
for monitor_state in monitors.values() {
293+
monitor_state.monitor.block_disconnected(
253294
header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
254295
}
255296
}
@@ -275,8 +316,8 @@ where
275316
fn transaction_unconfirmed(&self, txid: &Txid) {
276317
log_debug!(self.logger, "Transaction {} reorganized out of chain", txid);
277318
let monitors = self.monitors.read().unwrap();
278-
for monitor in monitors.values() {
279-
monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
319+
for monitor_state in monitors.values() {
320+
monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
280321
}
281322
}
282323

@@ -294,8 +335,8 @@ where
294335
fn get_relevant_txids(&self) -> Vec<Txid> {
295336
let mut txids = Vec::new();
296337
let monitors = self.monitors.read().unwrap();
297-
for monitor in monitors.values() {
298-
txids.append(&mut monitor.get_relevant_txids());
338+
for monitor_state in monitors.values() {
339+
txids.append(&mut monitor_state.monitor.get_relevant_txids());
299340
}
300341

301342
txids.sort_unstable();
@@ -338,7 +379,7 @@ where C::Target: chain::Filter,
338379
monitor.load_outputs_to_watch(chain_source);
339380
}
340381
}
341-
entry.insert(monitor);
382+
entry.insert(MonitorHolder { monitor });
342383
Ok(())
343384
}
344385

@@ -359,7 +400,8 @@ where C::Target: chain::Filter,
359400
#[cfg(not(any(test, feature = "fuzztarget")))]
360401
Err(ChannelMonitorUpdateErr::PermanentFailure)
361402
},
362-
Some(monitor) => {
403+
Some(monitor_state) => {
404+
let monitor = &monitor_state.monitor;
363405
log_trace!(self.logger, "Updating Channel Monitor for channel {}", log_funding_info!(monitor));
364406
let update_res = monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger);
365407
if let Err(e) = &update_res {
@@ -382,8 +424,8 @@ where C::Target: chain::Filter,
382424

383425
fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
384426
let mut pending_monitor_events = Vec::new();
385-
for monitor in self.monitors.read().unwrap().values() {
386-
pending_monitor_events.append(&mut monitor.get_and_clear_pending_monitor_events());
427+
for monitor_state in self.monitors.read().unwrap().values() {
428+
pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
387429
}
388430
pending_monitor_events
389431
}
@@ -404,8 +446,8 @@ impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> even
404446
/// [`SpendableOutputs`]: events::Event::SpendableOutputs
405447
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
406448
let mut pending_events = Vec::new();
407-
for monitor in self.monitors.read().unwrap().values() {
408-
pending_events.append(&mut monitor.get_and_clear_pending_events());
449+
for monitor_state in self.monitors.read().unwrap().values() {
450+
pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
409451
}
410452
for event in pending_events.drain(..) {
411453
handler.handle_event(&event);

lightning/src/ln/chanmon_update_fail_tests.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,7 @@ fn test_monitor_and_persister_update_fail() {
118118
blocks: Arc::new(Mutex::new(vec![(genesis_block(Network::Testnet).header, 200); 200])),
119119
};
120120
let chain_mon = {
121-
let monitors = nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap();
122-
let monitor = monitors.get(&outpoint).unwrap();
121+
let monitor = nodes[0].chain_monitor.chain_monitor.get_current_monitor(outpoint).unwrap();
123122
let mut w = test_utils::TestVecWriter(Vec::new());
124123
monitor.write(&mut w).unwrap();
125124
let new_monitor = <(BlockHash, ChannelMonitor<EnforcingSigner>)>::read(
@@ -2317,7 +2316,7 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) {
23172316
if reload_a {
23182317
let nodes_0_serialized = nodes[0].node.encode();
23192318
let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new());
2320-
nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap();
2319+
get_monitor!(nodes[0], chan_id).write(&mut chan_0_monitor_serialized).unwrap();
23212320

23222321
persister = test_utils::TestPersister::new();
23232322
let keys_manager = &chanmon_cfgs[0].keys_manager;

lightning/src/ln/functional_test_utils.rs

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -274,10 +274,9 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> {
274274
let feeest = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) };
275275
let mut deserialized_monitors = Vec::new();
276276
{
277-
let old_monitors = self.chain_monitor.chain_monitor.monitors.read().unwrap();
278-
for (_, old_monitor) in old_monitors.iter() {
277+
for outpoint in self.chain_monitor.chain_monitor.list_current_monitors() {
279278
let mut w = test_utils::TestVecWriter(Vec::new());
280-
old_monitor.write(&mut w).unwrap();
279+
self.chain_monitor.chain_monitor.get_current_monitor(outpoint).unwrap().write(&mut w).unwrap();
281280
let (_, deserialized_monitor) = <(BlockHash, ChannelMonitor<EnforcingSigner>)>::read(
282281
&mut io::Cursor::new(&w.0), self.keys_manager).unwrap();
283282
deserialized_monitors.push(deserialized_monitor);
@@ -437,20 +436,35 @@ macro_rules! get_feerate {
437436
}
438437
}
439438

440-
/// Returns any local commitment transactions for the channel.
439+
/// Returns a channel monitor given a channel id, making some naive assumptions
441440
#[macro_export]
442-
macro_rules! get_local_commitment_txn {
441+
macro_rules! get_monitor {
443442
($node: expr, $channel_id: expr) => {
444443
{
445-
let monitors = $node.chain_monitor.chain_monitor.monitors.read().unwrap();
446-
let mut commitment_txn = None;
447-
for (funding_txo, monitor) in monitors.iter() {
448-
if funding_txo.to_channel_id() == $channel_id {
449-
commitment_txn = Some(monitor.unsafe_get_latest_holder_commitment_txn(&$node.logger));
444+
use bitcoin::hashes::Hash;
445+
let mut monitor = None;
446+
// Assume funding vout is either 0 or 1 blindly
447+
for index in 0..2 {
448+
if let Ok(mon) = $node.chain_monitor.chain_monitor.get_current_monitor(
449+
$crate::chain::transaction::OutPoint {
450+
txid: bitcoin::Txid::from_slice(&$channel_id[..]).unwrap(), index
451+
})
452+
{
453+
monitor = Some(mon);
450454
break;
451455
}
452456
}
453-
commitment_txn.unwrap()
457+
monitor.unwrap()
458+
}
459+
}
460+
}
461+
462+
/// Returns any local commitment transactions for the channel.
463+
#[macro_export]
464+
macro_rules! get_local_commitment_txn {
465+
($node: expr, $channel_id: expr) => {
466+
{
467+
$crate::get_monitor!($node, $channel_id).unsafe_get_latest_holder_commitment_txn(&$node.logger)
454468
}
455469
}
456470
}

0 commit comments

Comments
 (0)