Skip to content

Commit 6aebdbe

Browse files
committed
Refactor metrics trackers to their own file
1 parent e0fa683 commit 6aebdbe

File tree

7 files changed

+163
-132
lines changed

7 files changed

+163
-132
lines changed

quickwit/quickwit-search/src/leaf.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1174,7 +1174,7 @@ impl CanSplitDoBetter {
11741174
}
11751175
}
11761176

1177-
/// Searches multiple splits, potentially in multiple indices, sitting on different storages and
1177+
/// Searches multiple splits, potentially in multiple indexes, sitting on different storages and
11781178
/// having different doc mappings.
11791179
#[instrument(skip_all, fields(index = ?leaf_search_request.search_request.as_ref().unwrap().index_id_patterns))]
11801180
pub async fn multi_index_leaf_search(
@@ -1299,7 +1299,7 @@ fn disable_search_request_hits(search_request: &mut SearchRequest) {
12991299
search_request.sort_fields.clear();
13001300
}
13011301

1302-
/// Searches multiple splits from a specific index and a single doc mapping
1302+
/// Searches multiple splits for a specific index and a single doc mapping
13031303
///
13041304
/// The leaf search collects all kinds of information, and returns a set of
13051305
/// [PartialHit](quickwit_proto::search::PartialHit) candidates. The root will be in

quickwit/quickwit-search/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ mod leaf_cache;
2929
mod list_fields;
3030
mod list_fields_cache;
3131
mod list_terms;
32+
mod metrics_trackers;
3233
mod retry;
3334
mod root;
3435
mod scroll_context;

quickwit/quickwit-search/src/list_fields.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ pub struct IndexMetasForLeafSearch {
279279
}
280280

281281
/// Performs a distributed list fields request.
282-
/// 1. Sends leaf request over gRPC to multiple leaf nodes.
282+
/// 1. Sends leaf requests over gRPC to multiple leaf nodes.
283283
/// 2. Merges the search results.
284284
/// 3. Builds the response and returns.
285285
pub async fn root_list_fields(

quickwit/quickwit-search/src/list_terms.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use crate::search_permit_provider::compute_initial_memory_allocation;
3939
use crate::{ClusterClient, SearchError, SearchJob, SearcherContext, resolve_index_patterns};
4040

4141
/// Performs a distributed list terms request.
42-
/// 1. Sends leaf request over gRPC to multiple leaf nodes.
42+
/// 1. Sends leaf requests over gRPC to multiple leaf nodes.
4343
/// 2. Merges the search results.
4444
/// 3. Builds the response and returns.
4545
/// This is much simpler than `root_search` as it doesn't need to get actual docs.
quickwit/quickwit-search/src/metrics_trackers.rs

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
// Copyright 2021-Present Datadog, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// See https://prometheus.io/docs/practices/naming/
16+
17+
use std::pin::Pin;
18+
use std::task::{Context, Poll, ready};
19+
use std::time::Instant;
20+
21+
use pin_project::{pin_project, pinned_drop};
22+
use quickwit_proto::search::LeafSearchResponse;
23+
24+
use crate::SearchError;
25+
use crate::metrics::SEARCH_METRICS;
26+
27+
// root
28+
29+
pub enum RootSearchMetricsStep {
30+
Plan,
31+
Exec { num_targeted_splits: usize },
32+
}
33+
34+
/// Wrapper around the plan and search futures to track metrics.
35+
#[pin_project(PinnedDrop)]
36+
pub struct RootSearchMetricsFuture<F> {
37+
#[pin]
38+
pub tracked: F,
39+
pub start: Instant,
40+
pub step: RootSearchMetricsStep,
41+
pub is_success: Option<bool>,
42+
}
43+
44+
#[pinned_drop]
45+
impl<F> PinnedDrop for RootSearchMetricsFuture<F> {
46+
fn drop(self: Pin<&mut Self>) {
47+
let (num_targeted_splits, status) = match (&self.step, self.is_success) {
48+
// this is a partial success; the actual success is recorded during the search step
49+
(RootSearchMetricsStep::Plan, Some(true)) => return,
50+
(RootSearchMetricsStep::Plan, Some(false)) => (0, "plan-error"),
51+
(RootSearchMetricsStep::Plan, None) => (0, "plan-cancelled"),
52+
(
53+
RootSearchMetricsStep::Exec {
54+
num_targeted_splits,
55+
},
56+
Some(true),
57+
) => (*num_targeted_splits, "success"),
58+
(
59+
RootSearchMetricsStep::Exec {
60+
num_targeted_splits,
61+
},
62+
Some(false),
63+
) => (*num_targeted_splits, "error"),
64+
(
65+
RootSearchMetricsStep::Exec {
66+
num_targeted_splits,
67+
},
68+
None,
69+
) => (*num_targeted_splits, "cancelled"),
70+
};
71+
72+
let label_values = [status];
73+
SEARCH_METRICS
74+
.root_search_requests_total
75+
.with_label_values(label_values)
76+
.inc();
77+
SEARCH_METRICS
78+
.root_search_request_duration_seconds
79+
.with_label_values(label_values)
80+
.observe(self.start.elapsed().as_secs_f64());
81+
SEARCH_METRICS
82+
.root_search_targeted_splits
83+
.with_label_values(label_values)
84+
.observe(num_targeted_splits as f64);
85+
}
86+
}
87+
88+
impl<F, R, E> Future for RootSearchMetricsFuture<F>
89+
where F: Future<Output = Result<R, E>>
90+
{
91+
type Output = Result<R, E>;
92+
93+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
94+
let this = self.project();
95+
let response = ready!(this.tracked.poll(cx));
96+
*this.is_success = Some(response.is_ok());
97+
Poll::Ready(Ok(response?))
98+
}
99+
}
100+
101+
// leaf
102+
103+
/// Wrapper around the search future to track metrics.
104+
#[pin_project(PinnedDrop)]
105+
pub struct LeafSearchMetricsFuture<F>
106+
where F: Future<Output = Result<LeafSearchResponse, SearchError>>
107+
{
108+
#[pin]
109+
pub tracked: F,
110+
pub start: Instant,
111+
pub targeted_splits: usize,
112+
pub status: Option<&'static str>,
113+
}
114+
115+
#[pinned_drop]
116+
impl<F> PinnedDrop for LeafSearchMetricsFuture<F>
117+
where F: Future<Output = Result<LeafSearchResponse, SearchError>>
118+
{
119+
fn drop(self: Pin<&mut Self>) {
120+
let label_values = [self.status.unwrap_or("cancelled")];
121+
SEARCH_METRICS
122+
.leaf_search_requests_total
123+
.with_label_values(label_values)
124+
.inc();
125+
SEARCH_METRICS
126+
.leaf_search_request_duration_seconds
127+
.with_label_values(label_values)
128+
.observe(self.start.elapsed().as_secs_f64());
129+
SEARCH_METRICS
130+
.leaf_search_targeted_splits
131+
.with_label_values(label_values)
132+
.observe(self.targeted_splits as f64);
133+
}
134+
}
135+
136+
impl<F> Future for LeafSearchMetricsFuture<F>
137+
where F: Future<Output = Result<LeafSearchResponse, SearchError>>
138+
{
139+
type Output = Result<LeafSearchResponse, SearchError>;
140+
141+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
142+
let this = self.project();
143+
let response = ready!(this.tracked.poll(cx));
144+
*this.status = if response.is_ok() {
145+
Some("success")
146+
} else {
147+
Some("error")
148+
};
149+
Poll::Ready(Ok(response?))
150+
}
151+
}

