Skip to content

Commit a73ff44

Browse files
committed
support concurrency
1 parent cbb2e52 commit a73ff44

17 files changed

+1586
-188
lines changed

models/actions/run.go

+57-102
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"code.gitea.io/gitea/modules/util"
2222
webhook_module "code.gitea.io/gitea/modules/webhook"
2323

24-
"github.com/nektos/act/pkg/jobparser"
2524
"xorm.io/builder"
2625
)
2726

@@ -48,6 +47,8 @@ type ActionRun struct {
4847
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
4948
Status Status `xorm:"index"`
5049
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
50+
ConcurrencyGroup string `xorm:"index"`
51+
ConcurrencyCancel bool
5152
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
5253
Started timeutil.TimeStamp
5354
Stopped timeutil.TimeStamp
@@ -169,7 +170,7 @@ func (run *ActionRun) IsSchedule() bool {
169170
return run.ScheduleID > 0
170171
}
171172

172-
func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
173+
func UpdateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
173174
_, err := db.GetEngine(ctx).ID(repo.ID).
174175
SetExpr("num_action_runs",
175176
builder.Select("count(*)").From("action_run").
@@ -225,121 +226,57 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
225226
return cancelledJobs, err
226227
}
227228

228-
// Iterate over each job and attempt to cancel it.
229-
for _, job := range jobs {
230-
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
231-
status := job.Status
232-
if status.IsDone() {
233-
continue
234-
}
235-
236-
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
237-
if job.TaskID == 0 {
238-
job.Status = StatusCancelled
239-
job.Stopped = timeutil.TimeStampNow()
240-
241-
// Update the job's status and stopped time in the database.
242-
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
243-
if err != nil {
244-
return cancelledJobs, err
245-
}
246-
247-
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
248-
if n == 0 {
249-
return cancelledJobs, errors.New("job has changed, try again")
250-
}
251-
252-
cancelledJobs = append(cancelledJobs, job)
253-
// Continue with the next job.
254-
continue
255-
}
256-
257-
// If the job has an associated task, try to stop the task, effectively cancelling the job.
258-
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
259-
return cancelledJobs, err
260-
}
261-
cancelledJobs = append(cancelledJobs, job)
229+
cjs, err := CancelJobs(ctx, jobs)
230+
if err != nil {
231+
return cancelledJobs, err
262232
}
233+
cancelledJobs = append(cancelledJobs, cjs...)
263234
}
264235

265236
// Return nil to indicate successful cancellation of all running and waiting jobs.
266237
return cancelledJobs, nil
267238
}
268239

269-
// InsertRun inserts a run
270-
// The title will be cut off at 255 characters if it's longer than 255 characters.
271-
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
272-
ctx, committer, err := db.TxContext(ctx)
273-
if err != nil {
274-
return err
275-
}
276-
defer committer.Close()
277-
278-
index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
279-
if err != nil {
280-
return err
281-
}
282-
run.Index = index
283-
run.Title = util.EllipsisDisplayString(run.Title, 255)
240+
func CancelJobs(ctx context.Context, jobs []*ActionRunJob) ([]*ActionRunJob, error) {
241+
cancelledJobs := make([]*ActionRunJob, 0, len(jobs))
242+
// Iterate over each job and attempt to cancel it.
243+
for _, job := range jobs {
244+
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
245+
status := job.Status
246+
if status.IsDone() {
247+
continue
248+
}
284249

285-
if err := db.Insert(ctx, run); err != nil {
286-
return err
287-
}
250+
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
251+
if job.TaskID == 0 {
252+
job.Status = StatusCancelled
253+
job.Stopped = timeutil.TimeStampNow()
288254

289-
if run.Repo == nil {
290-
repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
291-
if err != nil {
292-
return err
293-
}
294-
run.Repo = repo
295-
}
255+
// Update the job's status and stopped time in the database.
256+
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
257+
if err != nil {
258+
return cancelledJobs, err
259+
}
296260

297-
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
298-
return err
299-
}
261+
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
262+
if n == 0 {
263+
return cancelledJobs, errors.New("job has changed, try again")
264+
}
300265

301-
runJobs := make([]*ActionRunJob, 0, len(jobs))
302-
var hasWaiting bool
303-
for _, v := range jobs {
304-
id, job := v.Job()
305-
needs := job.Needs()
306-
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
307-
return err
308-
}
309-
payload, _ := v.Marshal()
310-
status := StatusWaiting
311-
if len(needs) > 0 || run.NeedApproval {
312-
status = StatusBlocked
313-
} else {
314-
hasWaiting = true
266+
cancelledJobs = append(cancelledJobs, job)
267+
// Continue with the next job.
268+
continue
315269
}
316-
job.Name = util.EllipsisDisplayString(job.Name, 255)
317-
runJobs = append(runJobs, &ActionRunJob{
318-
RunID: run.ID,
319-
RepoID: run.RepoID,
320-
OwnerID: run.OwnerID,
321-
CommitSHA: run.CommitSHA,
322-
IsForkPullRequest: run.IsForkPullRequest,
323-
Name: job.Name,
324-
WorkflowPayload: payload,
325-
JobID: id,
326-
Needs: needs,
327-
RunsOn: job.RunsOn(),
328-
Status: status,
329-
})
330-
}
331-
if err := db.Insert(ctx, runJobs); err != nil {
332-
return err
333-
}
334270

335-
// if there is a job in the waiting status, increase tasks version.
336-
if hasWaiting {
337-
if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
338-
return err
271+
// If the job has an associated task, try to stop the task, effectively cancelling the job.
272+
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
273+
return cancelledJobs, err
339274
}
275+
cancelledJobs = append(cancelledJobs, job)
340276
}
341277

