Skip to content

Add total size bytes gauge for search after cache #5742

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/reference/es_compatible_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ If a parameter appears both as a query string parameter and in the JSON payload,
| `q` | `String` | The search query. | (Optional) |
| `size` | `Integer` | Number of hits to return. | 10 |
| `sort` | `String` | Describes how documents should be ranked. See [Sort order](#sort-order) | (Optional) |
| `scroll` | `Duration` | Creates a scroll context for "time to live". See [Scroll](#_scroll--scroll-api). | (Optional) |
| `scroll` | `Duration` | Creates a scroll context for "time to live". See [Scroll](#_searchscroll--scroll-api). | (Optional) |
| `allow_partial_search_results` | `Boolean` | Returns a partial response if some (but not all) of the split searches were unsuccessful. | `true` |

#### Supported Request Body parameters
Expand Down Expand Up @@ -279,6 +279,11 @@ First, the client needs to call the `search api` with a `scroll` query parameter

Each subsequent call to the `_search/scroll` endpoint will return a new `scroll_id` pointing to the next page.

:::tip

Using `_search` and then `_search/scroll` is somewhat similar to using `_search` with the `search_after` parameter, except that it creates a lightweight snapshot view of the dataset during the initial call to `_search`. Further calls to `_search/scroll` only return results from that view, thus ensuring more consistent results.

:::

### `_cat`   Cat API

Expand Down
11 changes: 7 additions & 4 deletions quickwit/quickwit-search/src/cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,12 @@ impl ClusterClient {
client.leaf_list_terms(request.clone()).await
}

/// Attempts to store a given search context within the cluster.
/// Attempts to store a given key value pair within the cluster.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think this is ever used for anything else than a search/scroll context, nor should it be (that's not a persistent storage)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Unfortunately, everything in the code, including core abstractions and RPCs, wants to make this generic. I just tried to add more consistency to that to help the reader.

I left hints in quickwit-search/src/scroll_context.rs and quickwit-search/src/metrics.rs about how this store is currently used. I also added some extra clarifications to the function contract here.

///
/// This function may fail silently, if no clients was available.
/// Tries to replicate the pair to [`TARGET_NUM_REPLICATION`] nodes, but this function may fail
/// silently (e.g if no client was available). Even in case of success, this storage is not
/// persistent. For instance during a rolling upgrade, all replicas will be lost as there is no
/// mechanism to maintain the replication count.
pub async fn put_kv(&self, key: &[u8], payload: &[u8], ttl: Duration) {
let clients: Vec<SearchServiceClient> = self
.search_job_placer
Expand All @@ -216,8 +219,8 @@ impl ClusterClient {
// course, this may still result in the replication over more nodes, but this is not
// a problem.
//
// The requests are made in a concurrent manner, up to two at a time. As soon as 2 requests
// are successful, we stop.
// The requests are made in a concurrent manner, up to TARGET_NUM_REPLICATION at a time. As
// soon as TARGET_NUM_REPLICATION requests are successful, we stop.
let put_kv_futs = clients
.into_iter()
.map(|client| replicate_kv_to_one_server(client, key, payload, ttl));
Expand Down
10 changes: 9 additions & 1 deletion quickwit/quickwit-search/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use bytesize::ByteSize;
use once_cell::sync::Lazy;
use quickwit_common::metrics::{
exponential_buckets, linear_buckets, new_counter, new_counter_vec, new_gauge_vec,
exponential_buckets, linear_buckets, new_counter, new_counter_vec, new_gauge, new_gauge_vec,
new_histogram, new_histogram_vec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge,
};

Expand All @@ -34,6 +34,7 @@ pub struct SearchMetrics {
pub leaf_search_single_split_tasks_pending: IntGauge,
pub leaf_search_single_split_tasks_ongoing: IntGauge,
pub leaf_search_single_split_warmup_num_bytes: Histogram,
pub searcher_local_kv_store_size_bytes: IntGauge,
}

impl Default for SearchMetrics {
Expand Down Expand Up @@ -146,6 +147,13 @@ impl Default for SearchMetrics {
&[],
["affinity"],
),
searcher_local_kv_store_size_bytes: new_gauge(
"searcher_local_kv_store_size_bytes",
"Size of the searcher kv store in bytes. This store is used to cache scroll \
contexts.",
"search",
&[],
),
}
}
}
Expand Down
39 changes: 31 additions & 8 deletions quickwit/quickwit-search/src/scroll_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use std::time::Duration;
use anyhow::Context;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use quickwit_common::metrics::GaugeGuard;
use quickwit_common::shared_consts::SCROLL_BATCH_LEN;
use quickwit_metastore::SplitMetadata;
use quickwit_proto::search::{LeafSearchResponse, PartialHit, SearchRequest, SplitSearchError};
use quickwit_proto::types::IndexUid;
Expand All @@ -34,14 +36,13 @@ use crate::root::IndexMetasForLeafSearch;
use crate::service::SearcherContext;
use crate::ClusterClient;

/// Maximum capacity of the search after cache.
/// Maximum number of values in the local search KV store.
///
/// For the moment this value is hardcoded.
/// TODO make configurable.
///
/// Assuming a search context of 1MB, this can
/// amount to up to 1GB.
const SCROLL_BATCH_LEN: usize = 1_000;
const LOCAL_KV_CACHE_SIZE: usize = 1_000;

#[derive(Serialize, Deserialize)]
pub(crate) struct ScrollContext {
Expand Down Expand Up @@ -120,29 +121,51 @@ impl ScrollContext {
}
}

struct TrackedValue {
content: Vec<u8>,
_total_size_metric_guard: GaugeGuard<'static>,
}

/// In memory key value store with TTL and limited size.
///
/// Once the capacity [LOCAL_KV_CACHE_SIZE] is reached, the oldest entries are
/// removed.
///
/// Currently this store is only used for caching scroll contexts. Using it for
/// other purposes is risky as use cases would compete for its capacity.
#[derive(Clone)]
pub(crate) struct MiniKV {
ttl_with_cache: Arc<RwLock<TtlCache<Vec<u8>, Vec<u8>>>>,
ttl_with_cache: Arc<RwLock<TtlCache<Vec<u8>, TrackedValue>>>,
}

impl Default for MiniKV {
fn default() -> MiniKV {
MiniKV {
ttl_with_cache: Arc::new(RwLock::new(TtlCache::new(SCROLL_BATCH_LEN))),
ttl_with_cache: Arc::new(RwLock::new(TtlCache::new(LOCAL_KV_CACHE_SIZE))),
}
}
}

impl MiniKV {
pub async fn put(&self, key: Vec<u8>, payload: Vec<u8>, ttl: Duration) {
let mut metric_guard =
GaugeGuard::from_gauge(&crate::SEARCH_METRICS.searcher_local_kv_store_size_bytes);
metric_guard.add(payload.len() as i64);
let mut cache_lock = self.ttl_with_cache.write().await;
cache_lock.insert(key, payload, ttl);
cache_lock.insert(
key,
TrackedValue {
content: payload,
_total_size_metric_guard: metric_guard,
},
ttl,
);
}

pub async fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
let cache_lock = self.ttl_with_cache.read().await;
let search_after_context_bytes = cache_lock.get(key)?;
Some(search_after_context_bytes.clone())
let tracked_value = cache_lock.get(key)?;
Some(tracked_value.content.clone())
}
}

Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-search/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub struct SearchServiceImpl {
storage_resolver: StorageResolver,
cluster_client: ClusterClient,
searcher_context: Arc<SearcherContext>,
search_after_cache: MiniKV,
local_kv_store: MiniKV,
}

/// Trait representing a search service.
Expand Down Expand Up @@ -165,7 +165,7 @@ impl SearchServiceImpl {
storage_resolver,
cluster_client,
searcher_context,
search_after_cache: MiniKV::default(),
local_kv_store: MiniKV::default(),
}
}
}
Expand Down Expand Up @@ -322,13 +322,13 @@ impl SearchService for SearchServiceImpl {

async fn put_kv(&self, put_request: PutKvRequest) {
let ttl = Duration::from_secs(put_request.ttl_secs as u64);
self.search_after_cache
self.local_kv_store
.put(put_request.key, put_request.payload, ttl)
.await;
}

async fn get_kv(&self, get_request: GetKvRequest) -> Option<Vec<u8>> {
let payload: Vec<u8> = self.search_after_cache.get(&get_request.key).await?;
let payload: Vec<u8> = self.local_kv_store.get(&get_request.key).await?;
Some(payload)
}

Expand Down