10
10
11
11
use lightning:: chain;
12
12
use lightning:: chain:: chaininterface:: { BroadcasterInterface , FeeEstimator } ;
13
+ use lightning:: chain:: chainmonitor:: ChainMonitor ;
14
+ use lightning:: chain:: channelmonitor;
13
15
use lightning:: chain:: keysinterface:: { Sign , KeysInterface } ;
14
16
use lightning:: ln:: channelmanager:: ChannelManager ;
15
17
use lightning:: ln:: msgs:: { ChannelMessageHandler , RoutingMessageHandler } ;
@@ -102,26 +104,31 @@ impl BackgroundProcessor {
102
104
/// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager
103
105
pub fn start <
104
106
Signer : ' static + Sign ,
105
- M : ' static + Deref + Send + Sync ,
107
+ CF : ' static + Deref + Send + Sync ,
108
+ CW : ' static + Deref + Send + Sync ,
106
109
T : ' static + Deref + Send + Sync ,
107
110
K : ' static + Deref + Send + Sync ,
108
111
F : ' static + Deref + Send + Sync ,
109
112
L : ' static + Deref + Send + Sync ,
113
+ P : ' static + Deref + Send + Sync ,
110
114
Descriptor : ' static + SocketDescriptor + Send + Sync ,
111
115
CMH : ' static + Deref + Send + Sync ,
112
116
RMH : ' static + Deref + Send + Sync ,
113
117
EH : ' static + EventHandler + Send + Sync ,
114
- CMP : ' static + Send + ChannelManagerPersister < Signer , M , T , K , F , L > ,
115
- CM : ' static + Deref < Target = ChannelManager < Signer , M , T , K , F , L > > + Send + Sync ,
118
+ CMP : ' static + Send + ChannelManagerPersister < Signer , CW , T , K , F , L > ,
119
+ M : ' static + Deref < Target = ChainMonitor < Signer , CF , T , F , L , P > > + Send + Sync ,
120
+ CM : ' static + Deref < Target = ChannelManager < Signer , CW , T , K , F , L > > + Send + Sync ,
116
121
PM : ' static + Deref < Target = PeerManager < Descriptor , CMH , RMH , L > > + Send + Sync ,
117
122
>
118
- ( persister : CMP , event_handler : EH , channel_manager : CM , peer_manager : PM , logger : L ) -> Self
123
+ ( persister : CMP , event_handler : EH , chain_monitor : M , channel_manager : CM , peer_manager : PM , logger : L ) -> Self
119
124
where
120
- M :: Target : ' static + chain:: Watch < Signer > ,
125
+ CF :: Target : ' static + chain:: Filter ,
126
+ CW :: Target : ' static + chain:: Watch < Signer > ,
121
127
T :: Target : ' static + BroadcasterInterface ,
122
128
K :: Target : ' static + KeysInterface < Signer = Signer > ,
123
129
F :: Target : ' static + FeeEstimator ,
124
130
L :: Target : ' static + Logger ,
131
+ P :: Target : ' static + channelmonitor:: Persist < Signer > ,
125
132
CMH :: Target : ' static + ChannelMessageHandler ,
126
133
RMH :: Target : ' static + RoutingMessageHandler ,
127
134
{
@@ -132,6 +139,7 @@ impl BackgroundProcessor {
132
139
loop {
133
140
peer_manager. process_events ( ) ;
134
141
channel_manager. process_pending_events ( & event_handler) ;
142
+ chain_monitor. process_pending_events ( & event_handler) ;
135
143
let updates_available =
136
144
channel_manager. await_persistable_update_timeout ( Duration :: from_millis ( 100 ) ) ;
137
145
if updates_available {
@@ -162,10 +170,13 @@ impl BackgroundProcessor {
162
170
163
171
#[ cfg( test) ]
164
172
mod tests {
173
+ use bitcoin:: blockdata:: block:: BlockHeader ;
165
174
use bitcoin:: blockdata:: constants:: genesis_block;
166
175
use bitcoin:: blockdata:: transaction:: { Transaction , TxOut } ;
167
176
use bitcoin:: network:: constants:: Network ;
177
+ use lightning:: chain:: Confirm ;
168
178
use lightning:: chain:: chainmonitor;
179
+ use lightning:: chain:: channelmonitor:: ANTI_REORG_DELAY ;
169
180
use lightning:: chain:: keysinterface:: { InMemorySigner , KeysInterface , KeysManager } ;
170
181
use lightning:: chain:: transaction:: OutPoint ;
171
182
use lightning:: get_event_msg;
@@ -184,6 +195,8 @@ mod tests {
184
195
use std:: time:: Duration ;
185
196
use super :: { BackgroundProcessor , FRESHNESS_TIMER } ;
186
197
198
+ const EVENT_DEADLINE : u64 = 5 * FRESHNESS_TIMER ;
199
+
187
200
#[ derive( Clone , Eq , Hash , PartialEq ) ]
188
201
struct TestDescriptor { }
189
202
impl SocketDescriptor for TestDescriptor {
@@ -199,8 +212,11 @@ mod tests {
199
212
struct Node {
200
213
node : Arc < SimpleArcChannelManager < ChainMonitor , test_utils:: TestBroadcaster , test_utils:: TestFeeEstimator , test_utils:: TestLogger > > ,
201
214
peer_manager : Arc < PeerManager < TestDescriptor , Arc < test_utils:: TestChannelMessageHandler > , Arc < test_utils:: TestRoutingMessageHandler > , Arc < test_utils:: TestLogger > > > ,
215
+ chain_monitor : Arc < ChainMonitor > ,
202
216
persister : Arc < FilesystemPersister > ,
217
+ tx_broadcaster : Arc < test_utils:: TestBroadcaster > ,
203
218
logger : Arc < test_utils:: TestLogger > ,
219
+ best_block : BestBlock ,
204
220
}
205
221
206
222
impl Drop for Node {
@@ -232,14 +248,12 @@ mod tests {
232
248
let now = Duration :: from_secs ( genesis_block ( network) . header . time as u64 ) ;
233
249
let keys_manager = Arc :: new ( KeysManager :: new ( & seed, now. as_secs ( ) , now. subsec_nanos ( ) ) ) ;
234
250
let chain_monitor = Arc :: new ( chainmonitor:: ChainMonitor :: new ( Some ( chain_source. clone ( ) ) , tx_broadcaster. clone ( ) , logger. clone ( ) , fee_estimator. clone ( ) , persister. clone ( ) ) ) ;
235
- let params = ChainParameters {
236
- network,
237
- best_block : BestBlock :: from_genesis ( network) ,
238
- } ;
239
- let manager = Arc :: new ( ChannelManager :: new ( fee_estimator. clone ( ) , chain_monitor. clone ( ) , tx_broadcaster, logger. clone ( ) , keys_manager. clone ( ) , UserConfig :: default ( ) , params) ) ;
251
+ let best_block = BestBlock :: from_genesis ( network) ;
252
+ let params = ChainParameters { network, best_block } ;
253
+ let manager = Arc :: new ( ChannelManager :: new ( fee_estimator. clone ( ) , chain_monitor. clone ( ) , tx_broadcaster. clone ( ) , logger. clone ( ) , keys_manager. clone ( ) , UserConfig :: default ( ) , params) ) ;
240
254
let msg_handler = MessageHandler { chan_handler : Arc :: new ( test_utils:: TestChannelMessageHandler :: new ( ) ) , route_handler : Arc :: new ( test_utils:: TestRoutingMessageHandler :: new ( ) ) } ;
241
255
let peer_manager = Arc :: new ( PeerManager :: new ( msg_handler, keys_manager. get_node_secret ( ) , & seed, logger. clone ( ) ) ) ;
242
- let node = Node { node : manager, peer_manager, persister, logger } ;
256
+ let node = Node { node : manager, peer_manager, chain_monitor , persister, tx_broadcaster , logger, best_block } ;
243
257
nodes. push ( node) ;
244
258
}
245
259
nodes
@@ -289,6 +303,27 @@ mod tests {
289
303
} }
290
304
}
291
305
306
+ fn confirm_transaction ( node : & mut Node , tx : & Transaction ) {
307
+ for i in 1 ..=ANTI_REORG_DELAY {
308
+ let prev_blockhash = node. best_block . block_hash ( ) ;
309
+ let height = node. best_block . height ( ) + 1 ;
310
+ let header = BlockHeader { version : 0x20000000 , prev_blockhash, merkle_root : Default :: default ( ) , time : height, bits : 42 , nonce : 42 } ;
311
+ let txdata = vec ! [ ( 0 , tx) ] ;
312
+ node. best_block = BestBlock :: new ( header. block_hash ( ) , height) ;
313
+ match i {
314
+ 1 => {
315
+ node. node . transactions_confirmed ( & header, & txdata, height) ;
316
+ node. chain_monitor . transactions_confirmed ( & header, & txdata, height) ;
317
+ } ,
318
+ ANTI_REORG_DELAY => {
319
+ node. node . best_block_updated ( & header, height) ;
320
+ node. chain_monitor . best_block_updated ( & header, height) ;
321
+ } ,
322
+ _ => { } ,
323
+ }
324
+ }
325
+ }
326
+
292
327
#[ test]
293
328
fn test_background_processor ( ) {
294
329
// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
@@ -305,7 +340,7 @@ mod tests {
305
340
let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
306
341
let persister = move |node : & ChannelManager < InMemorySigner , Arc < ChainMonitor > , Arc < test_utils:: TestBroadcaster > , Arc < KeysManager > , Arc < test_utils:: TestFeeEstimator > , Arc < test_utils:: TestLogger > > | FilesystemPersister :: persist_manager ( data_dir. clone ( ) , node) ;
307
342
let event_handler = |_| { } ;
308
- let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
343
+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes [ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
309
344
310
345
macro_rules! check_persisted_data {
311
346
( $node: expr, $filepath: expr, $expected_bytes: expr) => {
@@ -358,7 +393,7 @@ mod tests {
358
393
let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
359
394
let persister = move |node : & ChannelManager < InMemorySigner , Arc < ChainMonitor > , Arc < test_utils:: TestBroadcaster > , Arc < KeysManager > , Arc < test_utils:: TestFeeEstimator > , Arc < test_utils:: TestLogger > > | FilesystemPersister :: persist_manager ( data_dir. clone ( ) , node) ;
360
395
let event_handler = |_| { } ;
361
- let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
396
+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes [ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
362
397
loop {
363
398
let log_entries = nodes[ 0 ] . logger . lines . lock ( ) . unwrap ( ) ;
364
399
let desired_log = "Calling ChannelManager's and PeerManager's timer_tick_occurred" . to_string ( ) ;
@@ -378,13 +413,13 @@ mod tests {
378
413
379
414
let persister = |_: & _ | Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , "test" ) ) ;
380
415
let event_handler = |_| { } ;
381
- let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
416
+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes [ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
382
417
let _ = bg_processor. thread_handle . join ( ) . unwrap ( ) . expect_err ( "Errored persisting manager: test" ) ;
383
418
}
384
419
385
420
#[ test]
386
421
fn test_background_event_handling ( ) {
387
- let nodes = create_nodes ( 2 , "test_background_event_handling" . to_string ( ) ) ;
422
+ let mut nodes = create_nodes ( 2 , "test_background_event_handling" . to_string ( ) ) ;
388
423
let channel_value = 100000 ;
389
424
let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
390
425
let persister = move |node : & _ | FilesystemPersister :: persist_manager ( data_dir. clone ( ) , node) ;
@@ -394,15 +429,39 @@ mod tests {
394
429
let event_handler = move |event| {
395
430
sender. send ( handle_funding_generation_ready ! ( event, channel_value) ) . unwrap ( ) ;
396
431
} ;
397
- let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
432
+ let bg_processor = BackgroundProcessor :: start ( persister. clone ( ) , event_handler, nodes [ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
398
433
399
434
// Open a channel and check that the FundingGenerationReady event was handled.
400
435
begin_open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , channel_value) ;
401
- let timeout = Duration :: from_secs ( 5 * FRESHNESS_TIMER ) ;
402
- let ( temporary_channel_id, tx) = receiver
403
- . recv_timeout ( timeout)
436
+ let ( temporary_channel_id, funding_tx) = receiver
437
+ . recv_timeout ( Duration :: from_secs ( EVENT_DEADLINE ) )
404
438
. expect ( "FundingGenerationReady not handled within deadline" ) ;
405
- end_open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , temporary_channel_id, tx) ;
439
+ end_open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , temporary_channel_id, funding_tx) ;
440
+
441
+ // Confirm the funding transaction.
442
+ confirm_transaction ( & mut nodes[ 0 ] , & funding_tx) ;
443
+ confirm_transaction ( & mut nodes[ 1 ] , & funding_tx) ;
444
+ nodes[ 0 ] . node . handle_funding_locked ( & nodes[ 1 ] . node . get_our_node_id ( ) , & get_event_msg ! ( nodes[ 1 ] , MessageSendEvent :: SendFundingLocked , nodes[ 0 ] . node. get_our_node_id( ) ) ) ;
445
+ nodes[ 1 ] . node . handle_funding_locked ( & nodes[ 0 ] . node . get_our_node_id ( ) , & get_event_msg ! ( nodes[ 0 ] , MessageSendEvent :: SendFundingLocked , nodes[ 1 ] . node. get_our_node_id( ) ) ) ;
446
+
447
+ assert ! ( bg_processor. stop( ) . is_ok( ) ) ;
448
+
449
+ // Set up a background event handler for SpendableOutputs events.
450
+ let ( sender, receiver) = std:: sync:: mpsc:: sync_channel ( 1 ) ;
451
+ let event_handler = move |event| sender. send ( event) . unwrap ( ) ;
452
+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
453
+
454
+ // Force close the channel and check that the SpendableOutputs event was handled.
455
+ nodes[ 0 ] . node . force_close_channel ( & nodes[ 0 ] . node . list_channels ( ) [ 0 ] . channel_id ) . unwrap ( ) ;
456
+ let commitment_tx = nodes[ 0 ] . tx_broadcaster . txn_broadcasted . lock ( ) . unwrap ( ) . pop ( ) . unwrap ( ) ;
457
+ confirm_transaction ( & mut nodes[ 0 ] , & commitment_tx) ;
458
+ let event = receiver
459
+ . recv_timeout ( Duration :: from_secs ( EVENT_DEADLINE ) )
460
+ . expect ( "SpendableOutputs not handled within deadline" ) ;
461
+ match event {
462
+ Event :: SpendableOutputs { .. } => { } ,
463
+ _ => panic ! ( "Unexpected event: {:?}" , event) ,
464
+ }
406
465
407
466
assert ! ( bg_processor. stop( ) . is_ok( ) ) ;
408
467
}
0 commit comments