342-
return committer.Commit()
278+
// Return nil to indicate successful cancellation of all running and waiting jobs.
279+
return cancelledJobs, nil
343280
}
344281

345282
func GetRunByID(ctx context.Context, id int64) (*ActionRun, error) {
@@ -431,7 +368,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
431368
}
432369
run.Repo = repo
433370
}
434-
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
371+
if err := UpdateRepoRunsNumbers(ctx, run.Repo); err != nil {
435372
return err
436373
}
437374
}
@@ -440,3 +377,21 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
440377
}
441378

442379
type ActionRunIndex db.ResourceIndex
380+
381+
func ShouldBlockRunByConcurrency(ctx context.Context, actionRun *ActionRun) (bool, error) {
382+
if actionRun.ConcurrencyGroup == "" || actionRun.ConcurrencyCancel {
383+
return false, nil
384+
}
385+
386+
concurrentRuns, err := db.Find[ActionRun](ctx, &FindRunOptions{
387+
RepoID: actionRun.RepoID,
388+
ConcurrencyGroup: actionRun.ConcurrencyGroup,
389+
Status: []Status{StatusWaiting, StatusRunning},
390+
})
391+
if err != nil {
392+
return false, fmt.Errorf("find running and waiting runs: %w", err)
393+
}
394+
previousRuns := slices.DeleteFunc(concurrentRuns, func(r *ActionRun) bool { return r.ID == actionRun.ID })
395+
396+
return len(previousRuns) > 0, nil
397+
}

models/actions/run_job.go

+121-4
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,17 @@ type ActionRunJob struct {
3535
RunsOn []string `xorm:"JSON TEXT"`
3636
TaskID int64 // the latest task of the job
3737
Status Status `xorm:"index"`
38-
Started timeutil.TimeStamp
39-
Stopped timeutil.TimeStamp
40-
Created timeutil.TimeStamp `xorm:"created"`
41-
Updated timeutil.TimeStamp `xorm:"updated index"`
38+
39+
RawConcurrencyGroup string // raw concurrency.group
40+
RawConcurrencyCancel string // raw concurrency.cancel-in-progress
41+
IsConcurrencyEvaluated bool // whether RawConcurrencyGroup have been evaluated, only valid when RawConcurrencyGroup is not empty
42+
ConcurrencyGroup string `xorm:"index"` // evaluated concurrency.group
43+
ConcurrencyCancel bool // evaluated concurrency.cancel-in-progress
44+
45+
Started timeutil.TimeStamp
46+
Stopped timeutil.TimeStamp
47+
Created timeutil.TimeStamp `xorm:"created"`
48+
Updated timeutil.TimeStamp `xorm:"updated index"`
4249
}
4350

