Skip to content

Commit cca20fb

Browse files
committed
fix
1 parent c2b9727 commit cca20fb

File tree

5 files changed

+130
-27
lines changed

5 files changed

+130
-27
lines changed

routers/api/actions/runner/utils.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,10 @@ func generateTaskContext(t *actions_model.ActionTask) *structpb.Struct {
7777
}
7878

7979
func findTaskNeeds(ctx context.Context, task *actions_model.ActionTask) (map[string]*runnerv1.TaskNeed, error) {
80-
taskNeeds, err := actions.FindTaskNeeds(ctx, task)
80+
if err := task.LoadAttributes(ctx); err != nil {
81+
return nil, fmt.Errorf("task LoadAttributes: %w", err)
82+
}
83+
taskNeeds, err := actions.FindTaskNeeds(ctx, task.Job)
8184
if err != nil {
8285
return nil, err
8386
}

services/actions/concurrency.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,31 @@ func evaluateJobConcurrency(run *actions_model.ActionRun, actionRunJob *actions_
2121

2222
gitCtx := jobparser.ToGitContext(GenerateGitContext(run, actionRunJob))
2323

24+
singleWorkflows, err := jobparser.Parse([]byte(actionRunJob.WorkflowPayload))
25+
if err != nil {
26+
return "", false, fmt.Errorf("parse single workflow: %w", err)
27+
} else if len(singleWorkflows) != 1 {
28+
return "", false, fmt.Errorf("not single workflow")
29+
}
30+
31+
// FIXME: cannot get job needs
32+
singleWorkflow := singleWorkflows[0]
33+
singleWorkflow.Job()
34+
actJobMap := map[string]*act_model.Job{}
35+
if err := singleWorkflow.RawJobs.Decode(&actJobMap); err != nil {
36+
return "", false, fmt.Errorf("decode act job: %w", err)
37+
}
38+
2439
actWorkflow, err := act_model.ReadWorkflow(bytes.NewReader(actionRunJob.WorkflowPayload))
2540
if err != nil {
2641
return "", false, fmt.Errorf("read workflow: %w", err)
2742
}
2843
actJob := actWorkflow.GetJob(actionRunJob.JobID)
2944

30-
if len(jobResults) == 0 {
31-
jobResults = map[string]*jobparser.JobResult{actionRunJob.JobID: {}}
45+
if jobResults == nil {
46+
jobResults = map[string]*jobparser.JobResult{}
3247
}
48+
jobResults[actionRunJob.JobID] = &jobparser.JobResult{}
3349

3450
concurrencyGroup, concurrencyCancel := jobparser.EvaluateJobConcurrency(rawConcurrency, actionRunJob.JobID, actJob, gitCtx, vars, jobResults)
3551

services/actions/job_emitter.go

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
235235
// check concurrency
236236
blockedByJobConcurrency, err := checkConcurrencyForJobWithNeeds(ctx, r.jobMap[id], r.vars)
237237
if err != nil {
238-
log.Error("Check run %d job %d concurrency: %v. This job will stay blocked.")
238+
log.Error("Check job %d concurrency: %v. This job will stay blocked.", id, err)
239239
continue
240240
}
241241

@@ -271,17 +271,13 @@ func checkConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_
271271
if len(actionRunJob.RawConcurrencyGroup) == 0 {
272272
return false, nil
273273
}
274-
if err := actionRunJob.LoadRun(ctx); err != nil {
274+
if err := actionRunJob.LoadAttributes(ctx); err != nil {
275275
return false, err
276276
}
277277

278278
if !actionRunJob.IsConcurrencyEvaluated {
279279
// empty concurrency group means the raw concurrency has not been evaluated
280-
task, err := actions_model.GetTaskByID(ctx, actionRunJob.TaskID)
281-
if err != nil {
282-
return false, fmt.Errorf("get task by id: %w", err)
283-
}
284-
taskNeeds, err := FindTaskNeeds(ctx, task)
280+
taskNeeds, err := FindTaskNeeds(ctx, actionRunJob)
285281
if err != nil {
286282
return false, fmt.Errorf("find task needs: %w", err)
287283
}
@@ -298,13 +294,9 @@ func checkConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_
298294
if err != nil {
299295
return false, fmt.Errorf("evaluate job concurrency: %w", err)
300296
}
297+
actionRunJob.IsConcurrencyEvaluated = true
301298

302-
if _, err := actions_model.UpdateRunJob(ctx, &actions_model.ActionRunJob{
303-
ID: actionRunJob.ID,
304-
IsConcurrencyEvaluated: true,
305-
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
306-
ConcurrencyCancel: actionRunJob.ConcurrencyCancel,
307-
}, nil); err != nil {
299+
if _, err := actions_model.UpdateRunJob(ctx, actionRunJob, nil, "concurrency_group", "concurrency_cancel", "is_concurrency_evaluated"); err != nil {
308300
return false, fmt.Errorf("update run job: %w", err)
309301
}
310302
}

services/actions/utils.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -103,16 +103,13 @@ type TaskNeed struct {
103103
Outputs map[string]string
104104
}
105105

106-
func FindTaskNeeds(ctx context.Context, task *actions_model.ActionTask) (map[string]*TaskNeed, error) {
107-
if err := task.LoadAttributes(ctx); err != nil {
108-
return nil, fmt.Errorf("LoadAttributes: %w", err)
109-
}
110-
if len(task.Job.Needs) == 0 {
106+
func FindTaskNeeds(ctx context.Context, job *actions_model.ActionRunJob) (map[string]*TaskNeed, error) {
107+
if len(job.Needs) == 0 {
111108
return nil, nil
112109
}
113-
needs := container.SetOf(task.Job.Needs...)
110+
needs := container.SetOf(job.Needs...)
114111

115-
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: task.Job.RunID})
112+
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: job.RunID})
116113
if err != nil {
117114
return nil, fmt.Errorf("FindRunJobs: %w", err)
118115
}

tests/integration/actions_concurrency_test.go

Lines changed: 99 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
"github.com/stretchr/testify/assert"
2222
)
2323

24-
func TestWorkflowConcurrency_NoCancellation(t *testing.T) {
24+
func TestWorkflowConcurrency(t *testing.T) {
2525
onGiteaRun(t, func(t *testing.T, u *url.URL) {
2626
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
2727
session := loginUser(t, user2.Name)
@@ -34,7 +34,7 @@ func TestWorkflowConcurrency_NoCancellation(t *testing.T) {
3434

3535
// add a variable for test
3636
req := NewRequestWithJSON(t, "POST",
37-
fmt.Sprintf("/api/v1/repos/%s/%s/actions/variables/qwe", user2.Name, repo.Name), &api.CreateVariableOption{
37+
fmt.Sprintf("/api/v1/repos/%s/%s/actions/variables/myvar", user2.Name, repo.Name), &api.CreateVariableOption{
3838
Value: "abc123",
3939
}).
4040
AddTokenAuth(token)
@@ -61,7 +61,7 @@ on:
6161
paths:
6262
- '.gitea/workflows/concurrent-workflow-2.yml'
6363
concurrency:
64-
group: workflow-${{ github.ref_name }}-${{ vars.qwe }}
64+
group: workflow-${{ github.ref_name }}-${{ vars.myvar }}
6565
jobs:
6666
wf2-job:
6767
runs-on: ubuntu-latest
@@ -126,7 +126,7 @@ jobs:
126126
})
127127
}
128128

129-
func TestWorkflowConcurrency_WithCancellation(t *testing.T) {
129+
func TestWorkflowConcurrency_WithPullRequest(t *testing.T) {
130130
onGiteaRun(t, func(t *testing.T, u *url.URL) {
131131
// user2 is the owner of the base repo
132132
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
@@ -276,6 +276,101 @@ jobs:
276276
})
277277
}
278278

279+
func TestJobConcurrency(t *testing.T) {
280+
onGiteaRun(t, func(t *testing.T, u *url.URL) {
281+
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
282+
session := loginUser(t, user2.Name)
283+
token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser)
284+
285+
apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false)
286+
repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID})
287+
runner1 := newMockRunner()
288+
runner1.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner-1", []string{"runner1"})
289+
runner2 := newMockRunner()
290+
runner2.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner-2", []string{"runner2"})
291+
292+
// add a variable for test
293+
req := NewRequestWithJSON(t, "POST",
294+
fmt.Sprintf("/api/v1/repos/%s/%s/actions/variables/version_var", user2.Name, repo.Name), &api.CreateVariableOption{
295+
Value: "v1.23.0",
296+
}).
297+
AddTokenAuth(token)
298+
MakeRequest(t, req, http.StatusNoContent)
299+
300+
wf1TreePath := ".gitea/workflows/concurrent-workflow-1.yml"
301+
wf1FileContent := `name: concurrent-workflow-1
302+
on:
303+
push:
304+
paths:
305+
- '.gitea/workflows/concurrent-workflow-1.yml'
306+
jobs:
307+
wf1-job1:
308+
runs-on: runner1
309+
concurrency:
310+
group: job-main-${{ vars.version_var }}
311+
steps:
312+
- run: echo 'wf1-job1'
313+
`
314+
wf2TreePath := ".gitea/workflows/concurrent-workflow-2.yml"
315+
wf2FileContent := `name: concurrent-workflow-2
316+
on:
317+
push:
318+
paths:
319+
- '.gitea/workflows/concurrent-workflow-2.yml'
320+
jobs:
321+
wf2-job1:
322+
runs-on: runner2
323+
outputs:
324+
version: ${{ steps.version_step.outputs.app_version }}
325+
steps:
326+
- id: version_step
327+
run: echo "app_version=v1.23.0" >> "$GITHUB_OUTPUT"
328+
wf2-job2:
329+
runs-on: runner1
330+
needs: [wf2-job1]
331+
concurrency:
332+
group: job-main-${{ needs.wf2-job1.outputs.version }}
333+
steps:
334+
- run: echo 'wf2-job2'
335+
`
336+
337+
opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf1TreePath), wf1FileContent)
338+
createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1)
339+
opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf2TreePath), wf2FileContent)
340+
createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2)
341+
// opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf3TreePath), wf3FileContent)
342+
// createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3)
343+
344+
// fetch wf1-job1
345+
wf1Job1Task := runner1.fetchTask(t)
346+
_, wf1Job1ActionJob, _ := getTaskAndJobAndRunByTaskID(t, wf1Job1Task.Id)
347+
assert.Equal(t, "job-main-v1.23.0", wf1Job1ActionJob.ConcurrencyGroup)
348+
assert.True(t, wf1Job1ActionJob.Status.IsRunning())
349+
// fetch and exec wf2-job1
350+
wf2Job1Task := runner2.fetchTask(t)
351+
runner2.execTask(t, wf2Job1Task, &mockTaskOutcome{
352+
result: runnerv1.Result_RESULT_SUCCESS,
353+
outputs: map[string]string{
354+
"version": "v1.23.0",
355+
},
356+
})
357+
// cannot fetch wf2-job2 because wf1-job1 is running
358+
runner1.fetchNoTask(t)
359+
// exec wf1-job1
360+
runner1.execTask(t, wf1Job1Task, &mockTaskOutcome{
361+
result: runnerv1.Result_RESULT_SUCCESS,
362+
})
363+
// fetch wf2-job2
364+
wf2Job2Task := runner2.fetchTask(t)
365+
_, wf2Job2ActionJob, _ := getTaskAndJobAndRunByTaskID(t, wf2Job2Task.Id)
366+
assert.Equal(t, "job-main-v1.23.0", wf2Job2ActionJob.ConcurrencyGroup)
367+
assert.True(t, wf1Job1ActionJob.Status.IsRunning())
368+
369+
httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository)
370+
doAPIDeleteRepository(httpContext)(t)
371+
})
372+
}
373+
279374
func getTaskAndJobAndRunByTaskID(t *testing.T, taskID int64) (*actions_model.ActionTask, *actions_model.ActionRunJob, *actions_model.ActionRun) {
280375
actionTask := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionTask{ID: taskID})
281376
actionRunJob := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{ID: actionTask.JobID})

0 commit comments

Comments
 (0)