quickwit/quickwit-search/src/root.rs

Lines changed: 6 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,13 @@
1313
// limitations under the License.
1414

1515
use std::collections::{HashMap, HashSet};
16-
use std::future::Future;
17-
use std::pin::Pin;
1816
use std::sync::OnceLock;
1917
use std::sync::atomic::{AtomicU64, Ordering};
20-
use std::task::{Context as TaskContext, Poll, ready};
21-
use std::time::Duration;
18+
use std::time::{Duration, Instant};
2219

2320
use anyhow::Context;
2421
use futures::future::try_join_all;
2522
use itertools::Itertools;
26-
use pin_project::{pin_project, pinned_drop};
2723
use quickwit_common::pretty::PrettySample;
2824
use quickwit_common::shared_consts;
2925
use quickwit_common::uri::Uri;
@@ -49,12 +45,11 @@ use tantivy::aggregation::agg_result::AggregationResults;
4945
use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults;
5046
use tantivy::collector::Collector;
5147
use tantivy::schema::{Field, FieldEntry, FieldType, Schema};
52-
use tokio::time::Instant;
5348
use tracing::{debug, info_span, instrument};
5449

5550
use crate::cluster_client::ClusterClient;
5651
use crate::collector::{QuickwitAggregations, make_merge_collector};
57-
use crate::metrics::SEARCH_METRICS;
52+
use crate::metrics_trackers::{RootSearchMetricsFuture, RootSearchMetricsStep};
5853
use crate::scroll_context::{ScrollContext, ScrollKeyAndStartOffset};
5954
use crate::search_job_placer::{Job, group_by, group_jobs_by_index_id};
6055
use crate::search_response_rest::StorageRequestCount;
@@ -959,7 +954,7 @@ fn get_sort_field_datetime_format(
959954
}
960955

961956
/// Performs a distributed search.
962-
/// 1. Sends leaf request over gRPC to multiple leaf nodes.
957+
/// 1. Sends leaf requests over gRPC to multiple leaf nodes.
963958
/// 2. Merges the search results.
964959
/// 3. Sends fetch docs requests to multiple leaf nodes.
965960
/// 4. Builds the response with docs and returns.
@@ -1184,7 +1179,7 @@ async fn plan_splits_for_root_search(
11841179
}
11851180

11861181
/// Performs a distributed search.
1187-
/// 1. Sends leaf request over gRPC to multiple leaf nodes.
1182+
/// 1. Sends leaf requests over gRPC to multiple leaf nodes.
11881183
/// 2. Merges the search results.
11891184
/// 3. Sends fetch docs requests to multiple leaf nodes.
11901185
/// 4. Builds the response with docs and returns.
@@ -1195,7 +1190,7 @@ pub async fn root_search(
11951190
mut metastore: MetastoreServiceClient,
11961191
cluster_client: &ClusterClient,
11971192
) -> crate::Result<SearchResponse> {
1198-
let start_instant = tokio::time::Instant::now();
1193+
let start_instant = Instant::now();
11991194

12001195
let (split_metadatas, indexes_meta_for_leaf_search) = RootSearchMetricsFuture {
12011196
start: start_instant,
@@ -1222,7 +1217,7 @@ pub async fn root_search(
12221217
),
12231218
is_success: None,
12241219
step: RootSearchMetricsStep::Exec {
1225-
targeted_splits: num_splits,
1220+
num_targeted_splits: num_splits,
12261221
},
12271222
}
12281223
.await;
@@ -1760,69 +1755,6 @@ pub fn jobs_to_fetch_docs_requests(
17601755
Ok(fetch_docs_requests)
17611756
}
17621757

1763-
enum RootSearchMetricsStep {
1764-
Plan,
1765-
Exec { targeted_splits: usize },
1766-
}
1767-
1768-
/// Wrapper around the plan and search futures to track metrics.
1769-
#[pin_project(PinnedDrop)]
1770-
struct RootSearchMetricsFuture<F> {
1771-
#[pin]
1772-
tracked: F,
1773-
start: Instant,
1774-
step: RootSearchMetricsStep,
1775-
is_success: Option<bool>,
1776-
}
1777-
1778-
#[pinned_drop]
1779-
impl<F> PinnedDrop for RootSearchMetricsFuture<F> {
1780-
fn drop(self: Pin<&mut Self>) {
1781-
let (targeted_splits, status) = match (&self.step, self.is_success) {
1782-
// is is a partial success, actual success is recorded during the search step
1783-
(RootSearchMetricsStep::Plan, Some(true)) => return,
1784-
(RootSearchMetricsStep::Plan, Some(false)) => (0, "plan-error"),
1785-
(RootSearchMetricsStep::Plan, None) => (0, "plan-cancelled"),
1786-
(RootSearchMetricsStep::Exec { targeted_splits }, Some(true)) => {
1787-
(*targeted_splits, "success")
1788-
}
1789-
(RootSearchMetricsStep::Exec { targeted_splits }, Some(false)) => {
1790-
(*targeted_splits, "error")
1791-
}
1792-
(RootSearchMetricsStep::Exec { targeted_splits }, None) => {
1793-
(*targeted_splits, "cancelled")
1794-
}
1795-
};
1796-
1797-
let label_values = [status];
1798-
SEARCH_METRICS
1799-
.root_search_requests_total
1800-
.with_label_values(label_values)
1801-
.inc();
1802-
SEARCH_METRICS
1803-
.root_search_request_duration_seconds
1804-
.with_label_values(label_values)
1805-
.observe(self.start.elapsed().as_secs_f64());
1806-
SEARCH_METRICS
1807-
.root_search_targeted_splits
1808-
.with_label_values(label_values)
1809-
.observe(targeted_splits as f64);
1810-
}
1811-
}
1812-
1813-
impl<F, R, E> Future for RootSearchMetricsFuture<F>
1814-
where F: Future<Output = Result<R, E>>
1815-
{
1816-
type Output = Result<R, E>;
1817-
1818-
fn poll(self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
1819-
let this = self.project();
1820-
let response = ready!(this.tracked.poll(cx));
1821-
*this.is_success = Some(response.is_ok());
1822-
Poll::Ready(Ok(response?))
1823-
}
1824-
}
1825-
18261758
#[cfg(test)]
18271759
mod tests {
18281760
use std::ops::Range;

0 commit comments

Comments
 (0)