4451
func init() {
@@ -197,3 +204,113 @@ func AggregateJobStatus(jobs []*ActionRunJob) Status {
197204
return StatusUnknown // it shouldn't happen
198205
}
199206
}
207+
208+
func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool, error) {
209+
if job.RawConcurrencyGroup == "" {
210+
return false, nil
211+
}
212+
if !job.IsConcurrencyEvaluated {
213+
return false, ErrUnevaluatedConcurrency{
214+
Group: job.RawConcurrencyGroup,
215+
CancelInProgress: job.RawConcurrencyCancel,
216+
}
217+
}
218+
if job.ConcurrencyGroup == "" || job.ConcurrencyCancel {
219+
return false, nil
220+
}
221+
222+
concurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{
223+
RepoID: job.RepoID,
224+
ConcurrencyGroup: job.ConcurrencyGroup,
225+
Statuses: []Status{StatusRunning, StatusWaiting},
226+
})
227+
if err != nil {
228+
return false, fmt.Errorf("count running and waiting jobs: %w", err)
229+
}
230+
if concurrentJobsNum > 0 {
231+
return true, nil
232+
}
233+
234+
if err := job.LoadRun(ctx); err != nil {
235+
return false, fmt.Errorf("load run: %w", err)
236+
}
237+
238+
return ShouldBlockRunByConcurrency(ctx, job.Run)
239+
}
240+
241+
func CancelPreviousJobsByConcurrency(ctx context.Context, job *ActionRunJob) ([]*ActionRunJob, error) {
242+
var cancelledJobs []*ActionRunJob
243+
244+
if job.RawConcurrencyGroup != "" {
245+
if !job.IsConcurrencyEvaluated {
246+
return cancelledJobs, ErrUnevaluatedConcurrency{
247+
Group: job.RawConcurrencyGroup,
248+
CancelInProgress: job.RawConcurrencyCancel,
249+
}
250+
}
251+
if job.ConcurrencyGroup != "" && job.ConcurrencyCancel {
252+
// cancel previous jobs in the same concurrency group
253+
previousJobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{
254+
RepoID: job.RepoID,
255+
ConcurrencyGroup: job.ConcurrencyGroup,
256+
Statuses: []Status{StatusRunning, StatusWaiting, StatusBlocked},
257+
})
258+
if err != nil {
259+
return cancelledJobs, fmt.Errorf("find previous jobs: %w", err)
260+
}
261+
previousJobs = slices.DeleteFunc(previousJobs, func(j *ActionRunJob) bool { return j.ID == job.ID })
262+
cjs, err := CancelJobs(ctx, previousJobs)
263+
if err != nil {
264+
return cancelledJobs, fmt.Errorf("cancel previous jobs: %w", err)
265+
}
266+
cancelledJobs = append(cancelledJobs, cjs...)
267+
}
268+
}
269+
270+
if err := job.LoadRun(ctx); err != nil {
271+
return cancelledJobs, fmt.Errorf("load run: %w", err)
272+
}
273+
if job.Run.ConcurrencyGroup != "" && job.Run.ConcurrencyCancel {
274+
// cancel previous runs in the same concurrency group
275+
runs, err := db.Find[ActionRun](ctx, &FindRunOptions{
276+
RepoID: job.RepoID,
277+
ConcurrencyGroup: job.Run.ConcurrencyGroup,
278+
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
279+
})
280+
if err != nil {
281+
return cancelledJobs, fmt.Errorf("find runs: %w", err)
282+
}
283+
for _, run := range runs {
284+
if run.ID == job.Run.ID {
285+
continue
286+
}
287+
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
288+
RunID: run.ID,
289+
})
290+
if err != nil {
291+
return cancelledJobs, fmt.Errorf("find run %d jobs: %w", run.ID, err)
292+
}
293+
cjs, err := CancelJobs(ctx, jobs)
294+
if err != nil {
295+
return cancelledJobs, fmt.Errorf("cancel run %d jobs: %w", run.ID, err)
296+
}
297+
cancelledJobs = append(cancelledJobs, cjs...)
298+
}
299+
}
300+
301+
return cancelledJobs, nil
302+
}
303+
304+
type ErrUnevaluatedConcurrency struct {
305+
Group string
306+
CancelInProgress string
307+
}
308+
309+
func IsErrUnevaluatedConcurrency(err error) bool {
310+
_, ok := err.(ErrUnevaluatedConcurrency)
311+
return ok
312+
}
313+
314+
func (err ErrUnevaluatedConcurrency) Error() string {
315+
return fmt.Sprintf("the raw concurrency [group=%s, cancel-in-progress=%s] is not evaluated", err.Group, err.CancelInProgress)
316+
}

models/actions/run_job_list.go

+10-6
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,13 @@ func (jobs ActionJobList) LoadAttributes(ctx context.Context, withRepo bool) err
6969

7070
type FindRunJobOptions struct {
7171
db.ListOptions
72-
RunID int64
73-
RepoID int64
74-
OwnerID int64
75-
CommitSHA string
76-
Statuses []Status
77-
UpdatedBefore timeutil.TimeStamp
72+
RunID int64
73+
RepoID int64
74+
OwnerID int64
75+
CommitSHA string
76+
Statuses []Status
77+
UpdatedBefore timeutil.TimeStamp
78+
ConcurrencyGroup string
7879
}
7980

8081
func (opts FindRunJobOptions) ToConds() builder.Cond {
@@ -97,5 +98,8 @@ func (opts FindRunJobOptions) ToConds() builder.Cond {
9798
if opts.UpdatedBefore > 0 {
9899
cond = cond.And(builder.Lt{"updated": opts.UpdatedBefore})
99100
}
101+
if opts.ConcurrencyGroup != "" {
102+
cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup})
103+
}
100104
return cond
101105
}

0 commit comments

Comments
 (0)