Skip to content

Commit 0308a77

Browse files
Frando and rklaehn authored
refactor!: update to latest iroh-metrics version, use non-global metrics collection (#85)
## Description Updates metrics tracking to the new non-global tracking. Metrics are tracked per downloader. We already only track metrics within the downloader, so this was rather straightforward. The tracking of request stats was changed to be tracked once the download task completes, as this seemed easier to me. Needs a careful review, but I think the change is sound (i.e. no change in behavior). Depends on n0-computer/iroh#3262 ## Breaking Changes * `metrics::Metrics` now implements `MetricsGroup` from the recent `iroh-metrics` release (TODO: fill in version after release) * Metrics are no longer tracked into the `static_core` from `iroh-metrics`, but instead are tracked per `Downloader` and exposed via `Downloader::metrics` ## Notes & open questions <!-- Any notes, remarks or open questions you have to make about the PR. --> ## Change checklist - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [x] Tests if relevant. - [x] All breaking changes documented. --------- Co-authored-by: Ruediger Klaehn <[email protected]>
1 parent fbc6f47 commit 0308a77

File tree

8 files changed

+376
-440
lines changed

8 files changed

+376
-440
lines changed

Cargo.lock

Lines changed: 282 additions & 361 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ genawaiter = { version = "0.99.1", features = ["futures03"] }
4040
hashlink = { version = "0.9.0", optional = true }
4141
hex = "0.4.3"
4242
indicatif = { version = "0.17.8", optional = true }
43-
iroh-base = { version = "0.34" }
43+
iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" }
4444
iroh-io = { version = "0.6.0", features = ["stats"] }
45-
iroh-metrics = { version = "0.32", default-features = false }
46-
iroh = "0.34"
45+
iroh-metrics = { version = "0.34", default-features = false }
46+
iroh = { git = "https://github.com/n0-computer/iroh", branch = "main" }
4747
nested_enum_utils = { version = "0.1.0", optional = true }
4848
num_cpus = "1.15.0"
4949
oneshot = "0.1.8"
@@ -80,7 +80,7 @@ tracing-test = "0.2.5"
8080

8181
[dev-dependencies]
8282
http-body = "1.0"
83-
iroh = { version = "0.34", features = ["test-utils"] }
83+
iroh = { git = "https://github.com/n0-computer/iroh", branch = "main", features = ["test-utils"] }
8484
quinn = { package = "iroh-quinn", version = "0.13", features = ["ring"] }
8585
futures-buffered = "0.2.4"
8686
proptest = "1.0.0"

deny.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ allow = [
1919
"MIT",
2020
"Zlib",
2121
"MPL-2.0", # https://fossa.com/blog/open-source-software-licenses-101-mozilla-public-license-2-0/
22-
"Unicode-3.0"
22+
"Unicode-3.0",
23+
"Unlicense", # https://unlicense.org/
2324
]
2425

2526
[[licenses.clarify]]
@@ -38,4 +39,6 @@ ignore = [
3839
]
3940

4041
[sources]
41-
allow-git = []
42+
allow-git = [
43+
"https://github.com/n0-computer/iroh.git",
44+
]

src/downloader.rs

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ use anyhow::anyhow;
4646
use futures_lite::{future::BoxedLocal, Stream, StreamExt};
4747
use hashlink::LinkedHashSet;
4848
use iroh::{endpoint, Endpoint, NodeAddr, NodeId};
49-
use iroh_metrics::inc;
5049
use tokio::{
5150
sync::{mpsc, oneshot},
5251
task::JoinSet,
@@ -55,7 +54,7 @@ use tokio_util::{either::Either, sync::CancellationToken, time::delay_queue};
5554
use tracing::{debug, error, error_span, trace, warn, Instrument};
5655

5756
use crate::{
58-
get::{db::DownloadProgress, Stats},
57+
get::{db::DownloadProgress, error::GetError, Stats},
5958
metrics::Metrics,
6059
store::Store,
6160
util::{local_pool::LocalPoolHandle, progress::ProgressSender},
@@ -98,7 +97,7 @@ pub enum FailureAction {
9897
/// The request was cancelled by us.
9998
AllIntentsDropped,
10099
/// An error occurred that prevents the request from being retried at all.
101-
AbortRequest(anyhow::Error),
100+
AbortRequest(GetError),
102101
/// An error occurred that suggests the node should not be used in general.
103102
DropPeer(anyhow::Error),
104103
/// An error occurred in which neither the node nor the request are at fault.
@@ -332,6 +331,7 @@ pub struct Downloader {
332331
next_id: Arc<AtomicU64>,
333332
/// Channel to communicate with the service.
334333
msg_tx: mpsc::Sender<Message>,
334+
metrics: Arc<Metrics>,
335335
}
336336

337337
impl Downloader {
@@ -354,23 +354,33 @@ impl Downloader {
354354
where
355355
S: Store,
356356
{
357+
let metrics = Arc::new(Metrics::default());
357358
let me = endpoint.node_id().fmt_short();
358359
let (msg_tx, msg_rx) = mpsc::channel(SERVICE_CHANNEL_CAPACITY);
359360
let dialer = Dialer::new(endpoint);
360361

362+
let metrics_clone = metrics.clone();
361363
let create_future = move || {
362364
let getter = get::IoGetter {
363365
store: store.clone(),
364366
};
365367

366-
let service = Service::new(getter, dialer, concurrency_limits, retry_config, msg_rx);
368+
let service = Service::new(
369+
getter,
370+
dialer,
371+
concurrency_limits,
372+
retry_config,
373+
msg_rx,
374+
metrics_clone,
375+
);
367376

368377
service.run().instrument(error_span!("downloader", %me))
369378
};
370379
rt.spawn_detached(create_future);
371380
Self {
372381
next_id: Arc::new(AtomicU64::new(0)),
373382
msg_tx,
383+
metrics,
374384
}
375385
}
376386

@@ -424,6 +434,11 @@ impl Downloader {
424434
debug!(?msg, "nodes have not been sent")
425435
}
426436
}
437+
438+
/// Returns the metrics collected for this downloader.
439+
pub fn metrics(&self) -> &Arc<Metrics> {
440+
&self.metrics
441+
}
427442
}
428443

429444
/// Messages the service can receive.
@@ -565,6 +580,7 @@ struct Service<G: Getter, D: DialerT> {
565580
in_progress_downloads: JoinSet<(DownloadKind, InternalDownloadResult)>,
566581
/// Progress tracker
567582
progress_tracker: ProgressTracker,
583+
metrics: Arc<Metrics>,
568584
}
569585
impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
570586
fn new(
@@ -573,6 +589,7 @@ impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
573589
concurrency_limits: ConcurrencyLimits,
574590
retry_config: RetryConfig,
575591
msg_rx: mpsc::Receiver<Message>,
592+
metrics: Arc<Metrics>,
576593
) -> Self {
577594
Service {
578595
getter,
@@ -590,23 +607,24 @@ impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
590607
in_progress_downloads: Default::default(),
591608
progress_tracker: ProgressTracker::new(),
592609
queue: Default::default(),
610+
metrics,
593611
}
594612
}
595613

596614
/// Main loop for the service.
597615
async fn run(mut self) {
598616
loop {
599617
trace!("wait for tick");
600-
inc!(Metrics, downloader_tick_main);
618+
self.metrics.downloader_tick_main.inc();
601619
tokio::select! {
602620
Some((node, conn_result)) = self.dialer.next() => {
603621
trace!(node=%node.fmt_short(), "tick: connection ready");
604-
inc!(Metrics, downloader_tick_connection_ready);
622+
self.metrics.downloader_tick_connection_ready.inc();
605623
self.on_connection_ready(node, conn_result);
606624
}
607625
maybe_msg = self.msg_rx.recv() => {
608626
trace!(msg=?maybe_msg, "tick: message received");
609-
inc!(Metrics, downloader_tick_message_received);
627+
self.metrics.downloader_tick_message_received.inc();
610628
match maybe_msg {
611629
Some(msg) => self.handle_message(msg).await,
612630
None => return self.shutdown().await,
@@ -616,25 +634,26 @@ impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
616634
match res {
617635
Ok((kind, result)) => {
618636
trace!(%kind, "tick: transfer completed");
619-
inc!(Metrics, downloader_tick_transfer_completed);
637+
self::get::track_metrics(&result, &self.metrics);
638+
self.metrics.downloader_tick_transfer_completed.inc();
620639
self.on_download_completed(kind, result);
621640
}
622641
Err(err) => {
623642
warn!(?err, "transfer task panicked");
624-
inc!(Metrics, downloader_tick_transfer_failed);
643+
self.metrics.downloader_tick_transfer_failed.inc();
625644
}
626645
}
627646
}
628647
Some(expired) = self.retry_nodes_queue.next() => {
629648
let node = expired.into_inner();
630649
trace!(node=%node.fmt_short(), "tick: retry node");
631-
inc!(Metrics, downloader_tick_retry_node);
650+
self.metrics.downloader_tick_retry_node.inc();
632651
self.on_retry_wait_elapsed(node);
633652
}
634653
Some(expired) = self.goodbye_nodes_queue.next() => {
635654
let node = expired.into_inner();
636655
trace!(node=%node.fmt_short(), "tick: goodbye node");
637-
inc!(Metrics, downloader_tick_goodbye_node);
656+
self.metrics.downloader_tick_goodbye_node.inc();
638657
self.disconnect_idle_node(node, "idle expired");
639658
}
640659
}

src/downloader/get.rs

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ use crate::{
1414
impl From<GetError> for FailureAction {
1515
fn from(e: GetError) -> Self {
1616
match e {
17-
e @ GetError::NotFound(_) => FailureAction::AbortRequest(e.into()),
17+
e @ GetError::NotFound(_) => FailureAction::AbortRequest(e),
1818
e @ GetError::RemoteReset(_) => FailureAction::RetryLater(e.into()),
1919
e @ GetError::NoncompliantNode(_) => FailureAction::DropPeer(e.into()),
2020
e @ GetError::Io(_) => FailureAction::RetryLater(e.into()),
21-
e @ GetError::BadRequest(_) => FailureAction::AbortRequest(e.into()),
21+
e @ GetError::BadRequest(_) => FailureAction::AbortRequest(e),
2222
// TODO: what do we want to do on local failures?
23-
e @ GetError::LocalFailure(_) => FailureAction::AbortRequest(e.into()),
23+
e @ GetError::LocalFailure(_) => FailureAction::AbortRequest(e),
2424
}
2525
}
2626
}
@@ -61,8 +61,6 @@ impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsCon
6161
fn proceed(self, conn: endpoint::Connection) -> super::GetProceedFut {
6262
async move {
6363
let res = self.proceed(conn).await;
64-
#[cfg(feature = "metrics")]
65-
track_metrics(&res);
6664
match res {
6765
Ok(stats) => Ok(stats),
6866
Err(err) => Err(err.into()),
@@ -72,11 +70,10 @@ impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsCon
7270
}
7371
}
7472

75-
#[cfg(feature = "metrics")]
76-
fn track_metrics(res: &Result<crate::get::Stats, GetError>) {
77-
use iroh_metrics::{inc, inc_by};
78-
79-
use crate::metrics::Metrics;
73+
pub(super) fn track_metrics(
74+
res: &Result<crate::get::Stats, FailureAction>,
75+
metrics: &crate::metrics::Metrics,
76+
) {
8077
match res {
8178
Ok(stats) => {
8279
let crate::get::Stats {
@@ -85,13 +82,19 @@ fn track_metrics(res: &Result<crate::get::Stats, GetError>) {
8582
elapsed,
8683
} = stats;
8784

88-
inc!(Metrics, downloads_success);
89-
inc_by!(Metrics, download_bytes_total, *bytes_written);
90-
inc_by!(Metrics, download_time_total, elapsed.as_millis() as u64);
85+
metrics.downloads_success.inc();
86+
metrics.download_bytes_total.inc_by(*bytes_written);
87+
metrics
88+
.download_time_total
89+
.inc_by(elapsed.as_millis() as u64);
9190
}
9291
Err(e) => match &e {
93-
GetError::NotFound(_) => inc!(Metrics, downloads_notfound),
94-
_ => inc!(Metrics, downloads_error),
92+
FailureAction::AbortRequest(GetError::NotFound(_)) => {
93+
metrics.downloads_notfound.inc();
94+
}
95+
_ => {
96+
metrics.downloads_error.inc();
97+
}
9598
},
9699
}
97100
}

src/downloader/test.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,28 @@ impl Downloader {
4545
retry_config: RetryConfig,
4646
) -> (Self, LocalPool) {
4747
let (msg_tx, msg_rx) = mpsc::channel(super::SERVICE_CHANNEL_CAPACITY);
48+
let metrics = Arc::new(Metrics::default());
4849

4950
let lp = LocalPool::default();
51+
let metrics_clone = metrics.clone();
5052
lp.spawn_detached(move || async move {
5153
// we want to see the logs of the service
52-
let service = Service::new(getter, dialer, concurrency_limits, retry_config, msg_rx);
54+
let service = Service::new(
55+
getter,
56+
dialer,
57+
concurrency_limits,
58+
retry_config,
59+
msg_rx,
60+
metrics_clone,
61+
);
5362
service.run().await
5463
});
5564

5665
(
5766
Downloader {
5867
next_id: Arc::new(AtomicU64::new(0)),
5968
msg_tx,
69+
metrics,
6070
},
6171
lp,
6272
)

src/metrics.rs

Lines changed: 21 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,65 +1,40 @@
11
//! Metrics for iroh-blobs
22
3-
use iroh_metrics::{
4-
core::{Counter, Metric},
5-
struct_iterable::Iterable,
6-
};
3+
use iroh_metrics::{Counter, MetricsGroup};
74

85
/// Collection of metrics for the module
9-
#[allow(missing_docs)]
10-
#[derive(Debug, Clone, Iterable)]
6+
#[derive(Debug, MetricsGroup, Default)]
7+
#[metrics(name = "iroh-blobs")]
118
pub struct Metrics {
9+
/// Total number of content bytes downloaded
1210
pub download_bytes_total: Counter,
11+
/// Total time in ms spent downloading content bytes
1312
pub download_time_total: Counter,
13+
/// Total number of successful downloads
1414
pub downloads_success: Counter,
15+
/// Total number of downloads failed with error
1516
pub downloads_error: Counter,
17+
/// Total number of downloads failed with not found
1618
pub downloads_notfound: Counter,
1719

20+
/// Number of times the main downloader actor loop ticked
1821
pub downloader_tick_main: Counter,
22+
23+
/// Number of times the downloader actor ticked for a connection ready
1924
pub downloader_tick_connection_ready: Counter,
25+
26+
/// Number of times the downloader actor ticked for a message received
2027
pub downloader_tick_message_received: Counter,
28+
29+
/// Number of times the downloader actor ticked for a transfer completed
2130
pub downloader_tick_transfer_completed: Counter,
22-
pub downloader_tick_transfer_failed: Counter,
23-
pub downloader_tick_retry_node: Counter,
24-
pub downloader_tick_goodbye_node: Counter,
25-
}
2631

27-
impl Default for Metrics {
28-
fn default() -> Self {
29-
Self {
30-
download_bytes_total: Counter::new("Total number of content bytes downloaded"),
31-
download_time_total: Counter::new("Total time in ms spent downloading content bytes"),
32-
downloads_success: Counter::new("Total number of successful downloads"),
33-
downloads_error: Counter::new("Total number of downloads failed with error"),
34-
downloads_notfound: Counter::new("Total number of downloads failed with not found"),
32+
/// Number of times the downloader actor ticked for a transfer failed
33+
pub downloader_tick_transfer_failed: Counter,
3534

36-
downloader_tick_main: Counter::new(
37-
"Number of times the main downloader actor loop ticked",
38-
),
39-
downloader_tick_connection_ready: Counter::new(
40-
"Number of times the downloader actor ticked for a connection ready",
41-
),
42-
downloader_tick_message_received: Counter::new(
43-
"Number of times the downloader actor ticked for a message received",
44-
),
45-
downloader_tick_transfer_completed: Counter::new(
46-
"Number of times the downloader actor ticked for a transfer completed",
47-
),
48-
downloader_tick_transfer_failed: Counter::new(
49-
"Number of times the downloader actor ticked for a transfer failed",
50-
),
51-
downloader_tick_retry_node: Counter::new(
52-
"Number of times the downloader actor ticked for a retry node",
53-
),
54-
downloader_tick_goodbye_node: Counter::new(
55-
"Number of times the downloader actor ticked for a goodbye node",
56-
),
57-
}
58-
}
59-
}
35+
/// Number of times the downloader actor ticked for a retry node
36+
pub downloader_tick_retry_node: Counter,
6037

61-
impl Metric for Metrics {
62-
fn name() -> &'static str {
63-
"iroh-blobs"
64-
}
38+
/// Number of times the downloader actor ticked for a goodbye node
39+
pub downloader_tick_goodbye_node: Counter,
6540
}

src/net_protocol.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use tracing::debug;
1919

2020
use crate::{
2121
downloader::{ConcurrencyLimits, Downloader, RetryConfig},
22+
metrics::Metrics,
2223
provider::EventSender,
2324
store::GcConfig,
2425
util::{
@@ -258,6 +259,10 @@ impl<S: crate::store::Store> Blobs<S> {
258259
&self.inner.store
259260
}
260261

262+
pub fn metrics(&self) -> &Arc<Metrics> {
263+
self.downloader().metrics()
264+
}
265+
261266
pub fn events(&self) -> &EventSender {
262267
&self.inner.events
263268
}

0 commit comments

Comments
 (0)