-
-
Notifications
You must be signed in to change notification settings - Fork 5.8k
Sequencial issue index numbering with pessimistic locking mechanism #9931
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from 7 commits
Commits
Show all changes
29 commits
Select commit
Hold shift + click to select a range
333aec5
Passes PSQL, SQLite and MSSQL
guillep2k 70a2a8d
Move to upsert strategy; all tests work
guillep2k 677e49f
Merge branch 'master' into bylock-indexes
guillep2k d834fb1
Use a LockedResource to numerate issues and prs
guillep2k 58ac901
Fix tests and reserved keyword
guillep2k aa3797c
Fix unit tests
guillep2k 95db48a
Fix export comments
guillep2k d8ad174
A little refactoring and better function naming
guillep2k 6bec3d5
Merge branch 'master' into bylock-indexes
guillep2k e747a2c
Support LockType == "" and LockKey == 0
guillep2k afeb6f0
Merge branch 'master' into bylock-indexes
guillep2k c503f0d
Prepare for merge
guillep2k 9b7ec1d
Merge branch 'master' into bylock-indexes
guillep2k 1792664
Go simple
guillep2k ce6c24f
Improve test legibility
guillep2k 15ffbb4
Fix typo
guillep2k ea9c875
Remove dead code
guillep2k 9cb79c9
Merge branch 'master' into bylock-indexes
guillep2k d185a4f
Prepare for merge
guillep2k f46eaf5
Merge branch 'master' into bylock-indexes
guillep2k 621c9d6
Prepare to merge
guillep2k 17fa5e1
Merge branch 'master' into bylock-indexes
guillep2k b30094b
Merge branch 'master' into bylock-indexes
guillep2k 299d313
Merge branch 'master' into bylock-indexes
guillep2k cea7c4f
Merge branch 'master' into bylock-indexes
guillep2k 2311de3
Merge branch 'master' into bylock-indexes
guillep2k 7e280a4
Merge branch 'master' into bylock-indexes
guillep2k 15e407b
Code review suggestions by @lunny
guillep2k dd85873
Ignore SQLite3 integration when _txlock=immediate
guillep2k File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
// Copyright 2020 The Gitea Authors. All rights reserved. | ||
// Use of this source code is governed by a MIT-style | ||
// license that can be found in the LICENSE file. | ||
|
||
package integrations | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
"time" | ||
|
||
"code.gitea.io/gitea/models" | ||
"code.gitea.io/gitea/modules/log" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
const ( | ||
// The tests will fail if the waiter function takes less than | ||
// blockerDelay minus tolerance to complete. | ||
// Note: these values might require tuning in order to avoid | ||
// false negatives. | ||
waiterDelay = 100 * time.Millisecond | ||
blockerDelay = 200 * time.Millisecond | ||
tolerance = 50 * time.Millisecond // Should be <= (blockerDelay-waiterDelay)/2 | ||
) | ||
|
||
type waitResult struct { | ||
Waited time.Duration | ||
Err error | ||
} | ||
|
||
func TestLockedResource(t *testing.T) { | ||
defer prepareTestEnv(t)() | ||
|
||
// We need to check whether two goroutines block each other | ||
// Sadly, there's no way to ensure the second goroutine is | ||
// waiting other than using a time delay. The longer the delay, | ||
// the more certain we are the second goroutine is waiting. | ||
|
||
// This check **must** fail as we're not blocking anything | ||
assert.Error(t, blockTest("no block", func(ctx models.DBContext) error { | ||
return nil | ||
})) | ||
|
||
models.AssertNotExistsBean(t, &models.LockedResource{LockType: "test-1", LockKey: 1}) | ||
|
||
// Test with creation (i.e. new lock type) | ||
assert.NoError(t, blockTest("block-new", func(ctx models.DBContext) error { | ||
_, err := models.GetLockedResourceCtx(ctx, "block-test-1", 1) | ||
return err | ||
})) | ||
|
||
// Test without creation (i.e. lock type already exists) | ||
assert.NoError(t, blockTest("block-existing", func(ctx models.DBContext) error { | ||
_, err := models.GetLockedResourceCtx(ctx, "block-test-1", 1) | ||
return err | ||
})) | ||
|
||
// Test with temporary record | ||
assert.NoError(t, blockTest("block-temp", func(ctx models.DBContext) error { | ||
return models.TempLockResourceCtx(ctx, "temp-1", 1) | ||
})) | ||
} | ||
|
||
func blockTest(name string, f func(ctx models.DBContext) error) error { | ||
cb := make(chan waitResult) | ||
cw := make(chan waitResult) | ||
ref := time.Now() | ||
|
||
go func() { | ||
cb <- blockTestFunc(name, true, ref, f) | ||
}() | ||
go func() { | ||
cw <- blockTestFunc(name, false, ref, f) | ||
}() | ||
|
||
resb := <-cb | ||
resw := <-cw | ||
if resb.Err != nil { | ||
return resb.Err | ||
} | ||
if resw.Err != nil { | ||
return resw.Err | ||
} | ||
|
||
if resw.Waited < blockerDelay-tolerance { | ||
return fmt.Errorf("Waiter not blocked on %s; wait: %d ms, expected > %d ms", | ||
name, resw.Waited.Milliseconds(), (blockerDelay - tolerance).Milliseconds()) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func blockTestFunc(name string, blocker bool, ref time.Time, f func(ctx models.DBContext) error) (wr waitResult) { | ||
if blocker { | ||
name = fmt.Sprintf("blocker [%s]", name) | ||
} else { | ||
name = fmt.Sprintf("waiter [%s]", name) | ||
} | ||
err := models.WithTx(func(ctx models.DBContext) error { | ||
log.Trace("Entering %s @%d", name, time.Now().Sub(ref).Milliseconds()) | ||
if !blocker { | ||
log.Trace("Waiting on %s @%d", name, time.Now().Sub(ref).Milliseconds()) | ||
time.Sleep(waiterDelay) | ||
log.Trace("Wait finished on %s @%d", name, time.Now().Sub(ref).Milliseconds()) | ||
} | ||
if err := f(ctx); err != nil { | ||
return err | ||
} | ||
if blocker { | ||
log.Trace("Waiting on %s @%d", name, time.Now().Sub(ref).Milliseconds()) | ||
time.Sleep(blockerDelay) | ||
log.Trace("Wait finished on %s @%d", name, time.Now().Sub(ref).Milliseconds()) | ||
} else { | ||
wr.Waited = time.Now().Sub(ref) | ||
} | ||
log.Trace("Finishing %s @%d", name, time.Now().Sub(ref).Milliseconds()) | ||
return nil | ||
}) | ||
if err != nil { | ||
wr.Err = fmt.Errorf("error in %s: %v", name, err) | ||
} | ||
return | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
// Copyright 2020 The Gitea Authors. All rights reserved. | ||
// Use of this source code is governed by a MIT-style | ||
// license that can be found in the LICENSE file. | ||
|
||
package models | ||
|
||
import ( | ||
"fmt" | ||
|
||
"code.gitea.io/gitea/modules/setting" | ||
) | ||
|
||
// LockedResource represents the locking key for a pessimistic | ||
// lock that can hold a counter | ||
type LockedResource struct { | ||
LockType string `xorm:"pk VARCHAR(30)"` | ||
LockKey int64 `xorm:"pk"` | ||
Counter int64 `xorm:"NOT NULL DEFAULT 0"` | ||
} | ||
|
||
// GetLockedResource gets or creates a pessimistic lock on the given type and key | ||
func GetLockedResource(e Engine, lockType string, lockKey int64) (*LockedResource, error) { | ||
locked := &LockedResource{LockType: lockType, LockKey: lockKey} | ||
|
||
if err := upsertLockedResource(e, locked); err != nil { | ||
return nil, fmt.Errorf("upsertLockedResource: %v", err) | ||
} | ||
|
||
// Read back the record we've created or locked to get the current Counter value | ||
if has, err := e.Table(locked).Get(locked); err != nil { | ||
return nil, fmt.Errorf("get locked resource %s:%d: %v", lockType, lockKey, err) | ||
} else if !has { | ||
return nil, fmt.Errorf("unexpected upsert fail %s:%d", lockType, lockKey) | ||
} | ||
|
||
return locked, nil | ||
} | ||
|
||
// UpdateLockedResource updates the value of the counter of a locked resource | ||
func UpdateLockedResource(e Engine, resource *LockedResource) error { | ||
guillep2k marked this conversation as resolved.
Show resolved
Hide resolved
|
||
_, err := e.Table(resource).Cols("counter").Update(resource) | ||
return err | ||
} | ||
|
||
// DeleteLockedResource deletes a locked resource | ||
func DeleteLockedResource(e Engine, resource *LockedResource) error { | ||
guillep2k marked this conversation as resolved.
Show resolved
Hide resolved
|
||
_, err := e.Delete(resource) | ||
return err | ||
} | ||
|
||
// TempLockResource locks the given key but does not leave a permanent record | ||
func TempLockResource(e Engine, lockType string, lockKey int64) error { | ||
guillep2k marked this conversation as resolved.
Show resolved
Hide resolved
|
||
locked := &LockedResource{LockType: lockType, LockKey: lockKey} | ||
// Temporary locked resources must not exist in the table. | ||
// This allows us to use a simple INSERT to lock the key. | ||
_, err := e.Insert(locked) | ||
if err == nil { | ||
_, err = e.Delete(locked) | ||
} | ||
return err | ||
} | ||
|
||
// GetLockedResourceCtx gets or creates a pessimistic lock on the given type and key | ||
func GetLockedResourceCtx(ctx DBContext, lockType string, lockKey int64) (*LockedResource, error) { | ||
return GetLockedResource(ctx.e, lockType, lockKey) | ||
} | ||
|
||
// UpdateLockedResourceCtx updates the value of the counter of a locked resource | ||
func UpdateLockedResourceCtx(ctx DBContext, resource *LockedResource) error { | ||
return UpdateLockedResource(ctx.e, resource) | ||
} | ||
|
||
// DeleteLockedResourceCtx deletes a locked resource | ||
func DeleteLockedResourceCtx(ctx DBContext, resource *LockedResource) error { | ||
return DeleteLockedResource(ctx.e, resource) | ||
} | ||
|
||
// TempLockResourceCtx locks the given key but does not leave a permanent record | ||
func TempLockResourceCtx(ctx DBContext, lockType string, lockKey int64) error { | ||
return TempLockResource(ctx.e, lockType, lockKey) | ||
} | ||
|
||
// upsertLockedResource will create or lock the given key in the database. | ||
// the function will not return until it acquires the lock or receives an error. | ||
func upsertLockedResource(e Engine, resource *LockedResource) (err error) { | ||
// An atomic UPSERT operation (INSERT/UPDATE) is the only operation | ||
// that ensures that the key is actually locked. | ||
guillep2k marked this conversation as resolved.
Show resolved
Hide resolved
|
||
switch { | ||
case setting.Database.UseSQLite3 || setting.Database.UsePostgreSQL: | ||
_, err = e.Exec("INSERT INTO locked_resource (lock_type, lock_key) "+ | ||
"VALUES (?,?) ON CONFLICT(lock_type, lock_key) DO UPDATE SET lock_key = ?", | ||
resource.LockType, resource.LockKey, resource.LockKey) | ||
case setting.Database.UseMySQL: | ||
_, err = e.Exec("INSERT INTO locked_resource (lock_type, lock_key) "+ | ||
"VALUES (?,?) ON DUPLICATE KEY UPDATE lock_key = lock_key", | ||
resource.LockType, resource.LockKey) | ||
case setting.Database.UseMSSQL: | ||
// https://weblogs.sqlteam.com/dang/2009/01/31/upsert-race-condition-with-merge/ | ||
_, err = e.Exec("MERGE locked_resource WITH (HOLDLOCK) as target "+ | ||
"USING (SELECT ? AS lock_type, ? AS lock_key) AS src "+ | ||
"ON src.lock_type = target.lock_type AND src.lock_key = target.lock_key "+ | ||
"WHEN MATCHED THEN UPDATE SET target.lock_key = target.lock_key "+ | ||
"WHEN NOT MATCHED THEN INSERT (lock_type, lock_key) "+ | ||
"VALUES (src.lock_type, src.lock_key);", | ||
resource.LockType, resource.LockKey) | ||
default: | ||
return fmt.Errorf("database type not supported") | ||
} | ||
return | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.