
Refactor try_execute_query #109100


Merged 1 commit, Mar 25, 2023
2 changes: 0 additions & 2 deletions compiler/rustc_query_system/src/query/job.rs
@@ -124,8 +124,6 @@ impl<D: DepKind> QueryJob<D> {
}

impl QueryJobId {
#[cold]
#[inline(never)]
#[cfg(not(parallel_compiler))]
pub(super) fn find_cycle_in_stack<D: DepKind>(
&self,
266 changes: 134 additions & 132 deletions compiler/rustc_query_system/src/query/plumbing.rs
@@ -6,18 +6,18 @@ use crate::dep_graph::{DepContext, DepKind, DepNode, DepNodeIndex, DepNodeParams
use crate::dep_graph::{DepGraphData, HasDepContext};
use crate::ich::StableHashingContext;
use crate::query::caches::QueryCache;
#[cfg(parallel_compiler)]
use crate::query::job::QueryLatch;
use crate::query::job::{report_cycle, QueryInfo, QueryJob, QueryJobId, QueryJobInfo};
use crate::query::{QueryContext, QueryMap, QuerySideEffects, QueryStackFrame};
use crate::values::Value;
use crate::HandleCycleError;
use rustc_data_structures::fingerprint::Fingerprint;
use rustc_data_structures::fx::FxHashMap;
#[cfg(parallel_compiler)]
use rustc_data_structures::profiling::TimingGuard;
#[cfg(parallel_compiler)]
use rustc_data_structures::sharded::Sharded;
use rustc_data_structures::stack::ensure_sufficient_stack;
use rustc_data_structures::sync::{Lock, LockGuard};
use rustc_data_structures::sync::Lock;
#[cfg(parallel_compiler)]
use rustc_data_structures::{cold_path, sharded::Sharded};
use rustc_errors::{DiagnosticBuilder, ErrorGuaranteed, FatalError};
use rustc_session::Session;
use rustc_span::{Span, DUMMY_SP};
@@ -116,7 +116,6 @@ where
{
state: &'tcx QueryState<K, D>,
key: K,
id: QueryJobId,
}

#[cold]
@@ -166,81 +165,6 @@ impl<'tcx, K, D: DepKind> JobOwner<'tcx, K, D>
where
K: Eq + Hash + Copy,
{
/// Either gets a `JobOwner` corresponding to the query, allowing us to
/// start executing the query, or returns with the result of the query.
/// This function assumes that `try_get_cached` has already been called and returned `lookup`.
/// If the query is executing elsewhere, this will wait for it and return the result.
/// If the query panicked, this will silently panic.
///
/// This function is inlined because that results in a noticeable speed-up
/// for some compile-time benchmarks.
#[inline(always)]
fn try_start<'b, Qcx>(
qcx: &'b Qcx,
state: &'b QueryState<K, Qcx::DepKind>,
mut state_lock: LockGuard<'b, FxHashMap<K, QueryResult<Qcx::DepKind>>>,
span: Span,
key: K,
) -> TryGetJob<'b, K, D>
where
Qcx: QueryContext + HasDepContext<DepKind = D>,
{
let lock = &mut *state_lock;
let current_job_id = qcx.current_query_job();

match lock.entry(key) {
Entry::Vacant(entry) => {
let id = qcx.next_job_id();
let job = QueryJob::new(id, span, current_job_id);

let key = *entry.key();
entry.insert(QueryResult::Started(job));

let owner = JobOwner { state, id, key };
return TryGetJob::NotYetStarted(owner);
}
Entry::Occupied(mut entry) => {
match entry.get_mut() {
#[cfg(not(parallel_compiler))]
QueryResult::Started(job) => {
let id = job.id;
drop(state_lock);

// If we are single-threaded we know that we have a cycle error,
// so we just return the error.
return TryGetJob::Cycle(id.find_cycle_in_stack(
qcx.try_collect_active_jobs().unwrap(),
&current_job_id,
span,
));
}
#[cfg(parallel_compiler)]
QueryResult::Started(job) => {
// For parallel queries, we'll block and wait until the query running
// in another thread has completed. Record how long we wait in the
// self-profiler.
let query_blocked_prof_timer = qcx.dep_context().profiler().query_blocked();

// Get the latch out
let latch = job.latch();

drop(state_lock);

// With parallel queries we might just have to wait on some other
// thread.
let result = latch.wait_on(current_job_id, span);

match result {
Ok(()) => TryGetJob::JobCompleted(query_blocked_prof_timer),
Err(cycle) => TryGetJob::Cycle(cycle),
}
}
QueryResult::Poisoned => FatalError.raise(),
}
}
}
}

/// Completes the query by updating the query cache with the `result`,
/// signals the waiter and forgets the JobOwner, so it won't poison the query
fn complete<C>(self, cache: &C, result: C::Value, dep_node_index: DepNodeIndex)
@@ -307,25 +231,6 @@ pub(crate) struct CycleError<D: DepKind> {
pub cycle: Vec<QueryInfo<D>>,
}

/// The result of `try_start`.
enum TryGetJob<'tcx, K, D>
where
K: Eq + Hash + Copy,
D: DepKind,
{
/// The query is not yet started. Contains a guard to the cache eventually used to start it.
NotYetStarted(JobOwner<'tcx, K, D>),

/// The query was already completed.
/// Returns the result of the query and its dep-node index
/// if it succeeded or a cycle error if it failed.
#[cfg(parallel_compiler)]
JobCompleted(TimingGuard<'tcx>),

/// Trying to execute the query resulted in a cycle.
Cycle(CycleError<D>),
}

/// Checks if the query is already computed and in the cache.
/// It returns the shard index and a lock guard to the shard,
/// which will be used if the query is not in the cache and we need
@@ -346,6 +251,65 @@ where
}
}
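
The doc comment above mentions returning "the shard index and a lock guard to the shard", and later in this diff `try_execute_query` calls `state.active.get_shard_by_value(&key).lock()` under `parallel_compiler`. For illustration, here is lock sharding in miniature: a simplified sketch with invented names (`ShardedMap`), not the actual `rustc_data_structures::sharded::Sharded`.

use std::collections::hash_map::RandomState;
use std::collections::HashMap;
use std::hash::{BuildHasher, Hash, Hasher};
use std::sync::Mutex;

// One mutex per shard instead of one global mutex, so threads working on
// different keys rarely block each other.
struct ShardedMap<K, V> {
    shards: Vec<Mutex<HashMap<K, V>>>,
    hasher: RandomState,
}

impl<K: Hash + Eq, V> ShardedMap<K, V> {
    fn new(num_shards: usize) -> Self {
        ShardedMap {
            shards: (0..num_shards).map(|_| Mutex::new(HashMap::new())).collect(),
            hasher: RandomState::new(),
        }
    }

    // Same role as `Sharded::get_shard_by_value` in the diff: hash the key,
    // pick a shard, and hand back that shard's lock.
    fn get_shard_by_value(&self, key: &K) -> &Mutex<HashMap<K, V>> {
        let mut state = self.hasher.build_hasher();
        key.hash(&mut state);
        &self.shards[state.finish() as usize % self.shards.len()]
    }
}
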

#[cold]
#[inline(never)]
#[cfg(not(parallel_compiler))]
fn cycle_error<Q, Qcx>(
query: Q,
qcx: Qcx,
try_execute: QueryJobId,
span: Span,
) -> (Q::Value, Option<DepNodeIndex>)
where
Q: QueryConfig<Qcx>,
Qcx: QueryContext,
{
let error = try_execute.find_cycle_in_stack(
qcx.try_collect_active_jobs().unwrap(),
&qcx.current_query_job(),
span,
);
(mk_cycle(qcx, error, query.handle_cycle_error()), None)
}
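
Note that `cycle_error` now carries the `#[cold]` and `#[inline(never)]` attributes that the first hunk removed from `find_cycle_in_stack`: the rarely taken cycle path is outlined here so the hot query path stays small. The pattern in miniature, as a hedged sketch with invented names rather than rustc code:

use std::collections::HashMap;

// The rare failure case is outlined into a function the optimizer treats as
// unlikely and never inlines, keeping the hot caller compact.
#[cold]
#[inline(never)]
fn missing_key_error(key: u32) -> String {
    // Stands in for expensive diagnostics work, such as building a cycle report.
    format!("no entry found for key {key}")
}

fn lookup(map: &HashMap<u32, String>, key: u32) -> Result<&String, String> {
    match map.get(&key) {
        Some(value) => Ok(value),            // hot path
        None => Err(missing_key_error(key)), // cold path
    }
}
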

#[inline(always)]
#[cfg(parallel_compiler)]
fn wait_for_query<Q, Qcx>(
query: Q,
qcx: Qcx,
span: Span,
key: Q::Key,
latch: QueryLatch<Qcx::DepKind>,
current: Option<QueryJobId>,
) -> (Q::Value, Option<DepNodeIndex>)
where
Q: QueryConfig<Qcx>,
Qcx: QueryContext,
{
// For parallel queries, we'll block and wait until the query running
// in another thread has completed. Record how long we wait in the
// self-profiler.
let query_blocked_prof_timer = qcx.dep_context().profiler().query_blocked();

// With parallel queries we might just have to wait on some other
// thread.
let result = latch.wait_on(current, span);

match result {
Ok(()) => {
let Some((v, index)) = query.query_cache(qcx).lookup(&key) else {
cold_path(|| panic!("value must be in cache after waiting"))
};

qcx.dep_context().profiler().query_cache_hit(index.into());
query_blocked_prof_timer.finish_with_query_invocation_id(index.into());

(v, Some(index))
}
Err(cycle) => (mk_cycle(qcx, cycle, query.handle_cycle_error()), None),
}
}
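
`wait_for_query` blocks on the running job's latch and reports the blocked time to the self-profiler. A one-shot latch can be sketched with a `Mutex` and a `Condvar`; this is a simplified stand-in for `QueryLatch` (which additionally participates in rustc's deadlock and cycle detection), with the timing standing in for `query_blocked_prof_timer`.

use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

/// One-shot latch: `wait` blocks until some other thread calls `set`.
struct Latch {
    done: Mutex<bool>,
    cond: Condvar,
}

impl Latch {
    fn new() -> Self {
        Latch { done: Mutex::new(false), cond: Condvar::new() }
    }

    /// Returns how long the caller was blocked, in the spirit of
    /// `query_blocked_prof_timer` above.
    fn wait(&self) -> Duration {
        let start = Instant::now();
        let mut done = self.done.lock().unwrap();
        // The loop protects against spurious wakeups.
        while !*done {
            done = self.cond.wait(done).unwrap();
        }
        start.elapsed()
    }

    fn set(&self) {
        *self.done.lock().unwrap() = true;
        self.cond.notify_all();
    }
}

In practice each running job would hand out a shared handle (for example an `Arc<Latch>`) that waiters clone before releasing the state lock, which is the role `job.latch()` plays above.
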

#[inline(never)]
fn try_execute_query<Q, Qcx>(
query: Q,
Expand All @@ -360,9 +324,9 @@ where
{
let state = query.query_state(qcx);
#[cfg(parallel_compiler)]
let state_lock = state.active.get_shard_by_value(&key).lock();
let mut state_lock = state.active.get_shard_by_value(&key).lock();
#[cfg(not(parallel_compiler))]
let state_lock = state.active.lock();
let mut state_lock = state.active.lock();

// For the parallel compiler we need to check both the query cache and query state structures
// while holding the state lock to ensure that 1) the query has not yet completed and 2) the
@@ -377,44 +341,82 @@
}
}

    let current_job_id = qcx.current_query_job();

    match state_lock.entry(key) {
        Entry::Vacant(entry) => {
            // Nothing has computed or is computing the query, so we start a new job and insert it in the
            // state map.
            let id = qcx.next_job_id();
            let job = QueryJob::new(id, span, current_job_id);
            entry.insert(QueryResult::Started(job));

            // Drop the lock before we start executing the query
            drop(state_lock);

            execute_job(query, qcx, state, key, id, dep_node)
        }
        Entry::Occupied(mut entry) => {
            match entry.get_mut() {
                #[cfg(not(parallel_compiler))]
                QueryResult::Started(job) => {
                    let id = job.id;
                    drop(state_lock);

                    // If we are single-threaded we know that we have a cycle error,
                    // so we just return the error.
                    cycle_error(query, qcx, id, span)
                }
                #[cfg(parallel_compiler)]
                QueryResult::Started(job) => {
                    // Get the latch out
                    let latch = job.latch();
                    drop(state_lock);

                    wait_for_query(query, qcx, span, key, latch, current_job_id)
                }
                QueryResult::Poisoned => FatalError.raise(),
            }
        }
    }
}
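
Taken together, the `Entry` match above is a claim-or-wait state machine: the first thread to ask for a key claims it and computes, and later threads drop the lock and wait. Here is a self-contained sketch of that shape under invented names (`QueryState`, `get_or_compute`), with no cycle detection, poisoning, or sharding:

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};

type Latch = Arc<(Mutex<bool>, Condvar)>;

struct QueryState {
    /// Keys currently being computed, with a latch other threads can wait on.
    active: Mutex<HashMap<u32, Latch>>,
    /// Finished results.
    cache: Mutex<HashMap<u32, u64>>,
}

fn get_or_compute(state: &QueryState, key: u32, compute: impl FnOnce() -> u64) -> u64 {
    let mut active = state.active.lock().unwrap();
    // Check the cache while holding the state lock, mirroring the comment in
    // `try_execute_query`: otherwise a query that finished between an earlier
    // cache probe and this point could be executed a second time.
    if let Some(&v) = state.cache.lock().unwrap().get(&key) {
        return v;
    }
    match active.entry(key) {
        Entry::Vacant(entry) => {
            // First thread to ask for `key`: claim it, then compute with the
            // state lock released.
            let latch: Latch = Arc::new((Mutex::new(false), Condvar::new()));
            entry.insert(latch.clone());
            drop(active);

            let v = compute();
            state.cache.lock().unwrap().insert(key, v);
            state.active.lock().unwrap().remove(&key);
            let (done, cond) = &*latch;
            *done.lock().unwrap() = true;
            cond.notify_all();
            v
        }
        Entry::Occupied(entry) => {
            // Someone else is computing `key`: clone the latch, drop the lock,
            // wait, then read the finished value from the cache.
            let latch = entry.get().clone();
            drop(active);
            let (done, cond) = &*latch;
            let mut flag = done.lock().unwrap();
            while !*flag {
                flag = cond.wait(flag).unwrap();
            }
            drop(flag);
            state.cache.lock().unwrap()[&key]
        }
    }
}
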
#[inline(always)]
fn execute_job<Q, Qcx>(
query: Q,
qcx: Qcx,
state: &QueryState<Q::Key, Qcx::DepKind>,
key: Q::Key,
id: QueryJobId,
dep_node: Option<DepNode<Qcx::DepKind>>,
) -> (Q::Value, Option<DepNodeIndex>)
where
Q: QueryConfig<Qcx>,
Qcx: QueryContext,
{
// Use `JobOwner` so the query will be poisoned if executing it panics.
let job_owner = JobOwner { state, key };

let (result, dep_node_index) = match qcx.dep_context().dep_graph().data() {
None => execute_job_non_incr(query, qcx, key, id),
Some(data) => execute_job_incr(query, qcx, data, key, dep_node, id),
};

let cache = query.query_cache(qcx);
if query.feedable() {
// We should not compute queries that also got a value via feeding.
// This can't happen, as query feeding adds the very same dependencies to the fed
// query that its feeding query had. So if the fed query is red, so is its feeder,
// which will get evaluated first, and re-feed the query.
if let Some((cached_result, _)) = cache.lookup(&key) {
panic!(
"fed query later has its value computed. The already cached value: {cached_result:?}"
);
}
}
job_owner.complete(cache, result, dep_node_index);

(result, Some(dep_node_index))
}
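
`JobOwner` exists so that a panic inside the query poisons its state entry instead of leaving a stuck `Started` job: `complete` forgets the owner, so the `Drop` impl only runs when execution unwinds. A minimal sketch of that poison-on-unwind pattern, with invented names rather than the rustc types:

use std::collections::HashMap;
use std::sync::Mutex;

enum Slot {
    Running,
    Done(u64),
    Poisoned,
}

/// Poisons the claimed slot if dropped before `complete` is called,
/// which is exactly what happens when the computation panics and unwinds.
struct SlotOwner<'a> {
    state: &'a Mutex<HashMap<u32, Slot>>,
    key: u32,
}

fn start(state: &Mutex<HashMap<u32, Slot>>, key: u32) -> SlotOwner<'_> {
    state.lock().unwrap().insert(key, Slot::Running);
    SlotOwner { state, key }
}

impl<'a> SlotOwner<'a> {
    fn complete(self, value: u64) {
        self.state.lock().unwrap().insert(self.key, Slot::Done(value));
        // On success, skip the Drop impl below so the slot keeps the result.
        std::mem::forget(self);
    }
}

impl<'a> Drop for SlotOwner<'a> {
    fn drop(&mut self) {
        self.state.lock().unwrap().insert(self.key, Slot::Poisoned);
    }
}

If the computation panics after `start`, unwinding drops the `SlotOwner`, later readers observe `Slot::Poisoned`, and they can bail out the way `QueryResult::Poisoned => FatalError.raise()` does above.
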

// Fast path for when incr. comp. is off.