Skip to content

Streamline try_start code #84806

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions compiler/rustc_query_system/src/query/caches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub trait CacheSelector<K, V> {
type Cache;
}

pub trait QueryStorage: Default {
pub trait QueryStorage {
type Value: Debug;
type Stored: Clone;

Expand All @@ -23,7 +23,7 @@ pub trait QueryStorage: Default {
fn store_nocache(&self, value: Self::Value) -> Self::Stored;
}

pub trait QueryCache: QueryStorage {
pub trait QueryCache: QueryStorage + Sized {
type Key: Hash + Eq + Clone + Debug;
type Sharded: Default;

Expand Down
4 changes: 0 additions & 4 deletions compiler/rustc_query_system/src/query/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ impl<CTX: QueryContext, K, V> QueryVtable<CTX, K, V> {
(self.hash_result)(hcx, value)
}

pub(crate) fn handle_cycle_error(&self, tcx: CTX, diag: DiagnosticBuilder<'_>) -> V {
(self.handle_cycle_error)(tcx, diag)
}

pub(crate) fn cache_on_disk(&self, tcx: CTX, key: &K, value: Option<&V>) -> bool {
(self.cache_on_disk)(tcx, key, value)
}
Expand Down
21 changes: 3 additions & 18 deletions compiler/rustc_query_system/src/query/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use rustc_span::Span;

use std::convert::TryFrom;
use std::hash::Hash;
use std::marker::PhantomData;
use std::num::NonZeroU32;

#[cfg(parallel_compiler)]
Expand Down Expand Up @@ -100,8 +99,6 @@ pub struct QueryJob<D> {
/// The latch that is used to wait on this job.
#[cfg(parallel_compiler)]
latch: Option<QueryLatch<D>>,

dummy: PhantomData<QueryLatch<D>>,
}

impl<D> QueryJob<D>
Expand All @@ -116,23 +113,17 @@ where
parent,
#[cfg(parallel_compiler)]
latch: None,
dummy: PhantomData,
}
}

#[cfg(parallel_compiler)]
pub(super) fn latch(&mut self, _id: QueryJobId<D>) -> QueryLatch<D> {
pub(super) fn latch(&mut self) -> QueryLatch<D> {
if self.latch.is_none() {
self.latch = Some(QueryLatch::new());
}
self.latch.as_ref().unwrap().clone()
}

#[cfg(not(parallel_compiler))]
pub(super) fn latch(&mut self, id: QueryJobId<D>) -> QueryLatch<D> {
QueryLatch { id }
}

/// Signals to waiters that the query is complete.
///
/// This does nothing for single threaded rustc,
Expand All @@ -148,13 +139,7 @@ where
}

#[cfg(not(parallel_compiler))]
#[derive(Clone)]
pub(super) struct QueryLatch<D> {
id: QueryJobId<D>,
}

#[cfg(not(parallel_compiler))]
impl<D> QueryLatch<D>
impl<D> QueryJobId<D>
where
D: Copy + Clone + Eq + Hash,
{
Expand All @@ -172,7 +157,7 @@ where
let info = query_map.get(&job).unwrap();
cycle.push(info.info.clone());

if job == self.id {
if job == *self {
cycle.reverse();

// This is the end of the cycle
Expand Down
183 changes: 106 additions & 77 deletions compiler/rustc_query_system/src/query/plumbing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ use crate::query::job::{
};
use crate::query::{QueryContext, QueryMap, QueryStackFrame};

#[cfg(not(parallel_compiler))]
use rustc_data_structures::cold_path;
use rustc_data_structures::fingerprint::Fingerprint;
use rustc_data_structures::fx::{FxHashMap, FxHasher};
use rustc_data_structures::sharded::{get_shard_index_by_hash, Sharded};
use rustc_data_structures::sync::{Lock, LockGuard};
use rustc_data_structures::thin_vec::ThinVec;
#[cfg(not(parallel_compiler))]
use rustc_errors::DiagnosticBuilder;
use rustc_errors::{Diagnostic, FatalError};
use rustc_span::Span;
use std::collections::hash_map::Entry;
Expand All @@ -36,7 +36,7 @@ pub struct QueryCacheStore<C: QueryCache> {
pub cache_hits: AtomicUsize,
}

impl<C: QueryCache> Default for QueryCacheStore<C> {
impl<C: QueryCache + Default> Default for QueryCacheStore<C> {
fn default() -> Self {
Self {
cache: C::default(),
Expand Down Expand Up @@ -161,6 +161,31 @@ where
id: QueryJobId<D>,
}

#[cold]
#[inline(never)]
#[cfg(not(parallel_compiler))]
fn mk_cycle<CTX, V, R>(
tcx: CTX,
root: QueryJobId<CTX::DepKind>,
span: Span,
handle_cycle_error: fn(CTX, DiagnosticBuilder<'_>) -> V,
cache: &dyn crate::query::QueryStorage<Value = V, Stored = R>,
) -> R
where
CTX: QueryContext,
V: std::fmt::Debug,
R: Clone,
{
let error: CycleError = root.find_cycle_in_stack(
tcx.try_collect_active_jobs().unwrap(),
&tcx.current_query_job(),
span,
);
let error = report_cycle(tcx.dep_context().sess(), error);
let value = handle_cycle_error(tcx, error);
cache.store_nocache(value)
}

impl<'tcx, D, C> JobOwner<'tcx, D, C>
where
D: Copy + Clone + Eq + Hash,
Expand All @@ -180,7 +205,7 @@ where
state: &'b QueryState<CTX::DepKind, C::Key>,
cache: &'b QueryCacheStore<C>,
span: Span,
key: &C::Key,
key: C::Key,
lookup: QueryLookup,
query: &QueryVtable<CTX, C::Key, C::Value>,
) -> TryGetJob<'b, CTX::DepKind, C>
Expand All @@ -191,94 +216,86 @@ where
let mut state_lock = state.shards.get_shard_by_index(shard).lock();
let lock = &mut *state_lock;

let (latch, mut _query_blocked_prof_timer) = match lock.active.entry((*key).clone()) {
Entry::Occupied(mut entry) => {
match entry.get_mut() {
QueryResult::Started(job) => {
// For parallel queries, we'll block and wait until the query running
// in another thread has completed. Record how long we wait in the
// self-profiler.
let _query_blocked_prof_timer = if cfg!(parallel_compiler) {
Some(tcx.dep_context().profiler().query_blocked())
} else {
None
};

// Create the id of the job we're waiting for
let id = QueryJobId::new(job.id, shard, query.dep_kind);

(job.latch(id), _query_blocked_prof_timer)
}
QueryResult::Poisoned => FatalError.raise(),
}
}
match lock.active.entry(key) {
Entry::Vacant(entry) => {
// No job entry for this query. Return a new one to be started later.

// Generate an id unique within this shard.
let id = lock.jobs.checked_add(1).unwrap();
lock.jobs = id;
let id = QueryShardJobId(NonZeroU32::new(id).unwrap());

let global_id = QueryJobId::new(id, shard, query.dep_kind);

let job = tcx.current_query_job();
let job = QueryJob::new(id, span, job);

let key = entry.key().clone();
entry.insert(QueryResult::Started(job));

let owner = JobOwner { state, cache, id: global_id, key: (*key).clone() };
let global_id = QueryJobId::new(id, shard, query.dep_kind);
let owner = JobOwner { state, cache, id: global_id, key };
return TryGetJob::NotYetStarted(owner);
}
};
mem::drop(state_lock);

// If we are single-threaded we know that we have cycle error,
// so we just return the error.
#[cfg(not(parallel_compiler))]
return TryGetJob::Cycle(cold_path(|| {
let error: CycleError = latch.find_cycle_in_stack(
tcx.try_collect_active_jobs().unwrap(),
&tcx.current_query_job(),
span,
);
let error = report_cycle(tcx.dep_context().sess(), error);
let value = query.handle_cycle_error(tcx, error);
cache.cache.store_nocache(value)
}));

// With parallel queries we might just have to wait on some other
// thread.
#[cfg(parallel_compiler)]
{
let result = latch.wait_on(tcx.current_query_job(), span);

if let Err(cycle) = result {
let cycle = report_cycle(tcx.dep_context().sess(), cycle);
let value = query.handle_cycle_error(tcx, cycle);
let value = cache.cache.store_nocache(value);
return TryGetJob::Cycle(value);
}
Entry::Occupied(mut entry) => {
match entry.get_mut() {
#[cfg(not(parallel_compiler))]
QueryResult::Started(job) => {
let id = QueryJobId::new(job.id, shard, query.dep_kind);

let cached = cache
.cache
.lookup(cache, &key, |value, index| {
if unlikely!(tcx.dep_context().profiler().enabled()) {
tcx.dep_context().profiler().query_cache_hit(index.into());
drop(state_lock);

// If we are single-threaded we know that we have cycle error,
// so we just return the error.
return TryGetJob::Cycle(mk_cycle(
tcx,
id,
span,
query.handle_cycle_error,
&cache.cache,
));
}
#[cfg(debug_assertions)]
{
cache.cache_hits.fetch_add(1, Ordering::Relaxed);
#[cfg(parallel_compiler)]
QueryResult::Started(job) => {
// For parallel queries, we'll block and wait until the query running
// in another thread has completed. Record how long we wait in the
// self-profiler.
let query_blocked_prof_timer = tcx.dep_context().profiler().query_blocked();

// Get the latch out
let latch = job.latch();
let key = entry.key().clone();

drop(state_lock);

// With parallel queries we might just have to wait on some other
// thread.
let result = latch.wait_on(tcx.current_query_job(), span);

if let Err(cycle) = result {
let cycle = report_cycle(tcx.dep_context().sess(), cycle);
let value = (query.handle_cycle_error)(tcx, cycle);
let value = cache.cache.store_nocache(value);
return TryGetJob::Cycle(value);
}

let cached = cache
.cache
.lookup(cache, &key, |value, index| {
if unlikely!(tcx.dep_context().profiler().enabled()) {
tcx.dep_context().profiler().query_cache_hit(index.into());
}
#[cfg(debug_assertions)]
{
cache.cache_hits.fetch_add(1, Ordering::Relaxed);
}
(value.clone(), index)
})
.unwrap_or_else(|_| panic!("value must be in cache after waiting"));

query_blocked_prof_timer.finish_with_query_invocation_id(cached.1.into());

return TryGetJob::JobCompleted(cached);
}
(value.clone(), index)
})
.unwrap_or_else(|_| panic!("value must be in cache after waiting"));

if let Some(prof_timer) = _query_blocked_prof_timer.take() {
prof_timer.finish_with_query_invocation_id(cached.1.into());
QueryResult::Poisoned => FatalError.raise(),
}
}

return TryGetJob::JobCompleted(cached);
}
}

Expand Down Expand Up @@ -421,7 +438,13 @@ where
CTX: QueryContext,
{
let job = match JobOwner::<'_, CTX::DepKind, C>::try_start(
tcx, state, cache, span, &key, lookup, query,
tcx,
state,
cache,
span,
key.clone(),
lookup,
query,
) {
TryGetJob::NotYetStarted(job) => job,
TryGetJob::Cycle(result) => return result,
Expand Down Expand Up @@ -744,7 +767,13 @@ fn force_query_impl<CTX, C>(
};

let job = match JobOwner::<'_, CTX::DepKind, C>::try_start(
tcx, state, cache, span, &key, lookup, query,
tcx,
state,
cache,
span,
key.clone(),
lookup,
query,
) {
TryGetJob::NotYetStarted(job) => job,
TryGetJob::Cycle(_) => return,
Expand Down