Skip to content

Commit 6755630

Browse files
committed
broker stats
1 parent 9776957 commit 6755630

File tree

13 files changed

+318
-28
lines changed

13 files changed

+318
-28
lines changed

Cargo.lock

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

broker/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ futurize = "0.5.0"
1919
futurize-derive = "0.5.0"
2020
gcmap = "0.1.4"
2121
lazy_static = "1.2.0"
22+
dotenv = "0.13.0"
2223

2324
[dependencies.env_logger]
2425
version = "0.5.12"

broker/src/listener.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use tokio;
1818
use tokio::net::UdpSocket;
1919
use carrier::transport;
2020
use xlog;
21+
use stats;
2122

2223
pub struct Listener {
2324
ep: endpoint::Endpoint,
@@ -45,7 +46,7 @@ pub fn listen(secret: identity::Secret) -> Result<(Listener, shadow::broker::Han
4546
let mut work = ep.work.clone();
4647
work.try_send(endpoint::EndpointWorkerCmd::InsertChannel(
4748
0,
48-
endpoint::ChannelBus::User { inc: tx },
49+
endpoint::ChannelBus::User { inc: tx, tc: stats::PacketCounter::default() },
4950
))?;
5051

5152
Ok((
@@ -115,7 +116,11 @@ impl ChannelHandshake {
115116

116117
pub fn accept(self, secret: identity::Secret) -> impl Future<Item = channel::Channel, Error = Error> {
117118
let (tx, rx) = mpsc::channel(100);
118-
let bus = endpoint::ChannelBus::User { inc: tx };
119+
let tc = stats::PacketCounter{
120+
initiator: Some(self.identity.clone()),
121+
..stats::PacketCounter::default()
122+
};
123+
let bus = endpoint::ChannelBus::User { inc: tx, tc };
119124

120125
let (route_tx, route_rx) = oneshot::channel();
121126

broker/src/main.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,33 +12,46 @@ extern crate futurize_derive;
1212
extern crate gcmap;
1313
#[macro_use]
1414
extern crate lazy_static;
15+
extern crate dotenv;
1516

1617
use carrier::*;
1718
use failure::Error;
1819
use futures::{Future, Stream};
1920
use std::env;
21+
use dotenv::dotenv;
22+
use std::collections::HashSet;
2023

2124
mod ptrmap;
2225
mod shadow;
2326
mod listener;
2427
mod xlog;
2528

2629
pub fn main() {
30+
dotenv().ok();
31+
2732
if let Err(_) = env::var("RUST_LOG") {
2833
env::set_var("RUST_LOG", "carrier=info,carrier-broker");
2934
}
3035
env_logger::init();
3136

37+
38+
let coordinators : HashSet<identity::Identity> = env::var("COORDINATOR_IDENTITIES")
39+
.expect("must set env COORDINATOR_IDENTITIES")
40+
.split(":")
41+
.map(|v|v.parse().expect("parsing COORDINATOR_IDENTITIES"))
42+
.collect();
43+
3244
let secrets = keystore::Secrets::load().unwrap();
3345
tokio::run(futures::lazy(move || {
34-
broker(secrets.identity).map_err(|e| error!("{}", e))
46+
broker(secrets.identity, coordinators).map_err(|e| error!("{}", e))
3547
}));
3648
}
3749

38-
pub fn broker(secret: identity::Secret) -> impl Future<Item = (), Error = Error> {
50+
pub fn broker(secret: identity::Secret, coordinators: HashSet<identity::Identity>) -> impl Future<Item = (), Error = Error> {
3951
let (lst, sb) = listener::listen(secret.clone()).unwrap();
4052
let ep = lst.handle();
4153
lst.for_each(move |ch| {
54+
let coordinators = coordinators.clone();
4255
info!("incomming channel {} {}", ch.identity(), ch.addr());
4356
let addr = ch.addr().clone();
4457
let sb = sb.clone();
@@ -47,7 +60,7 @@ pub fn broker(secret: identity::Secret) -> impl Future<Item = (), Error = Error>
4760
.accept(secret.clone())
4861
.and_then(move |ch| {
4962
info!("accepted channel {} for route {}", ch.identity(), ch.route());
50-
sb.dispatch(ep, ch, addr)
63+
sb.dispatch(ep, ch, addr, coordinators)
5164
}).and_then(|_| {
5265
info!("dispatch ended");
5366
Ok(())

broker/src/shadow.rs

Lines changed: 64 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use carrier::channel;
22
use carrier::endpoint;
33
use failure::Error;
44
use futures::sync::mpsc;
5+
use futures::sync::oneshot;
56
use futures::{self, Future, Sink, Stream};
67
use futurize;
78
use gcmap::{HashMap, MarkOnDrop};
@@ -12,6 +13,8 @@ use ptrmap;
1213
use std::net::SocketAddr;
1314
use tokio;
1415
use xlog;
16+
use stats;
17+
use std::collections::HashSet;
1518

1619
macro_rules! wrk_try {
1720
($self:ident, $x:expr) => {
@@ -288,6 +291,7 @@ impl Broker {
288291
}
289292

290293
impl broker::Worker for Broker {
294+
291295
fn subscribe(
292296
mut self,
293297
identity: identity::Identity,
@@ -397,11 +401,13 @@ impl peer::Worker for Peer {
397401
}
398402

399403
struct Srv {
400-
endpoint: endpoint::Endpoint,
401-
broker: broker::Handle,
402-
identity: identity::Identity,
403-
worker: peer::Handle,
404-
ipaddr: SocketAddr,
404+
endpoint: endpoint::Endpoint,
405+
broker: broker::Handle,
406+
identity: identity::Identity,
407+
worker: peer::Handle,
408+
ipaddr: SocketAddr,
409+
coordinators: HashSet<identity::Identity>,
410+
epoch: u64,
405411
}
406412

407413
impl broker::Handle {
@@ -410,6 +416,7 @@ impl broker::Handle {
410416
endpoint: endpoint::Endpoint,
411417
mut channel: channel::Channel,
412418
ipaddr: SocketAddr,
419+
coordinators: HashSet<identity::Identity>,
413420
) -> impl Future<Item = (), Error = Error> {
414421
let lst = channel.listener().unwrap();
415422
let identity = channel.identity().clone();
@@ -429,12 +436,56 @@ impl broker::Handle {
429436
worker: handle,
430437
endpoint,
431438
ipaddr,
439+
coordinators,
440+
epoch: 0,
432441
};
433442
proto::Broker::dispatch(lst, srv)
434443
}
435444
}
436445

437446
impl proto::Broker::Service for Srv {
447+
fn epochsync(
448+
&mut self,
449+
_headers: Headers,
450+
msg: proto::EpochSyncRequest,
451+
) -> Result<Box<Future<Item = proto::EpochSyncResponse, Error = Error> + Sync + Send + 'static>, Error> {
452+
453+
// check if the sender is a trusted coordinator
454+
if !self.coordinators.contains(&self.identity) {
455+
return Ok(Box::new(futures::future::ok(proto::EpochSyncResponse::default())));
456+
}
457+
458+
459+
info!("epoch sync from {} to {} by coordinator {}",
460+
self.epoch, msg.epoch, self.identity);
461+
462+
let clear = {
463+
if self.epoch != msg.epoch {
464+
self.epoch = msg.epoch;
465+
true
466+
} else {
467+
false
468+
}
469+
};
470+
471+
472+
let (r_tx, r_rx) = oneshot::channel();
473+
let ft = self
474+
.endpoint.work.clone().send(endpoint::EndpointWorkerCmd::DumpStats(r_tx, clear))
475+
.map_err(Error::from)
476+
.and_then(move |_| {
477+
r_rx
478+
.map_err(Error::from)
479+
.and_then(|dump|{
480+
Ok(proto::EpochSyncResponse{
481+
dump: Some(dump),
482+
})
483+
})
484+
});
485+
486+
Ok(Box::new(ft))
487+
}
488+
438489
fn subscribe(
439490
&mut self,
440491
_headers: Headers,
@@ -500,15 +551,22 @@ impl proto::Broker::Service for Srv {
500551

501552
let selfipaddr = self.ipaddr.clone();
502553
let selfidentity = self.identity.as_bytes().to_vec();
554+
let selfidentity_ = self.identity.clone();
503555
let mut endpoint = self.endpoint.clone();
504556

505557
let mut broker = self.broker.clone();
506558
let ft = futures::future::result(identity::Identity::from_bytes(msgidentity))
507559
.and_then(move |target| {
560+
let tc = stats::PacketCounter{
561+
initiator: Some(selfidentity_),
562+
responder: Some(target.clone()),
563+
..stats::PacketCounter::default()
564+
};
508565
broker.get_peer(target).and_then(move |maybe| {
509566
if let Some((mut peer, ipaddr)) = maybe {
567+
510568
let ft = endpoint
511-
.proxy(selfipaddr, ipaddr)
569+
.proxy(selfipaddr, ipaddr, tc)
512570
.and_then(move |proxy| {
513571
peer.connect(proto::PeerConnectRequest {
514572
identity: selfidentity,

cli/src/main.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ pub fn main_() -> Result<(), Error> {
9393

9494
.subcommand(SubCommand::with_name("mkshadow").about("create a shadow address"))
9595
.subcommand(
96+
SubCommand::with_name("sync")
97+
.about("coordinate a broker epoch")
98+
.arg(Arg::with_name("broker").takes_value(true).required(true).index(1))
99+
.arg(Arg::with_name("epoch").takes_value(true).required(true).index(2))
100+
).subcommand(
96101
SubCommand::with_name("dns")
97102
.about("create dns record")
98103
.arg(Arg::with_name("priority").takes_value(true).required(true).index(2))
@@ -449,6 +454,16 @@ pub fn main_() -> Result<(), Error> {
449454
}
450455
Ok(())
451456
}
457+
("sync", Some(submatches)) => {
458+
let secrets = keystore::Secrets::load()?;
459+
let broker: std::net::IpAddr = submatches.value_of("broker").unwrap().to_string().parse().expect("broker ip");
460+
let epoch: u64 = submatches.value_of("epoch").unwrap().to_string().parse().expect("epoch");
461+
462+
tokio::run(futures::lazy(move || {
463+
sync(secrets.identity, broker, epoch).map_err(|e| error!("{}", e))
464+
}));
465+
Ok(())
466+
}
452467
_ => unreachable!(),
453468
}
454469
}
@@ -667,4 +682,28 @@ pub fn update(
667682
}
668683

669684

685+
pub fn sync(
686+
secret: identity::Secret,
687+
broker: std::net::IpAddr,
688+
epoch: u64,
689+
) -> impl Future<Item = (), Error = Error> {
690+
let domain = env::var("CARRIER_BROKER_DOMAIN").unwrap_or("2.carrier.devguard.io".to_string());
691+
connect::connect_to_ip(domain, broker, secret.clone()).and_then(move |(_ep, mut brk, _sock, _addr)| {
692+
brk.message("/carrier.broker.v1/broker/epochsync")
693+
.unwrap()
694+
.send(proto::EpochSyncRequest{
695+
epoch,
696+
}).flatten_stream()
697+
.for_each(move |m: proto::EpochSyncResponse| {
698+
if let Some(dump) = m.dump {
699+
println!("{:#?}", dump);
700+
}
701+
Ok(())
702+
})
703+
.and_then(|_| {
704+
drop(brk);
705+
Ok(())
706+
})
707+
})
708+
}
670709

lib/proto/broker.proto

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,36 @@ message ConnectResponse {
7171
repeated Path paths = 4;
7272
}
7373

74+
75+
message EpochSyncRequest {
76+
uint64 epoch = 1;
77+
}
78+
79+
80+
81+
message EpochDumpPacketCounter {
82+
bytes identity = 1;
83+
uint64 dcs = 2;
84+
uint64 bx = 3;
85+
uint64 tx = 4;
86+
uint64 rx = 5;
87+
}
88+
89+
message EpochDump {
90+
repeated EpochDumpPacketCounter packets = 1;
91+
}
92+
93+
94+
message EpochSyncResponse {
95+
EpochDump dump = 1;
96+
}
97+
7498
service Broker {
7599
rpc subscribe (SubscribeRequest) returns (stream SubscribeChange) {}
76100
rpc publish (PublishRequest) returns (stream PublishChange) {}
77-
78101
rpc connect (ConnectRequest) returns (stream ConnectResponse) {}
102+
103+
rpc epochsync (EpochSyncRequest) returns (EpochSyncResponse) {}
79104
}
80105

81106
message PeerConnectRequest {

0 commit comments

Comments
 (0)