Optimize simple time ranged search queries #5759
Open

tontinton wants to merge 2 commits into quickwit-oss:main from tontinton:optimize-timestamp-range-simple-search
Changes from all commits (2 commits)
```diff
@@ -942,11 +942,6 @@ fn is_simple_all_query(search_request: &SearchRequest) -> bool {
         return false;
     }
 
-    // TODO: Update the logic to handle start_timestamp end_timestamp ranges
-    if search_request.start_timestamp.is_some() || search_request.end_timestamp.is_some() {
-        return false;
-    }
-
     let Ok(query_ast) = serde_json::from_str(&search_request.query_ast) else {
         return false;
     };
```
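The removed check is the heart of the PR: previously, any `start_timestamp`/`end_timestamp` on the request disabled the whole optimization. A standalone sketch of the before/after behavior (simplified stand-in types and invented timestamps, not code from the PR):

```rust
// Simplified stand-in for SearchRequest, just to show the removed early-return.
struct Request {
    is_match_all: bool, // stands in for the query_ast "match all" check
    start_timestamp: Option<i64>,
    end_timestamp: Option<i64>,
}

// Old behavior: bail out as soon as a time bound is present.
fn is_simple_all_query_old(req: &Request) -> bool {
    if req.start_timestamp.is_some() || req.end_timestamp.is_some() {
        return false;
    }
    req.is_match_all
}

// New behavior: the time range no longer disqualifies the request; it is
// handled later, per split, by get_min_required_splits.
fn is_simple_all_query_new(req: &Request) -> bool {
    req.is_match_all
}

fn main() {
    let req = Request {
        is_match_all: true,
        start_timestamp: Some(1_700_000_000),
        end_timestamp: Some(1_700_100_000),
    };
    assert!(!is_simple_all_query_old(&req));
    assert!(is_simple_all_query_new(&req));
}
```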
```diff
@@ -1000,134 +995,220 @@ impl CanSplitDoBetter {
         }
     }
 
-    /// Optimize the order in which splits will get processed based on how it can skip the most
-    /// splits.
-    ///
-    /// The leaf search code contains some logic that makes it possible to skip entire splits
-    /// when we are confident they won't make it into top K.
-    /// To make this optimization as potent as possible, we sort the splits so that the first splits
-    /// are the most likely to fill our Top K.
-    /// In the future, as split get more metadata per column, we may be able to do this more than
-    /// just for timestamp and "unsorted" request.
-    fn optimize_split_order(&self, splits: &mut [SplitIdAndFooterOffsets]) {
-        match self {
-            CanSplitDoBetter::SplitIdHigher(_) => {
-                splits.sort_unstable_by(|a, b| b.split_id.cmp(&a.split_id))
-            }
-            CanSplitDoBetter::SplitTimestampHigher(_)
-            | CanSplitDoBetter::FindTraceIdsAggregation(_) => {
-                splits.sort_unstable_by_key(|split| std::cmp::Reverse(split.timestamp_end()))
-            }
-            CanSplitDoBetter::SplitTimestampLower(_) => {
-                splits.sort_unstable_by_key(|split| split.timestamp_start())
-            }
-            CanSplitDoBetter::Uninformative => (),
-        }
-    }
+    fn is_split_contained_in_search_time_range(
+        split: &SplitIdAndFooterOffsets,
+        search_request: &SearchRequest,
+    ) -> bool {
+        if let Some(start) = search_request.start_timestamp {
+            let Some(split_start) = split.timestamp_start else {
+                return false;
+            };
+            if split_start < start {
+                return false;
+            }
+        }
+        if let Some(end) = search_request.end_timestamp {
+            let Some(split_end) = split.timestamp_end else {
+                return false;
+            };
+            if split_end >= end {
+                return false;
+            }
+        }
+        true
+    }
```
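The containment rule treats `end_timestamp` as exclusive, and a split with missing timestamp metadata is never considered contained. A standalone sketch with simplified stand-in types (not the PR's actual types) to illustrate:

```rust
// Simplified stand-ins for SplitIdAndFooterOffsets / SearchRequest,
// just to exercise the containment rule above.
struct Split { timestamp_start: Option<i64>, timestamp_end: Option<i64> }
struct Request { start_timestamp: Option<i64>, end_timestamp: Option<i64> }

fn contained(split: &Split, req: &Request) -> bool {
    if let Some(start) = req.start_timestamp {
        match split.timestamp_start {
            Some(s) if s >= start => {}
            _ => return false, // unknown, or earlier than the range start
        }
    }
    if let Some(end) = req.end_timestamp {
        match split.timestamp_end {
            Some(e) if e < end => {} // end_timestamp is exclusive
            _ => return false,
        }
    }
    true
}

fn main() {
    let req = Request { start_timestamp: Some(10), end_timestamp: Some(20) };
    // Fully inside [10, 20): contained.
    assert!(contained(&Split { timestamp_start: Some(12), timestamp_end: Some(18) }, &req));
    // Ends exactly at 20, which the exclusive bound rejects.
    assert!(!contained(&Split { timestamp_start: Some(12), timestamp_end: Some(20) }, &req));
    // Starts before the range: not contained, even though it overlaps.
    assert!(!contained(&Split { timestamp_start: Some(5), timestamp_end: Some(15) }, &req));
}
```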
```diff
+    fn to_splits_with_request(
+        splits: Vec<SplitIdAndFooterOffsets>,
+        request: Arc<SearchRequest>,
+    ) -> Vec<(SplitIdAndFooterOffsets, SearchRequest)> {
+        // TODO: we maybe want here some deduplication + Cow logic
+        splits
+            .into_iter()
+            .map(|split| (split, (*request).clone()))
+            .collect::<Vec<_>>()
+    }
```
```diff
+    /// Calculate the number of splits which are guaranteed to deliver enough documents.
+    ///
+    /// If there's a time range and not enough splits contain at least the number of requested
+    /// documents, return None.
+    fn get_min_required_splits(
+        splits: &[SplitIdAndFooterOffsets],
+        request: &SearchRequest,
+    ) -> Option<usize> {
+        let num_requested_docs = request.start_offset + request.max_hits;
+
+        let mut min_required_splits = 0;
+        let mut partial_sum = 0;
+
+        for split in splits.iter() {
+            if !Self::is_split_contained_in_search_time_range(split, request) {
+                continue;
+            }
+
+            partial_sum += split.num_docs;
+
+            min_required_splits += 1;
+            if partial_sum >= num_requested_docs {
+                return Some(min_required_splits);
+            }
+        }
+
+        None
+    }
```

Review comment on lines +1036 to +1039:

> nice, this is now really straightforward to understand and verify 😄
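A worked example of the partial-sum loop (a standalone sketch with invented numbers, assuming every split is in range):

```rust
// Simplified sketch of the counting logic in get_min_required_splits.
fn min_required_splits(num_docs_per_split: &[u64], num_requested_docs: u64) -> Option<usize> {
    let mut partial_sum: u64 = 0;
    for (i, num_docs) in num_docs_per_split.iter().enumerate() {
        partial_sum += num_docs;
        if partial_sum >= num_requested_docs {
            return Some(i + 1); // number of splits needed so far
        }
    }
    None // not enough documents across all in-range splits
}

fn main() {
    // start_offset + max_hits = 120; splits hold 100, 50 and 200 docs.
    // The second split pushes the partial sum to 150 >= 120.
    assert_eq!(min_required_splits(&[100, 50, 200], 120), Some(2));
    // If the requested count exceeds the total, the caller skips the optimization.
    assert_eq!(min_required_splits(&[100, 50], 200), None);
}
```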
```diff
+    fn optimize_split_id_higher(
+        request: Arc<SearchRequest>,
+        mut splits: Vec<SplitIdAndFooterOffsets>,
+    ) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
+        splits.sort_unstable_by(|a, b| b.split_id.cmp(&a.split_id));
+
+        if !is_simple_all_query(&request) {
+            // no optimization opportunity here.
+            return Ok(Self::to_splits_with_request(splits, request));
+        }
+
+        let Some(min_required_splits) = Self::get_min_required_splits(&splits, &request) else {
+            // not enough splits contained in time range.
+            return Ok(Self::to_splits_with_request(splits, request));
+        };
+
+        let mut split_with_req = Self::to_splits_with_request(splits, request);
+
+        // In this case there is no sort order, we order by split id.
+        // If the first splits have enough documents, we can convert the other queries to
+        // count only queries.
+        for (_split, request) in split_with_req.iter_mut().skip(min_required_splits) {
+            disable_search_request_hits(request);
+        }
+
+        Ok(split_with_req)
+    }
```
```diff
+    fn optimize_split_timestamp_higher(
+        request: Arc<SearchRequest>,
+        mut splits: Vec<SplitIdAndFooterOffsets>,
+    ) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
+        splits.sort_unstable_by_key(|split| std::cmp::Reverse(split.timestamp_end()));
+
+        if !is_simple_all_query(&request) {
+            // no optimization opportunity here.
+            return Ok(Self::to_splits_with_request(splits, request));
+        }
+
+        let Some(min_required_splits) = Self::get_min_required_splits(&splits, &request) else {
+            // not enough splits contained in time range.
+            return Ok(Self::to_splits_with_request(splits, request));
+        };
+
+        let mut split_with_req = Self::to_splits_with_request(splits, request);
+
+        // We order by timestamp desc. split_with_req is sorted by timestamp_end desc.
+        //
+        // We have the number of splits we need to search to get enough docs, now we need to
+        // find the splits that don't overlap.
+        //
+        // Let's get the smallest timestamp_start of the first num_splits splits.
+        let smallest_start_timestamp = split_with_req
+            .iter()
+            .take(min_required_splits)
+            .map(|(split, _)| split.timestamp_start())
+            .min()
+            // if min_required_splits is 0, we choose a value that disables all splits
+            .unwrap_or(i64::MAX);
+        for (split, request) in split_with_req.iter_mut().skip(min_required_splits) {
+            if split.timestamp_end() < smallest_start_timestamp {
+                disable_search_request_hits(request);
+            }
+        }
+
+        Ok(split_with_req)
+    }
```
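To make the guard concrete, a standalone sketch with invented split ranges: if the first `min_required_splits` splits (sorted by `timestamp_end` desc) cover [40, 90] and [60, 100], then `smallest_start_timestamp` is 40, and only later splits ending strictly before 40 can safely become count-only searches.

```rust
// Invented numbers illustrating the timestamp-desc guard above.
fn main() {
    // (timestamp_start, timestamp_end) of the first min_required_splits splits.
    let first_splits = [(40i64, 90i64), (60, 100)];
    let smallest_start_timestamp = first_splits
        .iter()
        .map(|(start, _)| *start)
        .min()
        .unwrap_or(i64::MAX); // empty prefix would disable all splits
    assert_eq!(smallest_start_timestamp, 40);

    // A split ending at 35 lies entirely before every selected doc: count-only is safe.
    assert!(35 < smallest_start_timestamp);
    // A split ending at 45 still overlaps the selected range: it must keep fetching hits.
    assert!(!(45 < smallest_start_timestamp));
}
```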
```diff
+    fn optimize_split_timestamp_lower(
+        request: Arc<SearchRequest>,
+        mut splits: Vec<SplitIdAndFooterOffsets>,
+    ) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
+        splits.sort_unstable_by_key(|split| split.timestamp_start());
+
+        if !is_simple_all_query(&request) {
+            // no optimization opportunity here.
+            return Ok(Self::to_splits_with_request(splits, request));
+        }
+
+        let Some(min_required_splits) = Self::get_min_required_splits(&splits, &request) else {
+            // not enough splits contained in time range.
+            return Ok(Self::to_splits_with_request(splits, request));
+        };
+
+        let mut split_with_req = Self::to_splits_with_request(splits, request);
+
+        // We order by timestamp asc. split_with_req is sorted by timestamp_start.
+        //
+        // If we know that some splits will deliver enough documents, we can convert the
+        // others to count only queries.
+        // Since we only have start and end ranges and don't know the distribution, we make
+        // sure the splits don't overlap, since the distribution of two splits could be like
+        // this (a dot is a timestamped doc on an x axis), for a top 2 query:
+        // ```
+        // [.     .] Split1 has enough docs, but its last doc is not in the top 2
+        //   [..  .]  Split2's first doc is in the top 2
+        // ```
+        // Let's get the biggest timestamp_end of the first num_splits splits.
+        let biggest_end_timestamp = split_with_req
+            .iter()
+            .take(min_required_splits)
+            .map(|(split, _)| split.timestamp_end())
+            .max()
+            // if min_required_splits is 0, we choose a value that disables all splits
+            .unwrap_or(i64::MIN);
+        for (split, request) in split_with_req.iter_mut().skip(min_required_splits) {
+            if split.timestamp_start() > biggest_end_timestamp {
+                disable_search_request_hits(request);
+            }
+        }
+
+        Ok(split_with_req)
+    }
```
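The diagram's pitfall can be checked numerically. A standalone sketch (invented doc timestamps) of why an overlapping split must keep fetching hits for a top-2 timestamp-asc query; the timestamp-desc case above is symmetric:

```rust
// The scenario from the diagram, made concrete:
//   split1 docs at timestamps [1, 9]    (2 docs, enough for top-2 on its own)
//   split2 docs at timestamps [3, 4, 8] (range starts inside split1's range)
fn main() {
    let split1 = vec![1i64, 9];
    let split2 = vec![3i64, 4, 8];

    // min_required_splits = 1: split1 alone holds 2 >= 2 docs.
    // But the true top-2 by timestamp asc is [1, 3], and 3 lives in split2.
    let mut all: Vec<i64> = split1.iter().chain(split2.iter()).copied().collect();
    all.sort_unstable();
    assert_eq!(&all[..2], &[1, 3]);

    // split2.timestamp_start (3) is not greater than split1.timestamp_end (9),
    // i.e. the ranges overlap, so the guard `timestamp_start > biggest_end_timestamp`
    // correctly refuses to convert split2 into a count-only search.
    assert!(!(3 > 9));
}
```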
```diff
+    fn optimize_find_trace_ids_aggregation(
+        request: Arc<SearchRequest>,
+        mut splits: Vec<SplitIdAndFooterOffsets>,
+    ) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
+        splits.sort_unstable_by_key(|split| std::cmp::Reverse(split.timestamp_end()));
+
+        if !is_simple_all_query(&request) {
+            // no optimization opportunity here.
+            return Ok(Self::to_splits_with_request(splits, request));
+        }
+
+        Ok(Self::to_splits_with_request(splits, request))
+    }
```
```diff
     /// This function tries to detect upfront which splits contain the top n hits and convert other
     /// split searches to count only searches. It also optimizes split order.
     ///
     /// Returns the search_requests with their split.
     fn optimize(
         &self,
         request: Arc<SearchRequest>,
-        mut splits: Vec<SplitIdAndFooterOffsets>,
+        splits: Vec<SplitIdAndFooterOffsets>,
     ) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
-        self.optimize_split_order(&mut splits);
-
-        if !is_simple_all_query(&request) {
-            // no optimization opportunity here.
-            return Ok(splits
-                .into_iter()
-                .map(|split| (split, (*request).clone()))
-                .collect::<Vec<_>>());
-        }
-
-        let num_requested_docs = request.start_offset + request.max_hits;
-
-        // Calculate the number of splits which are guaranteed to deliver enough documents.
-        let min_required_splits = splits
-            .iter()
-            .map(|split| split.num_docs)
-            // computing the partial sum
-            .scan(0u64, |partial_sum: &mut u64, num_docs_in_split: u64| {
-                *partial_sum += num_docs_in_split;
-                Some(*partial_sum)
-            })
-            .take_while(|partial_sum| *partial_sum < num_requested_docs)
-            .count()
-            + 1;
-
-        // TODO: we maybe want here some deduplication + Cow logic
-        let mut split_with_req = splits
-            .into_iter()
-            .map(|split| (split, (*request).clone()))
-            .collect::<Vec<_>>();
-
-        // reuse the detected sort order in split_filter
-        // we want to detect cases where we can convert some split queries to count only queries
-        match self {
-            CanSplitDoBetter::SplitIdHigher(_) => {
-                // In this case there is no sort order, we order by split id.
-                // If the first splits have enough documents, we can convert the other queries to
-                // count only queries.
-                for (_split, request) in split_with_req.iter_mut().skip(min_required_splits) {
-                    disable_search_request_hits(request);
-                }
-            }
-            CanSplitDoBetter::SplitTimestampLower(_) => {
-                // We order by timestamp asc. split_with_req is sorted by timestamp_start.
-                // Let's get the biggest timestamp_end of the first num_splits splits.
-                let biggest_end_timestamp = split_with_req
-                    .iter()
-                    .take(min_required_splits)
-                    .map(|(split, _)| split.timestamp_end())
-                    .max()
-                    // if min_required_splits is 0, we choose a value that disables all splits
-                    .unwrap_or(i64::MIN);
-                for (split, request) in split_with_req.iter_mut().skip(min_required_splits) {
-                    if split.timestamp_start() > biggest_end_timestamp {
-                        disable_search_request_hits(request);
-                    }
-                }
-            }
-            CanSplitDoBetter::SplitTimestampHigher(_) => {
-                // We order by timestamp desc. split_with_req is sorted by timestamp_end desc.
-                // Let's get the smallest timestamp_start of the first num_splits splits.
-                let smallest_start_timestamp = split_with_req
-                    .iter()
-                    .take(min_required_splits)
-                    .map(|(split, _)| split.timestamp_start())
-                    .min()
-                    // if min_required_splits is 0, we choose a value that disables all splits
-                    .unwrap_or(i64::MAX);
-                for (split, request) in split_with_req.iter_mut().skip(min_required_splits) {
-                    if split.timestamp_end() < smallest_start_timestamp {
-                        disable_search_request_hits(request);
-                    }
-                }
-            }
-            CanSplitDoBetter::FindTraceIdsAggregation(_) => {}
-            CanSplitDoBetter::Uninformative => {}
-        }
-
-        Ok(split_with_req)
+        match self {
+            CanSplitDoBetter::SplitIdHigher(_) => Self::optimize_split_id_higher(request, splits),
+            CanSplitDoBetter::SplitTimestampHigher(_) => {
+                Self::optimize_split_timestamp_higher(request, splits)
+            }
+            CanSplitDoBetter::SplitTimestampLower(_) => {
+                Self::optimize_split_timestamp_lower(request, splits)
+            }
+            CanSplitDoBetter::FindTraceIdsAggregation(_) => {
+                Self::optimize_find_trace_ids_aggregation(request, splits)
+            }
+            CanSplitDoBetter::Uninformative => Ok(Self::to_splits_with_request(splits, request)),
+        }
     }
 
     /// Returns whether the given split can possibly give documents better than the one already
```
Review comment:

> keep the comment: