
Optimize simple time ranged search queries #5759

Status: Open. Wants to merge 2 commits into base `main`. Showing changes from all commits.
297 changes: 189 additions & 108 deletions quickwit/quickwit-search/src/leaf.rs
```diff
@@ -942,11 +942,6 @@ fn is_simple_all_query(search_request: &SearchRequest) -> bool {
         return false;
     }
 
-    // TODO: Update the logic to handle start_timestamp end_timestamp ranges
-    if search_request.start_timestamp.is_some() || search_request.end_timestamp.is_some() {
-        return false;
-    }
-
    let Ok(query_ast) = serde_json::from_str(&search_request.query_ast) else {
        return false;
    };
```
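With this early return removed, a time-ranged match-all request can now qualify as a "simple all" query and benefit from the optimizations below. As a rough illustration, such a request could look like the following sketch (the exact `SearchRequest` field set is assumed from quickwit's search proto, not quoted from this diff):

```rust
// Hedged sketch: field names are assumed from quickwit_proto's SearchRequest,
// not taken verbatim from this PR.
use quickwit_proto::search::SearchRequest;

fn time_ranged_match_all() -> SearchRequest {
    SearchRequest {
        index_id_patterns: vec!["logs".to_string()],
        // a match-all query AST, serialized as JSON
        query_ast: r#"{"type": "match_all"}"#.to_string(),
        // the time range no longer disqualifies the request
        start_timestamp: Some(1_700_000_000),
        end_timestamp: Some(1_700_086_400),
        max_hits: 20,
        ..Default::default()
    }
}
```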
```diff
@@ -1000,134 +995,220 @@ impl CanSplitDoBetter {
         }
     }
 
-    /// Optimize the order in which splits will get processed based on how it can skip the most
-    /// splits.
+    fn is_split_contained_in_search_time_range(
+        split: &SplitIdAndFooterOffsets,
+        search_request: &SearchRequest,
+    ) -> bool {
+        if let Some(start) = search_request.start_timestamp {
+            let Some(split_start) = split.timestamp_start else {
+                return false;
+            };
+            if split_start < start {
+                return false;
+            }
+        }
+        if let Some(end) = search_request.end_timestamp {
+            let Some(split_end) = split.timestamp_end else {
+                return false;
+            };
+            if split_end >= end {
+                return false;
+            }
+        }
+        true
+    }
```
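The containment rule is strict on both ends, and the request's `end_timestamp` is treated as an exclusive bound (`split_end >= end` rejects the split). A standalone sketch with hypothetical numbers, not the real quickwit structs:

```rust
// A split counts as contained only when its whole [start, end] range lies
// inside the request's [start_timestamp, end_timestamp) window.
struct Split {
    start: i64,
    end: i64,
}

fn contained(split: &Split, req_start: Option<i64>, req_end: Option<i64>) -> bool {
    if let Some(start) = req_start {
        if split.start < start {
            return false;
        }
    }
    if let Some(end) = req_end {
        // exclusive upper bound, mirroring `split_end >= end` above
        if split.end >= end {
            return false;
        }
    }
    true
}

fn main() {
    let split = Split { start: 100, end: 199 };
    assert!(contained(&split, Some(100), Some(200))); // fully inside
    assert!(!contained(&split, Some(150), Some(200))); // starts too early
    assert!(!contained(&split, Some(100), Some(199))); // end == bound: excluded
}
```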

```diff
+    fn to_splits_with_request(
+        splits: Vec<SplitIdAndFooterOffsets>,
+        request: Arc<SearchRequest>,
+    ) -> Vec<(SplitIdAndFooterOffsets, SearchRequest)> {
+        // TODO: we maybe want here some deduplication + Cow logic
+        splits
+            .into_iter()
+            .map(|split| (split, (*request).clone()))
+            .collect::<Vec<_>>()
+    }
```

> **Collaborator**, on lines +1026 to +1029: keep the comment:
>
> `// TODO: we maybe want here some deduplication + Cow logic`

```diff
-    /// The leaf search code contains some logic that makes it possible to skip entire splits
-    /// when we are confident they won't make it into top K.
-    /// To make this optimization as potent as possible, we sort the splits so that the first
-    /// splits are the most likely to fill our Top K.
-    /// In the future, as split get more metadata per column, we may be able to do this more than
-    /// just for timestamp and "unsorted" request.
-    fn optimize_split_order(&self, splits: &mut [SplitIdAndFooterOffsets]) {
-        match self {
-            CanSplitDoBetter::SplitIdHigher(_) => {
-                splits.sort_unstable_by(|a, b| b.split_id.cmp(&a.split_id))
-            }
-            CanSplitDoBetter::SplitTimestampHigher(_)
-            | CanSplitDoBetter::FindTraceIdsAggregation(_) => {
-                splits.sort_unstable_by_key(|split| std::cmp::Reverse(split.timestamp_end()))
-            }
-            CanSplitDoBetter::SplitTimestampLower(_) => {
-                splits.sort_unstable_by_key(|split| split.timestamp_start())
-            }
-            CanSplitDoBetter::Uninformative => (),
-        }
-    }
+    /// Calculate the number of splits which are guaranteed to deliver enough documents.
+    ///
+    /// If there's a time range and the splits contained in it don't hold at least the number
+    /// of requested documents, return None.
+    fn get_min_required_splits(
+        splits: &[SplitIdAndFooterOffsets],
+        request: &SearchRequest,
+    ) -> Option<usize> {
+        let num_requested_docs = request.start_offset + request.max_hits;
+
+        let mut min_required_splits = 0;
+        let mut partial_sum = 0;
+
+        for split in splits.iter() {
+            if !Self::is_split_contained_in_search_time_range(split, request) {
+                continue;
+            }
+            partial_sum += split.num_docs;
+            min_required_splits += 1;
+            if partial_sum >= num_requested_docs {
+                return Some(min_required_splits);
+            }
+        }
+        None
+    }
```

> **Collaborator**, on lines +1036 to +1039: nice, this is now really straightforward to understand and verify 😄
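The partial-sum logic is easiest to see with concrete numbers. A self-contained sketch with toy doc counts (not from the PR): with `start_offset + max_hits = 150` and contained splits holding 100, 80, and 50 docs, the first two splits already cover 180 ≥ 150 docs, so every later split can be demoted to count-only.

```rust
// Toy mirror of get_min_required_splits, assuming every split passed in is
// already contained in the search time range.
fn min_required_splits(split_doc_counts: &[u64], num_requested_docs: u64) -> Option<usize> {
    let mut partial_sum: u64 = 0;
    for (i, &num_docs) in split_doc_counts.iter().enumerate() {
        partial_sum += num_docs;
        if partial_sum >= num_requested_docs {
            return Some(i + 1);
        }
    }
    None
}

fn main() {
    assert_eq!(min_required_splits(&[100, 80, 50], 150), Some(2));
    // not enough docs in total: the optimization is skipped entirely
    assert_eq!(min_required_splits(&[100, 80, 50], 500), None);
}
```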

```diff
+    fn optimize_split_id_higher(
+        request: Arc<SearchRequest>,
+        mut splits: Vec<SplitIdAndFooterOffsets>,
+    ) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
+        splits.sort_unstable_by(|a, b| b.split_id.cmp(&a.split_id));
+
+        if !is_simple_all_query(&request) {
+            // no optimization opportunity here.
+            return Ok(Self::to_splits_with_request(splits, request));
+        }
+
+        let Some(min_required_splits) = Self::get_min_required_splits(&splits, &request) else {
+            // not enough splits contained in the time range.
+            return Ok(Self::to_splits_with_request(splits, request));
+        };
+
+        let mut split_with_req = Self::to_splits_with_request(splits, request);
+
+        // In this case there is no sort order; we order by split id.
+        // If the first splits deliver enough documents, we can convert the other queries to
+        // count only queries.
+        for (_split, request) in split_with_req.iter_mut().skip(min_required_splits) {
+            disable_search_request_hits(request);
+        }
+
+        Ok(split_with_req)
+    }
```

```diff
+    fn optimize_split_timestamp_higher(
+        request: Arc<SearchRequest>,
+        mut splits: Vec<SplitIdAndFooterOffsets>,
+    ) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
+        splits.sort_unstable_by_key(|split| std::cmp::Reverse(split.timestamp_end()));
+
+        if !is_simple_all_query(&request) {
+            // no optimization opportunity here.
+            return Ok(Self::to_splits_with_request(splits, request));
+        }
+
+        let Some(min_required_splits) = Self::get_min_required_splits(&splits, &request) else {
+            // not enough splits contained in the time range.
+            return Ok(Self::to_splits_with_request(splits, request));
+        };
+
+        let mut split_with_req = Self::to_splits_with_request(splits, request);
+
+        // We order by timestamp desc. split_with_req is sorted by timestamp_end desc.
+        //
+        // We have the number of splits we need to search to get enough docs; now we need to
+        // find the splits that don't overlap with them.
+        //
+        // Let's get the smallest timestamp_start of the first min_required_splits splits.
+        let smallest_start_timestamp = split_with_req
+            .iter()
+            .take(min_required_splits)
+            .map(|(split, _)| split.timestamp_start())
+            .min()
+            // if min_required_splits is 0, we choose a value that disables all splits
+            .unwrap_or(i64::MAX);
+        for (split, request) in split_with_req.iter_mut().skip(min_required_splits) {
+            if split.timestamp_end() < smallest_start_timestamp {
+                disable_search_request_hits(request);
+            }
+        }
+
+        Ok(split_with_req)
+    }
```
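For the descending case, only trailing splits that end strictly before the earliest `timestamp_start` of the required prefix can be demoted; anything overlapping that prefix might still own a top-K document. A toy check with hypothetical timestamps:

```rust
// The demotion condition from the loop above, isolated.
fn can_demote_to_count_only(split_end: i64, smallest_start_of_required: i64) -> bool {
    split_end < smallest_start_of_required
}

fn main() {
    // min of timestamp_start over the first min_required_splits splits
    let smallest_start = 100;
    assert!(can_demote_to_count_only(99, smallest_start)); // strictly older: demote
    assert!(!can_demote_to_count_only(150, smallest_start)); // overlaps: keep hit collection
}
```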

```diff
-    /// This function tries to detect upfront which splits contain the top n hits and convert
-    /// other split searches to count only searches. It also optimizes split order.
-    ///
-    /// Returns the search requests with their split.
-    fn optimize(
-        &self,
-        request: Arc<SearchRequest>,
-        mut splits: Vec<SplitIdAndFooterOffsets>,
-    ) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
-        self.optimize_split_order(&mut splits);
-
-        if !is_simple_all_query(&request) {
-            // no optimization opportunity here.
-            return Ok(splits
-                .into_iter()
-                .map(|split| (split, (*request).clone()))
-                .collect::<Vec<_>>());
-        }
-
-        let num_requested_docs = request.start_offset + request.max_hits;
-
-        // Calculate the number of splits which are guaranteed to deliver enough documents.
-        let min_required_splits = splits
-            .iter()
-            .map(|split| split.num_docs)
-            // computing the partial sum
-            .scan(0u64, |partial_sum: &mut u64, num_docs_in_split: u64| {
-                *partial_sum += num_docs_in_split;
-                Some(*partial_sum)
-            })
-            .take_while(|partial_sum| *partial_sum < num_requested_docs)
-            .count()
-            + 1;
-
-        // TODO: we maybe want here some deduplication + Cow logic
-        let mut split_with_req = splits
-            .into_iter()
-            .map(|split| (split, (*request).clone()))
-            .collect::<Vec<_>>();
-
-        match self {
-            CanSplitDoBetter::SplitIdHigher(_) => {
-                // In this case there is no sort order; we order by split id.
-                // If the first splits deliver enough documents, we can convert the other
-                // queries to count only queries.
-                for (_split, request) in split_with_req.iter_mut().skip(min_required_splits) {
-                    disable_search_request_hits(request);
-                }
-            }
-            CanSplitDoBetter::SplitTimestampLower(_) => {
-                // We order by timestamp asc. split_with_req is sorted by timestamp_start.
-                // Let's get the biggest timestamp_end of the first min_required_splits splits.
-                let biggest_end_timestamp = split_with_req
-                    .iter()
-                    .take(min_required_splits)
-                    .map(|(split, _)| split.timestamp_end())
-                    .max()
-                    // if min_required_splits is 0, we choose a value that disables all splits
-                    .unwrap_or(i64::MIN);
-                for (split, request) in split_with_req.iter_mut().skip(min_required_splits) {
-                    if split.timestamp_start() > biggest_end_timestamp {
-                        disable_search_request_hits(request);
-                    }
-                }
-            }
-            CanSplitDoBetter::SplitTimestampHigher(_) => {
-                // We order by timestamp desc. split_with_req is sorted by timestamp_end desc.
-                // Let's get the smallest timestamp_start of the first min_required_splits splits.
-                let smallest_start_timestamp = split_with_req
-                    .iter()
-                    .take(min_required_splits)
-                    .map(|(split, _)| split.timestamp_start())
-                    .min()
-                    // if min_required_splits is 0, we choose a value that disables all splits
-                    .unwrap_or(i64::MAX);
-                for (split, request) in split_with_req.iter_mut().skip(min_required_splits) {
-                    if split.timestamp_end() < smallest_start_timestamp {
-                        disable_search_request_hits(request);
-                    }
-                }
-            }
-            CanSplitDoBetter::FindTraceIdsAggregation(_) => {}
-            CanSplitDoBetter::Uninformative => {}
-        }
-
-        Ok(split_with_req)
-    }
+    fn optimize_split_timestamp_lower(
+        request: Arc<SearchRequest>,
+        mut splits: Vec<SplitIdAndFooterOffsets>,
+    ) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
+        splits.sort_unstable_by_key(|split| split.timestamp_start());
+
+        if !is_simple_all_query(&request) {
+            // no optimization opportunity here.
+            return Ok(Self::to_splits_with_request(splits, request));
+        }
+
+        let Some(min_required_splits) = Self::get_min_required_splits(&splits, &request) else {
+            // not enough splits contained in the time range.
+            return Ok(Self::to_splits_with_request(splits, request));
+        };
+
+        let mut split_with_req = Self::to_splits_with_request(splits, request);
+
+        // We order by timestamp asc. split_with_req is sorted by timestamp_start.
+        //
+        // If we know that some splits will deliver enough documents, we can convert the
+        // others to count only queries.
+        // Since we only have start and end ranges and don't know the distribution, we make
+        // sure the splits don't overlap, since the distribution of two splits could look
+        // like this (each dot is a timestamped doc on an x axis), for a top 2 query:
+        // ```
+        // [. .]    Split1 has enough docs, but its last doc is not in the top 2
+        //  [.. .]  Split2's first doc is in the top 2
+        // ```
+        // Let's get the biggest timestamp_end of the first min_required_splits splits.
+        let biggest_end_timestamp = split_with_req
+            .iter()
+            .take(min_required_splits)
+            .map(|(split, _)| split.timestamp_end())
+            .max()
+            // if min_required_splits is 0, we choose a value that disables all splits
+            .unwrap_or(i64::MIN);
+        for (split, request) in split_with_req.iter_mut().skip(min_required_splits) {
+            if split.timestamp_start() > biggest_end_timestamp {
+                disable_search_request_hits(request);
+            }
+        }
+
+        Ok(split_with_req)
+    }
```
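The dot diagram is worth making concrete. In this toy recreation with hypothetical timestamps, Split1 alone holds two docs, yet Split2 overlaps it and owns the second-smallest timestamp, which is exactly why overlapping splits must keep full hit collection:

```rust
// Top-2 query sorted by timestamp asc, two overlapping splits.
fn main() {
    let split1 = [10, 50]; // [. .]
    let split2 = [20, 25, 60]; //  [.. .]
    let mut all: Vec<i64> = split1.iter().chain(split2.iter()).copied().collect();
    all.sort_unstable();
    let top2 = &all[..2];
    // one doc from each split: Split2 cannot be demoted to count-only
    assert_eq!(top2, &[10, 20][..]);
}
```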

```diff
+    fn optimize_find_trace_ids_aggregation(
+        request: Arc<SearchRequest>,
+        mut splits: Vec<SplitIdAndFooterOffsets>,
+    ) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
+        splits.sort_unstable_by_key(|split| std::cmp::Reverse(split.timestamp_end()));
+
+        if !is_simple_all_query(&request) {
+            // no optimization opportunity here.
+            return Ok(Self::to_splits_with_request(splits, request));
+        }
+
+        // reuse the detected sort order in split_filter
+        // we want to detect cases where we can convert some split queries to count only queries
+        Ok(Self::to_splits_with_request(splits, request))
+    }
```

```diff
+    /// This function tries to detect upfront which splits contain the top n hits and convert
+    /// other split searches to count only searches. It also optimizes split order.
+    ///
+    /// Returns the search requests with their split.
+    fn optimize(
+        &self,
+        request: Arc<SearchRequest>,
+        splits: Vec<SplitIdAndFooterOffsets>,
+    ) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
+        match self {
+            CanSplitDoBetter::SplitIdHigher(_) => Self::optimize_split_id_higher(request, splits),
+            CanSplitDoBetter::SplitTimestampHigher(_) => {
+                Self::optimize_split_timestamp_higher(request, splits)
+            }
+            CanSplitDoBetter::SplitTimestampLower(_) => {
+                Self::optimize_split_timestamp_lower(request, splits)
+            }
+            CanSplitDoBetter::FindTraceIdsAggregation(_) => {
+                Self::optimize_find_trace_ids_aggregation(request, splits)
+            }
+            CanSplitDoBetter::Uninformative => Ok(Self::to_splits_with_request(splits, request)),
+        }
+    }
```
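Every optimization path funnels through `disable_search_request_hits`, the helper that turns a split search into a count-only search. A hedged sketch of the assumed behavior, using a hypothetical mirror of the request type rather than the real helper (which lives elsewhere in leaf.rs):

```rust
// Hypothetical stand-in for quickwit's SearchRequest, for illustration only.
struct SearchRequestSketch {
    max_hits: u64,
    start_offset: u64,
    sort_fields: Vec<String>,
}

// Assumed behavior: the split is still searched, so the total hit count
// stays exact, but no documents are fetched or sorted for it.
fn disable_search_request_hits(request: &mut SearchRequestSketch) {
    request.max_hits = 0;
    request.start_offset = 0;
    request.sort_fields.clear();
}

fn main() {
    let mut req = SearchRequestSketch {
        max_hits: 20,
        start_offset: 0,
        sort_fields: vec!["timestamp".to_string()],
    };
    disable_search_request_hits(&mut req);
    assert_eq!(req.max_hits, 0);
}
```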

```diff
     /// Returns whether the given split can possibly give documents better than the one already
```