
Commit d801092

d-netto authored and nickrobinson251 committed
NFC: create an actual set of functions to manipulate GC thread ids (JuliaLang#54984)
Also adds a bunch of integrity constraint checks to ensure we don't repeat the bug from JuliaLang#54645.
1 parent e434263 commit d801092
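
To make the per-file diffs below easier to follow, here is a minimal standalone sketch of the pattern the commit introduces. This is illustrative only, not the actual Julia sources: `gc_first_tid` and `jl_n_markthreads` are modeled as plain globals, and the real helpers in src/gc.h carry `STATIC_INLINE`/`JL_NOTSAFEPOINT` annotations. Callers iterate the inclusive `[first, last]` range returned by the helpers instead of open-coding `gc_first_tid + jl_n_markthreads` bounds.

/* sketch.c -- illustrative model of the thread-id helpers, not Julia's gc.h */
#include <stdio.h>

static int gc_first_tid = 2;      /* hypothetical: GC threads follow the mutator threads */
static int jl_n_markthreads = 4;  /* hypothetical number of parallel mark threads */

static int gc_first_parallel_collector_thread_id(void)
{
    return jl_n_markthreads == 0 ? 0 : gc_first_tid;
}

static int gc_last_parallel_collector_thread_id(void)
{
    return jl_n_markthreads == 0 ? -1 : gc_first_tid + jl_n_markthreads - 1;
}

int main(void)
{
    /* New style of loop used throughout gc.c: inclusive bounds, and the body
     * never runs when there are no parallel mark threads (first = 0, last = -1). */
    int first = gc_first_parallel_collector_thread_id();
    int last = gc_last_parallel_collector_thread_id();
    for (int i = first; i <= last; i++)
        printf("would wake parallel GC thread with tid %d\n", i);
    return 0;
}

With `jl_n_markthreads == 0` the helpers return `first = 0` and `last = -1`, so every such loop degenerates to zero iterations without a separate guard; that, together with the `gc_check_ptls_of_parallel_collector_thread` assertions added in src/gc.h, is what the commit relies on to avoid repeating the bug referenced above.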

File tree

4 files changed, +95 -24 lines changed


src/gc.c

Lines changed: 34 additions & 18 deletions
@@ -1639,9 +1639,11 @@ void gc_sweep_wake_all(jl_ptls_t ptls, jl_gc_padded_page_stack_t *new_gc_allocd_
     if (parallel_sweep_worthwhile && !page_profile_enabled) {
         jl_atomic_store(&gc_allocd_scratch, new_gc_allocd_scratch);
         uv_mutex_lock(&gc_threads_lock);
-        for (int i = gc_first_tid; i < gc_first_tid + jl_n_markthreads; i++) {
+        int first = gc_first_parallel_collector_thread_id();
+        int last = gc_last_parallel_collector_thread_id();
+        for (int i = first; i <= last; i++) {
             jl_ptls_t ptls2 = gc_all_tls_states[i];
-            assert(ptls2 != NULL); // should be a GC thread
+            gc_check_ptls_of_parallel_collector_thread(ptls2);
             jl_atomic_fetch_add(&ptls2->gc_sweeps_requested, 1);
         }
         uv_cond_broadcast(&gc_threads_cond);
@@ -1653,9 +1655,11 @@ void gc_sweep_wake_all(jl_ptls_t ptls, jl_gc_padded_page_stack_t *new_gc_allocd_
         // collecting a page profile.
         // wait for all to leave in order to ensure that a straggler doesn't
         // try to enter sweeping after we set `gc_allocd_scratch` below.
-        for (int i = gc_first_tid; i < gc_first_tid + jl_n_markthreads; i++) {
+        int first = gc_first_parallel_collector_thread_id();
+        int last = gc_last_parallel_collector_thread_id();
+        for (int i = first; i <= last; i++) {
             jl_ptls_t ptls2 = gc_all_tls_states[i];
-            assert(ptls2 != NULL); // should be a GC thread
+            gc_check_ptls_of_parallel_collector_thread(ptls2);
             while (jl_atomic_load_acquire(&ptls2->gc_sweeps_requested) != 0) {
                 jl_cpu_pause();
             }
@@ -3006,19 +3010,25 @@ void gc_mark_and_steal(jl_ptls_t ptls)
         // since we know chunks will likely expand into a lot
         // of work for the mark loop
     steal : {
+        int first = gc_first_parallel_collector_thread_id();
+        int last = gc_last_parallel_collector_thread_id();
         // Try to steal chunk from random GC thread
         for (int i = 0; i < 4 * jl_n_markthreads; i++) {
-            uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_markthreads;
-            jl_gc_markqueue_t *mq2 = &gc_all_tls_states[v]->mark_queue;
+            int v = gc_random_parallel_collector_thread_id(ptls);
+            jl_ptls_t ptls2 = gc_all_tls_states[v];
+            gc_check_ptls_of_parallel_collector_thread(ptls2);
+            jl_gc_markqueue_t *mq2 = &ptls2->mark_queue;
             c = gc_chunkqueue_steal_from(mq2);
             if (c.cid != GC_empty_chunk) {
                 gc_mark_chunk(ptls, mq, &c);
                 goto pop;
             }
         }
         // Sequentially walk GC threads to try to steal chunk
-        for (int i = gc_first_tid; i < gc_first_tid + jl_n_markthreads; i++) {
-            jl_gc_markqueue_t *mq2 = &gc_all_tls_states[i]->mark_queue;
+        for (int i = first; i <= last; i++) {
+            jl_ptls_t ptls2 = gc_all_tls_states[i];
+            gc_check_ptls_of_parallel_collector_thread(ptls2);
+            jl_gc_markqueue_t *mq2 = &ptls2->mark_queue;
             c = gc_chunkqueue_steal_from(mq2);
             if (c.cid != GC_empty_chunk) {
                 gc_mark_chunk(ptls, mq, &c);
@@ -3035,15 +3045,19 @@ void gc_mark_and_steal(jl_ptls_t ptls)
         }
         // Try to steal pointer from random GC thread
         for (int i = 0; i < 4 * jl_n_markthreads; i++) {
-            uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_markthreads;
-            jl_gc_markqueue_t *mq2 = &gc_all_tls_states[v]->mark_queue;
+            int v = gc_random_parallel_collector_thread_id(ptls);
+            jl_ptls_t ptls2 = gc_all_tls_states[v];
+            gc_check_ptls_of_parallel_collector_thread(ptls2);
+            jl_gc_markqueue_t *mq2 = &ptls2->mark_queue;
             new_obj = gc_ptr_queue_steal_from(mq2);
             if (new_obj != NULL)
                 goto mark;
         }
         // Sequentially walk GC threads to try to steal pointer
-        for (int i = gc_first_tid; i < gc_first_tid + jl_n_markthreads; i++) {
-            jl_gc_markqueue_t *mq2 = &gc_all_tls_states[i]->mark_queue;
+        for (int i = first; i <= last; i++) {
+            jl_ptls_t ptls2 = gc_all_tls_states[i];
+            gc_check_ptls_of_parallel_collector_thread(ptls2);
+            jl_gc_markqueue_t *mq2 = &ptls2->mark_queue;
             new_obj = gc_ptr_queue_steal_from(mq2);
             if (new_obj != NULL)
                 goto mark;
@@ -3103,12 +3117,13 @@ int gc_should_mark(void)
     }
     int tid = jl_atomic_load_relaxed(&gc_master_tid);
     assert(tid != -1);
+    assert(gc_all_tls_states != NULL);
     size_t work = gc_count_work_in_queue(gc_all_tls_states[tid]);
-    for (tid = gc_first_tid; tid < gc_first_tid + jl_n_markthreads; tid++) {
-        jl_ptls_t ptls2 = gc_all_tls_states[tid];
-        if (ptls2 == NULL) {
-            continue;
-        }
+    int first = gc_first_parallel_collector_thread_id();
+    int last = gc_last_parallel_collector_thread_id();
+    for (int i = first; i <= last; i++) {
+        jl_ptls_t ptls2 = gc_all_tls_states[i];
+        gc_check_ptls_of_parallel_collector_thread(ptls2);
         work += gc_count_work_in_queue(ptls2);
     }
     // if there is a lot of work left, enter the mark loop
@@ -3486,7 +3501,8 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection)
         jl_ptls_t ptls_dest = ptls;
         jl_gc_markqueue_t *mq_dest = mq;
         if (!single_threaded_mark) {
-            ptls_dest = gc_all_tls_states[gc_first_tid + t_i % jl_n_markthreads];
+            int dest_tid = gc_ith_parallel_collector_thread_id(t_i % jl_n_markthreads);
+            ptls_dest = gc_all_tls_states[dest_tid];
             mq_dest = &ptls_dest->mark_queue;
         }
         if (ptls2 != NULL) {

src/gc.h

Lines changed: 48 additions & 0 deletions
@@ -439,6 +439,54 @@ extern int gc_first_tid;
 extern int gc_n_threads;
 extern jl_ptls_t* gc_all_tls_states;
 
+STATIC_INLINE int gc_first_parallel_collector_thread_id(void) JL_NOTSAFEPOINT
+{
+    if (jl_n_markthreads == 0) {
+        return 0;
+    }
+    return gc_first_tid;
+}
+
+STATIC_INLINE int gc_last_parallel_collector_thread_id(void) JL_NOTSAFEPOINT
+{
+    if (jl_n_markthreads == 0) {
+        return -1;
+    }
+    return gc_first_tid + jl_n_markthreads - 1;
+}
+
+STATIC_INLINE int gc_ith_parallel_collector_thread_id(int i) JL_NOTSAFEPOINT
+{
+    assert(i >= 0 && i < jl_n_markthreads);
+    return gc_first_tid + i;
+}
+
+STATIC_INLINE int gc_is_parallel_collector_thread(int tid) JL_NOTSAFEPOINT
+{
+    return tid >= gc_first_tid && tid <= gc_last_parallel_collector_thread_id();
+}
+
+STATIC_INLINE int gc_random_parallel_collector_thread_id(jl_ptls_t ptls) JL_NOTSAFEPOINT
+{
+    assert(jl_n_markthreads > 0);
+    int v = gc_first_tid + (int)cong(jl_n_markthreads, UINT64_MAX, &ptls->rngseed);
+    assert(v >= gc_first_tid && v <= gc_last_parallel_collector_thread_id());
+    return v;
+}
+
+STATIC_INLINE int gc_parallel_collector_threads_enabled(void) JL_NOTSAFEPOINT
+{
+    return jl_n_markthreads > 0;
+}
+
+STATIC_INLINE void gc_check_ptls_of_parallel_collector_thread(jl_ptls_t ptls) JL_NOTSAFEPOINT
+{
+    (void)ptls;
+    assert(gc_parallel_collector_threads_enabled());
+    assert(ptls != NULL);
+    assert(jl_atomic_load_relaxed(&ptls->gc_state) == JL_GC_PARALLEL_COLLECTOR_THREAD);
+}
+
 STATIC_INLINE bigval_t *bigval_header(jl_taggedvalue_t *o) JL_NOTSAFEPOINT
 {
     return container_of(o, bigval_t, header);
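
A rough illustration of what the new integrity check buys (a toy model, not Julia's real TLS layout: `fake_ptls_t` and the hard-coded state value are assumptions for the sketch): a slot in `gc_all_tls_states` that does not belong to a parallel collector thread is either NULL or carries a different `gc_state`, so the assertion fires at the point of use rather than letting a wrong thread id go unnoticed.

/* check_sketch.c -- toy model of gc_check_ptls_of_parallel_collector_thread */
#include <assert.h>
#include <stddef.h>

#define JL_GC_PARALLEL_COLLECTOR_THREAD 3

typedef struct { int gc_state; } fake_ptls_t;

static void check_parallel_collector_thread(const fake_ptls_t *ptls)
{
    assert(ptls != NULL);                                       /* slot must be populated */
    assert(ptls->gc_state == JL_GC_PARALLEL_COLLECTOR_THREAD);  /* and really be a parallel GC thread */
}

int main(void)
{
    fake_ptls_t mutator = { 0 };                                  /* runs Julia code */
    fake_ptls_t collector = { JL_GC_PARALLEL_COLLECTOR_THREAD };  /* parallel mark/sweep thread */

    check_parallel_collector_thread(&collector);  /* passes */
    /* check_parallel_collector_thread(&mutator); would abort in a debug build,
     * catching a thread-id mix-up immediately instead of corrupting state. */
    (void)mutator;
    return 0;
}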

src/julia_threads.h

Lines changed: 6 additions & 0 deletions
@@ -215,6 +215,10 @@ typedef struct _jl_tls_states_t {
 #define JL_GC_STATE_SAFE 2
     // gc_state = 2 means the thread is running unmanaged code that can be
     // execute at the same time with the GC.
+#define JL_GC_PARALLEL_COLLECTOR_THREAD 3
+    // gc_state = 3 means the thread is a parallel collector thread (i.e. never runs Julia code)
+#define JL_GC_CONCURRENT_COLLECTOR_THREAD 4
+    // gc_state = 4 means the thread is a concurrent collector thread (background sweeper thread that never runs Julia code)
     _Atomic(int8_t) gc_state; // read from foreign threads
     // execution of certain certain impure
     // statements is prohibited from certain
@@ -340,6 +344,8 @@ void jl_sigint_safepoint(jl_ptls_t tls);
 STATIC_INLINE int8_t jl_gc_state_set(jl_ptls_t ptls, int8_t state,
                                      int8_t old_state)
 {
+    assert(old_state != JL_GC_PARALLEL_COLLECTOR_THREAD);
+    assert(old_state != JL_GC_CONCURRENT_COLLECTOR_THREAD);
     jl_atomic_store_release(&ptls->gc_state, state);
     // A safe point is required if we transition from GC-safe region to
     // non GC-safe region.
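
The two asserts added to `jl_gc_state_set` encode the invariant that the new states are terminal: a collector thread sets its dedicated `gc_state` once at startup (see the partr.c changes below) and never transitions again, so any call that claims such a state as `old_state` is a mutator code path running where it should not. A simplified model of that invariant (plain `int` instead of `_Atomic(int8_t)`, and no safepoint logic) might look like this:

/* state_sketch.c -- simplified model of the terminal collector-thread states */
#include <assert.h>

#define JL_GC_STATE_SAFE                  2
#define JL_GC_PARALLEL_COLLECTOR_THREAD   3
#define JL_GC_CONCURRENT_COLLECTOR_THREAD 4

typedef struct { int gc_state; } fake_ptls_t;

static int fake_gc_state_set(fake_ptls_t *ptls, int state, int old_state)
{
    /* Collector threads park in their dedicated state forever, so they must
     * never appear here as the "previous" state being transitioned away from. */
    assert(old_state != JL_GC_PARALLEL_COLLECTOR_THREAD);
    assert(old_state != JL_GC_CONCURRENT_COLLECTOR_THREAD);
    ptls->gc_state = state;
    return old_state;
}

int main(void)
{
    fake_ptls_t mutator = { 0 };
    fake_gc_state_set(&mutator, JL_GC_STATE_SAFE, 0);  /* entering GC-safe code: fine */
    fake_gc_state_set(&mutator, 0, JL_GC_STATE_SAFE);  /* back to managed code: fine */
    return 0;
}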

src/partr.c

Lines changed: 7 additions & 6 deletions
@@ -131,7 +131,7 @@ void jl_parallel_gc_threadfun(void *arg)
     jl_task_t *ct = jl_init_root_task(ptls, stack_lo, stack_hi);
     JL_GC_PROMISE_ROOTED(ct);
     // wait for all threads
-    jl_gc_state_set(ptls, JL_GC_STATE_WAITING, 0);
+    jl_gc_state_set(ptls, JL_GC_PARALLEL_COLLECTOR_THREAD, 0);
     uv_barrier_wait(targ->barrier);
 
     // free the thread argument here
@@ -143,10 +143,10 @@ void jl_parallel_gc_threadfun(void *arg)
             uv_cond_wait(&gc_threads_cond, &gc_threads_lock);
         }
         uv_mutex_unlock(&gc_threads_lock);
-        if (may_mark()) {
-            gc_mark_loop_parallel(ptls, 0);
-        }
-        if (may_sweep(ptls)) { // not an else!
+        assert(jl_atomic_load_relaxed(&ptls->gc_state) == JL_GC_PARALLEL_COLLECTOR_THREAD);
+        gc_mark_loop_parallel(ptls, 0);
+        if (may_sweep(ptls)) {
+            assert(jl_atomic_load_relaxed(&ptls->gc_state) == JL_GC_PARALLEL_COLLECTOR_THREAD);
             gc_sweep_pool_parallel(ptls);
             jl_atomic_fetch_add(&ptls->gc_sweeps_requested, -1);
         }
@@ -166,13 +166,14 @@ void jl_concurrent_gc_threadfun(void *arg)
     jl_task_t *ct = jl_init_root_task(ptls, stack_lo, stack_hi);
     JL_GC_PROMISE_ROOTED(ct);
     // wait for all threads
-    jl_gc_state_set(ptls, JL_GC_STATE_WAITING, 0);
+    jl_gc_state_set(ptls, JL_GC_CONCURRENT_COLLECTOR_THREAD, 0);
     uv_barrier_wait(targ->barrier);
 
     // free the thread argument here
     free(targ);
 
     while (1) {
+        assert(jl_atomic_load_relaxed(&ptls->gc_state) == JL_GC_CONCURRENT_COLLECTOR_THREAD);
         uv_sem_wait(&gc_sweep_assists_needed);
         gc_free_pages();
     }
