Skip to content

Commit 011794d

Browse files
committed
Use errgroup to perform transactions concurrently in coordinator_test.go
1 parent 9cf1bfd commit 011794d

File tree

8 files changed

+241
-13
lines changed

8 files changed

+241
-13
lines changed

go/logic/coordinator.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,7 @@ func (c *Coordinator) MarkTransactionCompleted(sequenceNumber, logPos, eventSize
522522

523523
// Schedule any jobs that were waiting for this job to complete or for the low watermark
524524
for waitingForSequenceNumber, channels := range c.waitingJobs {
525-
if waitingForSequenceNumber <= c.lowWaterMark || waitingForSequenceNumber == sequenceNumber {
525+
if waitingForSequenceNumber <= c.lowWaterMark {
526526
channelsToNotify = append(channelsToNotify, channels...)
527527
delete(c.waitingJobs, waitingForSequenceNumber)
528528
}

go/logic/coordinator_test.go

+27-12
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
gosql "database/sql"
66
"fmt"
7+
"math/rand/v2"
78
"os"
89
"testing"
910
"time"
@@ -18,13 +19,15 @@ import (
1819
"github.com/stretchr/testify/suite"
1920
"github.com/testcontainers/testcontainers-go"
2021
"github.com/testcontainers/testcontainers-go/wait"
22+
"golang.org/x/sync/errgroup"
2123
)
2224

2325
type CoordinatorTestSuite struct {
2426
suite.Suite
2527

26-
mysqlContainer testcontainers.Container
27-
db *gosql.DB
28+
mysqlContainer testcontainers.Container
29+
db *gosql.DB
30+
concurrentTransactions int
2831
}
2932

3033
func (suite *CoordinatorTestSuite) SetupSuite() {
@@ -51,6 +54,10 @@ func (suite *CoordinatorTestSuite) SetupSuite() {
5154
suite.Require().NoError(err)
5255

5356
suite.db = db
57+
suite.concurrentTransactions = 500
58+
59+
_, err = db.Exec(fmt.Sprintf("SET GLOBAL max_connections = %d", suite.concurrentTransactions*2))
60+
suite.Require().NoError(err)
5461
}
5562

5663
func (suite *CoordinatorTestSuite) SetupTest() {
@@ -133,20 +140,28 @@ func (suite *CoordinatorTestSuite) TestApplyDML() {
133140
err = applier.CreateChangelogTable()
134141
suite.Require().NoError(err)
135142

136-
// TODO: use errgroup
137-
for i := 0; i < 100; i++ {
138-
tx, err := suite.db.Begin()
139-
suite.Require().NoError(err)
143+
g, ctx := errgroup.WithContext(ctx)
144+
for range suite.concurrentTransactions {
145+
g.Go(func() error {
146+
tx, txErr := suite.db.Begin()
147+
if txErr != nil {
148+
return txErr
149+
}
140150

141-
for j := 0; j < 100; j++ {
142-
_, err = tx.Exec("INSERT INTO test.gh_ost_test (name) VALUES ('test')")
143-
suite.Require().NoError(err)
144-
}
151+
for range 100 {
152+
_, txErr = tx.Exec(fmt.Sprintf("INSERT INTO test.gh_ost_test (name) VALUES ('test-%d')", rand.Int()))
153+
if txErr != nil {
154+
return txErr
155+
}
156+
}
145157

146-
err = tx.Commit()
147-
suite.Require().NoError(err)
158+
return tx.Commit()
159+
})
148160
}
149161

162+
err = g.Wait()
163+
suite.Require().NoError(err)
164+
150165
_, err = suite.db.Exec("UPDATE test.gh_ost_test SET name = 'foobar' WHERE id = 1")
151166
suite.Require().NoError(err)
152167

vendor/golang.org/x/sync/LICENSE

+27
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/golang.org/x/sync/PATENTS

+22
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/golang.org/x/sync/errgroup/errgroup.go

+136
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/golang.org/x/sync/errgroup/go120.go

+13
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/golang.org/x/sync/errgroup/pre_go120.go

+14
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/modules.txt

+1
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ golang.org/x/crypto/ssh/internal/bcrypt_pbkdf
299299
golang.org/x/net/context
300300
# golang.org/x/sync v0.11.0
301301
## explicit; go 1.18
302+
golang.org/x/sync/errgroup
302303
# golang.org/x/sys v0.30.0
303304
## explicit; go 1.18
304305
golang.org/x/sys/cpu

0 commit comments

Comments
 (0)