Skip to content

Commit 6154804

Browse files
committed
Make the Mirror Queue a queue
Convert the Mirror syncing queue to a modules/queue instead of the old simple queue. Signed-off-by: Andrew Thornton <[email protected]>
1 parent 8edda8b commit 6154804

File tree

5 files changed

+126
-47
lines changed

5 files changed

+126
-47
lines changed

custom/conf/app.example.ini

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -769,10 +769,10 @@ PATH =
769769
;; Global limit of repositories per user, applied at creation time. -1 means no limit
770770
;MAX_CREATION_LIMIT = -1
771771
;;
772-
;; Mirror sync queue length, increase if mirror syncing starts hanging
772+
;; Mirror sync queue length, increase if mirror syncing starts hanging (DEPRECATED: please use [queue.mirror] LENGTH instead)
773773
;MIRROR_QUEUE_LENGTH = 1000
774774
;;
775-
;; Patch test queue length, increase if pull request patch testing starts hanging
775+
;; Patch test queue length, increase if pull request patch testing starts hanging (DEPRECATED: please use [queue.pr_patch_checker] LENGTH instead)
776776
;PULL_REQUEST_QUEUE_LENGTH = 1000
777777
;;
778778
;; Preferred Licenses to place at the top of the List

docs/content/doc/advanced/config-cheat-sheet.en-us.md

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,10 @@ Values containing `#` or `;` must be quoted using `` ` `` or `"""`.
5454
- `DEFAULT_PUSH_CREATE_PRIVATE`: **true**: Default private when creating a new repository with push-to-create.
5555
- `MAX_CREATION_LIMIT`: **-1**: Global maximum creation limit of repositories per user,
5656
`-1` means no limit.
57-
- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it
57+
- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it. **DEPRECATED** use `LENGTH` in `[queue.pr_patch_checker]`.
5858
as large as possible. Use caution when editing this value.
5959
- `MIRROR_QUEUE_LENGTH`: **1000**: Patch test queue length, increase if pull request patch
60-
testing starts hanging.
60+
testing starts hanging. **DEPRECATED** use `LENGTH` in `[queue.mirror]`.
6161
- `PREFERRED_LICENSES`: **Apache License 2.0,MIT License**: Preferred Licenses to place at
6262
the top of the list. Name must match file name in options/license or custom/options/license.
6363
- `DISABLE_HTTP_GIT`: **false**: Disable the ability to interact with repositories over the
@@ -382,6 +382,8 @@ relation to port exhaustion.
382382

383383
## Queue (`queue` and `queue.*`)
384384

385+
Configuration at `[queue]` will set defaults for all queues with overrides for individual queues at `[queue.*]`.
386+
385387
- `TYPE`: **persistable-channel**: General queue type, currently support: `persistable-channel` (uses a LevelDB internally), `channel`, `level`, `redis`, `dummy`
386388
- `DATADIR`: **queues/**: Base DataDir for storing persistent and level queues. `DATADIR` for individual queues can be set in `queue.name` sections but will default to `DATADIR/`**`common`**. (Previously each queue would default to `DATADIR/`**`name`**.)
387389
- `LENGTH`: **20**: Maximal queue size before channel queues block
@@ -400,6 +402,22 @@ relation to port exhaustion.
400402
- `BOOST_TIMEOUT`: **5m**: Boost workers will timeout after this long.
401403
- `BOOST_WORKERS`: **1** (v1.14 and before: **5**): This many workers will be added to the worker pool if there is a boost.
402404

405+
Gitea creates the following non-unique queues:
406+
407+
- `code_indexer`
408+
- `issue_indexer`
409+
- `notification-service`
410+
- `task`
411+
- `mail`
412+
- `push_update`
413+
414+
And the following unique queues:
415+
416+
- `repo_stats_update`
417+
- `repo-archive`
418+
- `mirror`
419+
- `pr_patch_checker`
420+
403421
## Admin (`admin`)
404422

405423
- `DEFAULT_EMAIL_NOTIFICATIONS`: **enabled**: Default configuration for email notifications for users (user configurable). Options: enabled, onmention, disabled
@@ -588,7 +606,7 @@ Define allowed algorithms and their minimum key length (use -1 to disable a type
588606
command or full path).
589607
- `SENDMAIL_ARGS`: **_empty_**: Specify any extra sendmail arguments.
590608
- `SENDMAIL_TIMEOUT`: **5m**: default timeout for sending email through sendmail
591-
- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue.
609+
- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue. **DEPRECATED** use `LENGTH` in `[queue.mailer]`
592610

593611
## Cache (`cache`)
594612

modules/queue/unique_queue_channel.go

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"fmt"
1010
"sync"
1111

12+
"code.gitea.io/gitea/modules/json"
1213
"code.gitea.io/gitea/modules/log"
1314
)
1415

@@ -29,7 +30,7 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
2930
type ChannelUniqueQueue struct {
3031
*WorkerPool
3132
lock sync.Mutex
32-
table map[Data]bool
33+
table map[string]bool
3334
shutdownCtx context.Context
3435
shutdownCtxCancel context.CancelFunc
3536
terminateCtx context.Context
@@ -54,7 +55,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
5455
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
5556

5657
queue := &ChannelUniqueQueue{
57-
table: map[Data]bool{},
58+
table: map[string]bool{},
5859
shutdownCtx: shutdownCtx,
5960
shutdownCtxCancel: shutdownCtxCancel,
6061
terminateCtx: terminateCtx,
@@ -65,9 +66,14 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
6566
}
6667
queue.WorkerPool = NewWorkerPool(func(data ...Data) {
6768
for _, datum := range data {
68-
queue.lock.Lock()
69-
delete(queue.table, datum)
70-
queue.lock.Unlock()
69+
bs, err := json.Marshal(datum)
70+
if err != nil {
71+
log.Error("unable to marshal data: %v", datum)
72+
} else {
73+
queue.lock.Lock()
74+
delete(queue.table, string(bs))
75+
queue.lock.Unlock()
76+
}
7177
handle(datum)
7278
}
7379
}, config.WorkerPoolConfiguration)
@@ -94,23 +100,28 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
94100
if !assignableTo(data, q.exemplar) {
95101
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
96102
}
103+
104+
bs, err := json.Marshal(data)
105+
if err != nil {
106+
return err
107+
}
97108
q.lock.Lock()
98109
locked := true
99110
defer func() {
100111
if locked {
101112
q.lock.Unlock()
102113
}
103114
}()
104-
if _, ok := q.table[data]; ok {
115+
if _, ok := q.table[string(bs)]; ok {
105116
return ErrAlreadyInQueue
106117
}
107118
// FIXME: We probably need to implement some sort of limit here
108119
// If the downstream queue blocks this table will grow without limit
109-
q.table[data] = true
120+
q.table[string(bs)] = true
110121
if fn != nil {
111122
err := fn()
112123
if err != nil {
113-
delete(q.table, data)
124+
delete(q.table, string(bs))
114125
return err
115126
}
116127
}
@@ -122,9 +133,14 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
122133

123134
// Has checks if the data is in the queue
124135
func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
136+
bs, err := json.Marshal(data)
137+
if err != nil {
138+
return false, err
139+
}
140+
125141
q.lock.Lock()
126142
defer q.lock.Unlock()
127-
_, has := q.table[data]
143+
_, has := q.table[string(bs)]
128144
return has, nil
129145
}
130146

modules/setting/queue.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,4 +158,16 @@ func NewQueueService() {
158158
if _, ok := sectionMap["LENGTH"]; !ok {
159159
_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.PullRequestQueueLength))
160160
}
161+
162+
// Handle the old mirror queue configuration
163+
// Please note this will be a unique queue
164+
section = Cfg.Section("queue.mirror")
165+
sectionMap = map[string]bool{}
166+
for _, key := range section.Keys() {
167+
sectionMap[key.Name()] = true
168+
}
169+
if _, ok := sectionMap["LENGTH"]; !ok {
170+
_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.MirrorQueueLength))
171+
}
172+
161173
}

services/mirror/mirror.go

Lines changed: 66 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,43 @@ package mirror
77
import (
88
"context"
99
"fmt"
10-
"strconv"
11-
"strings"
1210

1311
"code.gitea.io/gitea/models"
1412
"code.gitea.io/gitea/modules/graceful"
1513
"code.gitea.io/gitea/modules/log"
14+
"code.gitea.io/gitea/modules/queue"
1615
"code.gitea.io/gitea/modules/setting"
17-
"code.gitea.io/gitea/modules/sync"
1816
)
1917

20-
// mirrorQueue holds an UniqueQueue object of the mirror
21-
var mirrorQueue = sync.NewUniqueQueue(setting.Repository.MirrorQueueLength)
18+
var mirrorQueue queue.UniqueQueue
19+
20+
// RequestType type of mirror request
21+
type RequestType int
22+
23+
const (
24+
// PullRequestType for pull mirrors
25+
PullRequestType RequestType = iota
26+
// PushRequestType for push mirrors
27+
PushRequestType
28+
)
29+
30+
// Request for the mirror queue
31+
type Request struct {
32+
Type RequestType
33+
RepoID int64
34+
}
35+
36+
// doMirror causes this request to mirror itself
37+
func doMirror(ctx context.Context, req *Request) {
38+
switch req.Type {
39+
case PushRequestType:
40+
_ = SyncPushMirror(ctx, req.RepoID)
41+
case PullRequestType:
42+
_ = SyncPullMirror(ctx, req.RepoID)
43+
default:
44+
log.Error("Unknown Request type in queue: %v for RepoID[%d]", req.Type, req.RepoID)
45+
}
46+
}
2247

2348
// Update checks and updates mirror repositories.
2449
func Update(ctx context.Context) error {
@@ -29,19 +54,25 @@ func Update(ctx context.Context) error {
2954
log.Trace("Doing: Update")
3055

3156
handler := func(idx int, bean interface{}) error {
32-
var item string
57+
var item Request
3358
if m, ok := bean.(*models.Mirror); ok {
3459
if m.Repo == nil {
3560
log.Error("Disconnected mirror found: %d", m.ID)
3661
return nil
3762
}
38-
item = fmt.Sprintf("pull %d", m.RepoID)
63+
item = Request{
64+
Type: PullRequestType,
65+
RepoID: m.RepoID,
66+
}
3967
} else if m, ok := bean.(*models.PushMirror); ok {
4068
if m.Repo == nil {
4169
log.Error("Disconnected push-mirror found: %d", m.ID)
4270
return nil
4371
}
44-
item = fmt.Sprintf("push %d", m.ID)
72+
item = Request{
73+
Type: PushRequestType,
74+
RepoID: m.RepoID,
75+
}
4576
} else {
4677
log.Error("Unknown bean: %v", bean)
4778
return nil
@@ -51,8 +82,7 @@ func Update(ctx context.Context) error {
5182
case <-ctx.Done():
5283
return fmt.Errorf("Aborted")
5384
default:
54-
mirrorQueue.Add(item)
55-
return nil
85+
return mirrorQueue.Push(&item)
5686
}
5787
}
5888

@@ -68,26 +98,10 @@ func Update(ctx context.Context) error {
6898
return nil
6999
}
70100

71-
// syncMirrors checks and syncs mirrors.
72-
// FIXME: graceful: this should be a persistable queue
73-
func syncMirrors(ctx context.Context) {
74-
// Start listening on new sync requests.
75-
for {
76-
select {
77-
case <-ctx.Done():
78-
mirrorQueue.Close()
79-
return
80-
case item := <-mirrorQueue.Queue():
81-
id, _ := strconv.ParseInt(item[5:], 10, 64)
82-
if strings.HasPrefix(item, "pull") {
83-
_ = SyncPullMirror(ctx, id)
84-
} else if strings.HasPrefix(item, "push") {
85-
_ = SyncPushMirror(ctx, id)
86-
} else {
87-
log.Error("Unknown item in queue: %v", item)
88-
}
89-
mirrorQueue.Remove(item)
90-
}
101+
func queueHandle(data ...queue.Data) {
102+
for _, datum := range data {
103+
req := datum.(*Request)
104+
doMirror(graceful.GetManager().ShutdownContext(), req)
91105
}
92106
}
93107

@@ -96,21 +110,40 @@ func InitSyncMirrors() {
96110
if !setting.Mirror.Enabled {
97111
return
98112
}
99-
go graceful.GetManager().RunWithShutdownContext(syncMirrors)
113+
mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle, new(Request))
114+
115+
go graceful.GetManager().RunWithShutdownFns(mirrorQueue.Run)
100116
}
101117

102118
// StartToMirror adds repoID to mirror queue
103119
func StartToMirror(repoID int64) {
104120
if !setting.Mirror.Enabled {
105121
return
106122
}
107-
go mirrorQueue.Add(fmt.Sprintf("pull %d", repoID))
123+
go func() {
124+
err := mirrorQueue.Push(&Request{
125+
Type: PushRequestType,
126+
RepoID: repoID,
127+
})
128+
if err != nil {
129+
log.Error("Unable to push push mirror request to the queue for repo[%d]: Error: %v", repoID, err)
130+
}
131+
}()
108132
}
109133

110134
// AddPushMirrorToQueue adds the push mirror to the queue
111135
func AddPushMirrorToQueue(mirrorID int64) {
112136
if !setting.Mirror.Enabled {
113137
return
114138
}
115-
go mirrorQueue.Add(fmt.Sprintf("push %d", mirrorID))
139+
go func() {
140+
141+
err := mirrorQueue.Push(&Request{
142+
Type: PullRequestType,
143+
RepoID: mirrorID,
144+
})
145+
if err != nil {
146+
log.Error("Unable to push pull mirror request to the queue for repo[%d]: Error: %v", mirrorID, err)
147+
}
148+
}()
116149
}

0 commit comments

Comments
 (0)