Skip to content

Commit 640a1fd

Browse files
authored
Merge pull request #266 from tnull/2024-03-introduce-connection-manager
Improve connection management
2 parents 0bf24c8 + 77c538b commit 640a1fd

File tree

7 files changed

+196
-69
lines changed

7 files changed

+196
-69
lines changed

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ bip39 = "2.0.0"
6666

6767
rand = "0.8.5"
6868
chrono = { version = "0.4", default-features = false, features = ["clock"] }
69-
futures = "0.3"
7069
tokio = { version = "1", default-features = false, features = [ "rt-multi-thread", "time", "sync" ] }
7170
esplora-client = { version = "0.6", default-features = false }
7271
libc = "0.2"

src/builder.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::config::{
22
Config, BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, DEFAULT_ESPLORA_SERVER_URL,
33
WALLET_KEYS_SEED_LEN,
44
};
5+
use crate::connection::ConnectionManager;
56
use crate::event::EventQueue;
67
use crate::fee_estimator::OnchainFeeEstimator;
78
use crate::gossip::GossipSource;
@@ -895,6 +896,9 @@ fn build_with_store_internal(
895896

896897
liquidity_source.as_ref().map(|l| l.set_peer_manager(Arc::clone(&peer_manager)));
897898

899+
let connection_manager =
900+
Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger)));
901+
898902
let output_sweeper = match io::utils::read_output_sweeper(
899903
Arc::clone(&tx_broadcaster),
900904
Arc::clone(&fee_estimator),
@@ -991,6 +995,7 @@ fn build_with_store_internal(
991995
chain_monitor,
992996
output_sweeper,
993997
peer_manager,
998+
connection_manager,
994999
keys_manager,
9951000
network_graph,
9961001
gossip_source,

src/connection.rs

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
use crate::logger::{log_error, log_info, Logger};
2+
use crate::types::PeerManager;
3+
use crate::Error;
4+
5+
use lightning::ln::msgs::SocketAddress;
6+
7+
use bitcoin::secp256k1::PublicKey;
8+
9+
use std::collections::hash_map::{self, HashMap};
10+
use std::net::ToSocketAddrs;
11+
use std::ops::Deref;
12+
use std::sync::{Arc, Mutex};
13+
use std::time::Duration;
14+
15+
pub(crate) struct ConnectionManager<L: Deref + Clone + Sync + Send>
16+
where
17+
L::Target: Logger,
18+
{
19+
pending_connections:
20+
Mutex<HashMap<PublicKey, Vec<tokio::sync::oneshot::Sender<Result<(), Error>>>>>,
21+
peer_manager: Arc<PeerManager>,
22+
logger: L,
23+
}
24+
25+
impl<L: Deref + Clone + Sync + Send> ConnectionManager<L>
26+
where
27+
L::Target: Logger,
28+
{
29+
pub(crate) fn new(peer_manager: Arc<PeerManager>, logger: L) -> Self {
30+
let pending_connections = Mutex::new(HashMap::new());
31+
Self { pending_connections, peer_manager, logger }
32+
}
33+
34+
pub(crate) async fn connect_peer_if_necessary(
35+
&self, node_id: PublicKey, addr: SocketAddress,
36+
) -> Result<(), Error> {
37+
if self.peer_manager.peer_by_node_id(&node_id).is_some() {
38+
return Ok(());
39+
}
40+
41+
self.do_connect_peer(node_id, addr).await
42+
}
43+
44+
pub(crate) async fn do_connect_peer(
45+
&self, node_id: PublicKey, addr: SocketAddress,
46+
) -> Result<(), Error> {
47+
// First, we check if there is already an outbound connection in flight, if so, we just
48+
// await on the corresponding watch channel. The task driving the connection future will
49+
// send us the result..
50+
let pending_ready_receiver_opt = self.register_or_subscribe_pending_connection(&node_id);
51+
if let Some(pending_connection_ready_receiver) = pending_ready_receiver_opt {
52+
return pending_connection_ready_receiver.await.map_err(|e| {
53+
debug_assert!(false, "Failed to receive connection result: {:?}", e);
54+
log_error!(self.logger, "Failed to receive connection result: {:?}", e);
55+
Error::ConnectionFailed
56+
})?;
57+
}
58+
59+
log_info!(self.logger, "Connecting to peer: {}@{}", node_id, addr);
60+
61+
let socket_addr = addr
62+
.to_socket_addrs()
63+
.map_err(|e| {
64+
log_error!(self.logger, "Failed to resolve network address {}: {}", addr, e);
65+
self.propagate_result_to_subscribers(&node_id, Err(Error::InvalidSocketAddress));
66+
Error::InvalidSocketAddress
67+
})?
68+
.next()
69+
.ok_or_else(|| {
70+
log_error!(self.logger, "Failed to resolve network address {}", addr);
71+
self.propagate_result_to_subscribers(&node_id, Err(Error::InvalidSocketAddress));
72+
Error::InvalidSocketAddress
73+
})?;
74+
75+
let connection_future = lightning_net_tokio::connect_outbound(
76+
Arc::clone(&self.peer_manager),
77+
node_id,
78+
socket_addr,
79+
);
80+
81+
let res = match connection_future.await {
82+
Some(connection_closed_future) => {
83+
let mut connection_closed_future = Box::pin(connection_closed_future);
84+
loop {
85+
tokio::select! {
86+
_ = &mut connection_closed_future => {
87+
log_info!(self.logger, "Peer connection closed: {}@{}", node_id, addr);
88+
break Err(Error::ConnectionFailed);
89+
},
90+
_ = tokio::time::sleep(Duration::from_millis(10)) => {},
91+
};
92+
93+
match self.peer_manager.peer_by_node_id(&node_id) {
94+
Some(_) => break Ok(()),
95+
None => continue,
96+
}
97+
}
98+
},
99+
None => {
100+
log_error!(self.logger, "Failed to connect to peer: {}@{}", node_id, addr);
101+
Err(Error::ConnectionFailed)
102+
},
103+
};
104+
105+
self.propagate_result_to_subscribers(&node_id, res);
106+
107+
res
108+
}
109+
110+
fn register_or_subscribe_pending_connection(
111+
&self, node_id: &PublicKey,
112+
) -> Option<tokio::sync::oneshot::Receiver<Result<(), Error>>> {
113+
let mut pending_connections_lock = self.pending_connections.lock().unwrap();
114+
match pending_connections_lock.entry(*node_id) {
115+
hash_map::Entry::Occupied(mut entry) => {
116+
let (tx, rx) = tokio::sync::oneshot::channel();
117+
entry.get_mut().push(tx);
118+
Some(rx)
119+
},
120+
hash_map::Entry::Vacant(entry) => {
121+
entry.insert(Vec::new());
122+
None
123+
},
124+
}
125+
}
126+
127+
fn propagate_result_to_subscribers(&self, node_id: &PublicKey, res: Result<(), Error>) {
128+
// Send the result to any other tasks that might be waiting on it by now.
129+
let mut pending_connections_lock = self.pending_connections.lock().unwrap();
130+
if let Some(connection_ready_senders) = pending_connections_lock.remove(node_id) {
131+
for sender in connection_ready_senders {
132+
let _ = sender.send(res).map_err(|e| {
133+
debug_assert!(
134+
false,
135+
"Failed to send connection result to subscribers: {:?}",
136+
e
137+
);
138+
log_error!(
139+
self.logger,
140+
"Failed to send connection result to subscribers: {:?}",
141+
e
142+
);
143+
});
144+
}
145+
}
146+
}
147+
}

src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::fmt;
22

3-
#[derive(Debug, PartialEq, Eq)]
3+
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
44
/// An error that possibly needs to be handled by the user.
55
pub enum Error {
66
/// Returned when trying to start [`crate::Node`] while it is already running.

src/event.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ impl Future for EventFuture {
291291
}
292292
}
293293

294-
pub(crate) struct EventHandler<L: Deref>
294+
pub(crate) struct EventHandler<L: Deref + Clone + Sync + Send + 'static>
295295
where
296296
L::Target: Logger,
297297
{
@@ -307,7 +307,7 @@ where
307307
config: Arc<Config>,
308308
}
309309

310-
impl<L: Deref> EventHandler<L>
310+
impl<L: Deref + Clone + Sync + Send + 'static> EventHandler<L>
311311
where
312312
L::Target: Logger,
313313
{

src/lib.rs

Lines changed: 11 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
mod balance;
7979
mod builder;
8080
mod config;
81+
mod connection;
8182
mod error;
8283
mod event;
8384
mod fee_estimator;
@@ -124,6 +125,7 @@ use config::{
124125
LDK_PAYMENT_RETRY_TIMEOUT, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL,
125126
RGS_SYNC_INTERVAL, WALLET_SYNC_INTERVAL_MINIMUM_SECS,
126127
};
128+
use connection::ConnectionManager;
127129
use event::{EventHandler, EventQueue};
128130
use gossip::GossipSource;
129131
use liquidity::LiquiditySource;
@@ -187,6 +189,7 @@ pub struct Node {
187189
chain_monitor: Arc<ChainMonitor>,
188190
output_sweeper: Arc<Sweeper>,
189191
peer_manager: Arc<PeerManager>,
192+
connection_manager: Arc<ConnectionManager<Arc<FilesystemLogger>>>,
190193
keys_manager: Arc<KeysManager>,
191194
network_graph: Arc<NetworkGraph>,
192195
gossip_source: Arc<GossipSource>,
@@ -498,6 +501,7 @@ impl Node {
498501
}
499502

500503
// Regularly reconnect to persisted peers.
504+
let connect_cm = Arc::clone(&self.connection_manager);
501505
let connect_pm = Arc::clone(&self.peer_manager);
502506
let connect_logger = Arc::clone(&self.logger);
503507
let connect_peer_store = Arc::clone(&self.peer_store);
@@ -518,11 +522,9 @@ impl Node {
518522
.collect::<Vec<_>>();
519523

520524
for peer_info in connect_peer_store.list_peers().iter().filter(|info| !pm_peers.contains(&info.node_id)) {
521-
let res = do_connect_peer(
525+
let res = connect_cm.do_connect_peer(
522526
peer_info.node_id,
523527
peer_info.address.clone(),
524-
Arc::clone(&connect_pm),
525-
Arc::clone(&connect_logger),
526528
).await;
527529
match res {
528530
Ok(_) => {
@@ -871,14 +873,13 @@ impl Node {
871873

872874
let con_node_id = peer_info.node_id;
873875
let con_addr = peer_info.address.clone();
874-
let con_logger = Arc::clone(&self.logger);
875-
let con_pm = Arc::clone(&self.peer_manager);
876+
let con_cm = Arc::clone(&self.connection_manager);
876877

877878
// We need to use our main runtime here as a local runtime might not be around to poll
878879
// connection futures going forward.
879880
tokio::task::block_in_place(move || {
880881
runtime.block_on(async move {
881-
connect_peer_if_necessary(con_node_id, con_addr, con_pm, con_logger).await
882+
con_cm.connect_peer_if_necessary(con_node_id, con_addr).await
882883
})
883884
})?;
884885

@@ -944,14 +945,13 @@ impl Node {
944945

945946
let con_node_id = peer_info.node_id;
946947
let con_addr = peer_info.address.clone();
947-
let con_logger = Arc::clone(&self.logger);
948-
let con_pm = Arc::clone(&self.peer_manager);
948+
let con_cm = Arc::clone(&self.connection_manager);
949949

950950
// We need to use our main runtime here as a local runtime might not be around to poll
951951
// connection futures going forward.
952952
tokio::task::block_in_place(move || {
953953
runtime.block_on(async move {
954-
connect_peer_if_necessary(con_node_id, con_addr, con_pm, con_logger).await
954+
con_cm.connect_peer_if_necessary(con_node_id, con_addr).await
955955
})
956956
})?;
957957

@@ -1601,14 +1601,13 @@ impl Node {
16011601

16021602
let con_node_id = peer_info.node_id;
16031603
let con_addr = peer_info.address.clone();
1604-
let con_logger = Arc::clone(&self.logger);
1605-
let con_pm = Arc::clone(&self.peer_manager);
1604+
let con_cm = Arc::clone(&self.connection_manager);
16061605

16071606
// We need to use our main runtime here as a local runtime might not be around to poll
16081607
// connection futures going forward.
16091608
tokio::task::block_in_place(move || {
16101609
runtime.block_on(async move {
1611-
connect_peer_if_necessary(con_node_id, con_addr, con_pm, con_logger).await
1610+
con_cm.connect_peer_if_necessary(con_node_id, con_addr).await
16121611
})
16131612
})?;
16141613

@@ -1849,56 +1848,3 @@ pub struct NodeStatus {
18491848
/// Will be `None` if we have no public channels or we haven't broadcasted since the [`Node`] was initialized.
18501849
pub latest_node_announcement_broadcast_timestamp: Option<u64>,
18511850
}
1852-
1853-
async fn connect_peer_if_necessary(
1854-
node_id: PublicKey, addr: SocketAddress, peer_manager: Arc<PeerManager>,
1855-
logger: Arc<FilesystemLogger>,
1856-
) -> Result<(), Error> {
1857-
if peer_manager.peer_by_node_id(&node_id).is_some() {
1858-
return Ok(());
1859-
}
1860-
1861-
do_connect_peer(node_id, addr, peer_manager, logger).await
1862-
}
1863-
1864-
async fn do_connect_peer(
1865-
node_id: PublicKey, addr: SocketAddress, peer_manager: Arc<PeerManager>,
1866-
logger: Arc<FilesystemLogger>,
1867-
) -> Result<(), Error> {
1868-
log_info!(logger, "Connecting to peer: {}@{}", node_id, addr);
1869-
1870-
let socket_addr = addr
1871-
.to_socket_addrs()
1872-
.map_err(|e| {
1873-
log_error!(logger, "Failed to resolve network address: {}", e);
1874-
Error::InvalidSocketAddress
1875-
})?
1876-
.next()
1877-
.ok_or(Error::ConnectionFailed)?;
1878-
1879-
match lightning_net_tokio::connect_outbound(Arc::clone(&peer_manager), node_id, socket_addr)
1880-
.await
1881-
{
1882-
Some(connection_closed_future) => {
1883-
let mut connection_closed_future = Box::pin(connection_closed_future);
1884-
loop {
1885-
match futures::poll!(&mut connection_closed_future) {
1886-
std::task::Poll::Ready(_) => {
1887-
log_info!(logger, "Peer connection closed: {}@{}", node_id, addr);
1888-
return Err(Error::ConnectionFailed);
1889-
},
1890-
std::task::Poll::Pending => {},
1891-
}
1892-
// Avoid blocking the tokio context by sleeping a bit
1893-
match peer_manager.peer_by_node_id(&node_id) {
1894-
Some(_) => return Ok(()),
1895-
None => tokio::time::sleep(Duration::from_millis(10)).await,
1896-
}
1897-
}
1898-
},
1899-
None => {
1900-
log_error!(logger, "Failed to connect to peer: {}@{}", node_id, addr);
1901-
Err(Error::ConnectionFailed)
1902-
},
1903-
}
1904-
}

tests/integration_tests_rust.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,3 +333,33 @@ fn do_connection_restart_behavior(persist: bool) {
333333
assert!(node_b.list_peers().is_empty());
334334
}
335335
}
336+
337+
#[test]
338+
fn concurrent_connections_succeed() {
339+
let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd();
340+
let (node_a, node_b) = setup_two_nodes(&electrsd, false);
341+
342+
let node_a = Arc::new(node_a);
343+
let node_b = Arc::new(node_b);
344+
345+
let node_id_b = node_b.node_id();
346+
let node_addr_b = node_b.listening_addresses().unwrap().first().unwrap().clone();
347+
348+
while !node_b.status().is_listening {
349+
std::thread::sleep(std::time::Duration::from_millis(10));
350+
}
351+
352+
let mut handles = Vec::new();
353+
for _ in 0..10 {
354+
let thread_node = Arc::clone(&node_a);
355+
let thread_addr = node_addr_b.clone();
356+
let handle = std::thread::spawn(move || {
357+
thread_node.connect(node_id_b, thread_addr, false).unwrap();
358+
});
359+
handles.push(handle);
360+
}
361+
362+
for h in handles {
363+
h.join().unwrap();
364+
}
365+
}

0 commit comments

Comments
 (0)