Skip to content

Commit 3d0ddf8

Browse files
authored
Merge pull request #161 from tox-rs/dht_refactoring
DHT refactoring
2 parents 272e8ca + 58dd55c commit 3d0ddf8

File tree

9 files changed

+322
-228
lines changed

9 files changed

+322
-228
lines changed

examples/dht_server.rs

Lines changed: 29 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use tox::toxcore::dht::packet::*;
4747
use tox::toxcore::dht::codec::*;
4848
use tox::toxcore::dht::server::*;
4949
use tox::toxcore::dht::packed_node::*;
50+
use tox::toxcore::dht::lan_discovery::*;
5051
use tox::toxcore::crypto_core::*;
5152
use tox::toxcore::io_tokio::*;
5253
use tox::toxcore::dht::dht_friend::*;
@@ -93,6 +94,9 @@ fn main() {
9394
let (lossless_tx, lossless_rx) = mpsc::unbounded();
9495
let (lossy_tx, lossy_rx) = mpsc::unbounded();
9596

97+
let local_addr: SocketAddr = "0.0.0.0:33445".parse().unwrap(); // 0.0.0.0 for ipv4
98+
// let local_addr: SocketAddr = "[::]:33445".parse().unwrap(); // [::] for ipv6
99+
96100
// Ignore DHT PublicKey updates for now
97101
let dht_pk_handler = dht_pk_rx
98102
.map_err(|_| Error::new(ErrorKind::Other, "rx error"))
@@ -118,6 +122,8 @@ fn main() {
118122
real_pk
119123
});
120124

125+
let lan_discovery_sender = LanDiscoverySender::new(tx.clone(), server_pk, local_addr.is_ipv6());
126+
121127
let mut server_obj = Server::new(tx, server_pk, server_sk);
122128
server_obj.set_net_crypto(net_crypto);
123129

@@ -164,9 +170,6 @@ fn main() {
164170
// set bootstrap info
165171
server_obj.set_bootstrap_info(07032018, "This is tox-rs".as_bytes().to_owned());
166172

167-
let local_addr: SocketAddr = "0.0.0.0:33445".parse().unwrap(); // 0.0.0.0 for ipv4
168-
// let local_addr: SocketAddr = "[::]:33445".parse().unwrap(); // [::] for ipv6
169-
170173
// Bind a UDP listener to the socket address.
171174
let socket = UdpSocket::bind(&local_addr).unwrap();
172175
socket.set_broadcast(true).expect("set_broadcast call failed");
@@ -223,36 +226,32 @@ fn main() {
223226
err
224227
});
225228

226-
let server: IoFuture<()> = Box::new(network);
227-
let server = add_lan_sender(server, &server_obj, local_addr);
228-
let server = add_server_main_loop(server, &server_obj);
229-
let server = add_onion_key_refresher(server, &server_obj);
230-
let server = server.join(dht_pk_handler).map(|_| ());
231-
let server = server.join(lossless_handler).map(|_| ());
232-
let server = server.join(lossy_handler).map(|_| ());
229+
let server: IoFuture<()> = Box::new(network); // TODO: remove these boxes on rustc 1.26
230+
let server: IoFuture<()> = Box::new(server.select(run_server(&server_obj)).map(|_| ()).map_err(|(e, _)| e));
231+
let server: IoFuture<()> = Box::new(server.select(run_lan_discovery_sender(lan_discovery_sender)).map(|_| ()).map_err(|(e, _)| e));
232+
let server: IoFuture<()> = Box::new(server.select(dht_pk_handler).map(|_| ()).map_err(|(e, _)| e));
233+
let server: IoFuture<()> = Box::new(server.select(lossless_handler).map(|_| ()).map_err(|(e, _)| e));
234+
let server: IoFuture<()> = Box::new(server.select(lossy_handler).map(|_| ()).map_err(|(e, _)| e));
233235

234-
let server = server
235-
.map(|_| ())
236-
.map_err(move |err| {
237-
error!("Processing ended with error: {:?}", err);
238-
()
239-
});
236+
let server = server.map_err(move |err| {
237+
error!("Processing ended with error: {:?}", err);
238+
()
239+
});
240240

241241
info!("server running on localhost:12345");
242242
tokio::run(server);
243243
}
244244

245-
fn add_server_main_loop(base_selector: IoFuture<()>, server_obj: &Server) -> IoFuture<()> {
246-
// 20 seconds for NodesRequest
245+
fn run_server(server_obj: &Server) -> IoFuture<()> {
247246
let interval = Duration::from_secs(1);
248-
let nodes_wakeups = Interval::new(Instant::now() + interval, interval);
247+
let dht_wakeups = Interval::new(Instant::now(), interval);
249248
let mut server_obj_c = server_obj.clone();
250249
let mut bootstrap_fast: bool = false;
251250

252-
let nodes_sender = nodes_wakeups
251+
let future = dht_wakeups
253252
.map_err(|e| Error::new(ErrorKind::Other, format!("Nodes timer error: {:?}", e)))
254253
.for_each(move |_instant| {
255-
println!("main_loop_wakeup");
254+
trace!("DHT server wake up");
256255
// flag for fast bootstrapping
257256
if bootstrap_fast {
258257
server_obj_c.dht_main_loop()
@@ -287,57 +286,19 @@ fn add_server_main_loop(base_selector: IoFuture<()>, server_obj: &Server) -> IoF
287286

288287
res
289288
}
290-
})
291-
.map_err(|_err| Error::new(ErrorKind::Other, "Nodes timer error"));
289+
});
292290

293-
Box::new(base_selector.select(Box::new(nodes_sender))
294-
.map(|_| ())
295-
.map_err(move |(err, _select_next)| {
296-
error!("Processing ended with error: {:?}", err);
297-
err
298-
}))
291+
Box::new(future)
299292
}
300293

301-
fn add_lan_sender(base_selector: IoFuture<()>, server_obj: &Server, local_addr: SocketAddr) -> IoFuture<()> {
302-
// 10 seconds for LanDiscovery
303-
let interval = Duration::from_secs(10);
304-
let lan_wakeups = Interval::new(Instant::now() + interval, interval);
305-
let server_obj_c = server_obj.clone();
306-
let lan_sender = lan_wakeups
294+
fn run_lan_discovery_sender(mut lan_discovery_sender: LanDiscoverySender) -> IoFuture<()> {
295+
let interval = Duration::from_secs(LAN_DISCOVERY_INTERVAL);
296+
let lan_wakeups = Interval::new(Instant::now(), interval);
297+
let future = lan_wakeups
307298
.map_err(|e| Error::new(ErrorKind::Other, format!("LanDiscovery timer error: {:?}", e)))
308299
.for_each(move |_instant| {
309-
println!("lan_wakeup");
310-
if local_addr.is_ipv4() {
311-
server_obj_c.send_lan_discovery_ipv4()
312-
} else {
313-
server_obj_c.send_lan_discovery_ipv6()
314-
}
300+
trace!("LAN discovery sender wake up");
301+
lan_discovery_sender.send()
315302
});
316-
317-
Box::new(base_selector.select(Box::new(lan_sender))
318-
.map(|_| ())
319-
.map_err(move |(err, _select_next)| {
320-
error!("Processing ended with error: {:?}", err);
321-
err
322-
}))
323-
}
324-
fn add_onion_key_refresher(base_selector: IoFuture<()>, server_obj: &Server) -> IoFuture<()> {
325-
// Refresh onion symmetric key every 2 hours. This enforces onion paths expiration.
326-
let interval = Duration::from_secs(7200);
327-
let refresh_onion_key_wakeups = Interval::new(Instant::now() + interval, interval);
328-
let server_obj_c = server_obj.clone();
329-
let onion_key_updater = refresh_onion_key_wakeups
330-
.map_err(|e| Error::new(ErrorKind::Other, format!("Refresh onion key timer error: {:?}", e)))
331-
.for_each(move |_instant| {
332-
println!("refresh_onion_key_wakeup");
333-
server_obj_c.refresh_onion_key();
334-
future::ok(())
335-
});
336-
337-
Box::new(base_selector.select(Box::new(onion_key_updater))
338-
.map(|_| ())
339-
.map_err(move |(err, _select_next)| {
340-
error!("Processing ended with error: {:?}", err);
341-
err
342-
}))
303+
Box::new(future)
343304
}

src/toxcore/binary_io.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,3 +135,20 @@ macro_rules! encode_decode_test (
135135
}
136136
)
137137
);
138+
139+
/// Extract inner content of enums.
140+
#[cfg(test)]
141+
macro_rules! unpack {
142+
($variable:expr, $variant:path, $name:ident) => (
143+
match $variable {
144+
$variant { $name, .. } => $name,
145+
other => panic!("Expected {} but got {:?}", stringify!($variant), other),
146+
}
147+
);
148+
($variable:expr, $variant:path) => (
149+
match $variable {
150+
$variant(inner) => inner,
151+
other => panic!("Expected {} but got {:?}", stringify!($variant), other),
152+
}
153+
)
154+
}

0 commit comments

Comments
 (0)