Skip to content

Commit 7a55a9d

Browse files
committed
fix
1 parent 23f77c3 commit 7a55a9d

File tree

2 files changed

+84
-57
lines changed

2 files changed

+84
-57
lines changed

x/mongo/driver/topology/pool.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -902,8 +902,9 @@ func (p *pool) checkInNoEvent(conn *connection) error {
902902
// not usable, which is not covered by the current pool events. We may need
903903
// to add pool event information in the future to communicate that.
904904
if conn.awaitingResponse != nil {
905-
go bgRead(p, conn, *conn.awaitingResponse)
905+
size := *conn.awaitingResponse
906906
conn.awaitingResponse = nil
907+
go bgRead(p, conn, size)
907908
return nil
908909
}
909910

x/mongo/driver/topology/pool_test.go

Lines changed: 82 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"io"
1313
"net"
1414
"os"
15+
"regexp"
1516
"sync"
1617
"testing"
1718
"time"
@@ -1128,9 +1129,13 @@ func TestPool(t *testing.T) {
11281129
t.Run("bgRead", func(t *testing.T) {
11291130
t.Parallel()
11301131

1131-
var gotErrs []error
1132+
var errCh chan error
11321133
BGReadCallback = func(addr string, start, read time.Time, errs []error, connClosed bool) {
1133-
gotErrs = errs
1134+
defer close(errCh)
1135+
1136+
for _, err := range errs {
1137+
errCh <- err
1138+
}
11341139
}
11351140

11361141
const sockPath = "./test.sock"
@@ -1140,6 +1145,8 @@ func TestPool(t *testing.T) {
11401145
setup := func(t *testing.T) {
11411146
t.Helper()
11421147

1148+
errCh = make(chan error)
1149+
11431150
var err error
11441151
socket, err = net.Listen("unix", sockPath)
11451152
noerr(t, err)
@@ -1148,30 +1155,26 @@ func TestPool(t *testing.T) {
11481155
t.Helper()
11491156

11501157
os.Remove(sockPath)
1151-
gotErrs = nil
11521158
}
11531159

11541160
t.Run("incomplete read of message header", func(t *testing.T) {
11551161
setup(t)
11561162
defer teardown(t)
11571163

11581164
wg := &sync.WaitGroup{}
1165+
wg.Add(1)
11591166
go func(t *testing.T) {
11601167
t.Helper()
11611168

1169+
defer wg.Done()
1170+
11621171
conn, err := socket.Accept()
11631172
noerr(t, err)
1164-
wg.Add(1)
1165-
go func(conn net.Conn) {
1166-
defer func() {
1167-
conn.Close()
1168-
wg.Done()
1169-
}()
1173+
defer conn.Close()
11701174

1171-
_, err = conn.Write([]byte{10, 0, 0})
1172-
noerr(t, err)
1173-
time.Sleep(1500 * time.Millisecond)
1174-
}(conn)
1175+
_, err = conn.Write([]byte{10, 0, 0})
1176+
noerr(t, err)
1177+
time.Sleep(1500 * time.Millisecond)
11751178
}(t)
11761179

11771180
p := newPool(
@@ -1190,36 +1193,34 @@ func TestPool(t *testing.T) {
11901193
ctx, cancel := csot.MakeTimeoutContext(context.Background(), time.Second)
11911194
defer cancel()
11921195
_, err = conn.readWireMessage(ctx)
1193-
assert.ErrorContains(t, err,
1194-
"incomplete read of message header: context deadline exceeded: read unix ->./test.sock: i/o timeout")
1195-
err = p.checkIn(conn)
1196-
noerr(t, err)
1196+
regex := regexp.MustCompile(
1197+
`^connection\(.*\[-\d+\]\) incomplete read of message header: context deadline exceeded: read unix .*->\.\/test.sock: i\/o timeout$`,
1198+
)
1199+
assert.True(t, regex.MatchString(err.Error()), "mismatched err: %v", err)
1200+
assert.Nil(t, conn.awaitingResponse, "conn.awaitingResponse should be nil")
11971201
wg.Wait()
11981202
p.close(context.Background())
1199-
assert.Len(t, gotErrs, 0)
1203+
close(errCh)
12001204
})
12011205
t.Run("timeout on reading the message header", func(t *testing.T) {
12021206
setup(t)
12031207
defer teardown(t)
12041208

12051209
wg := &sync.WaitGroup{}
1210+
wg.Add(1)
12061211
go func(t *testing.T) {
12071212
t.Helper()
12081213

1214+
defer wg.Done()
1215+
12091216
conn, err := socket.Accept()
12101217
noerr(t, err)
1211-
wg.Add(1)
1212-
go func(conn net.Conn) {
1213-
defer func() {
1214-
conn.Close()
1215-
wg.Done()
1216-
}()
1217-
1218-
time.Sleep(1500 * time.Millisecond)
1219-
_, err = conn.Write([]byte{10, 0, 0, 0, 0, 0, 0, 0})
1220-
noerr(t, err)
1221-
time.Sleep(1500 * time.Millisecond)
1222-
}(conn)
1218+
defer conn.Close()
1219+
1220+
time.Sleep(1500 * time.Millisecond)
1221+
_, err = conn.Write([]byte{10, 0, 0, 0, 0, 0, 0, 0})
1222+
noerr(t, err)
1223+
time.Sleep(1500 * time.Millisecond)
12231224
}(t)
12241225

12251226
p := newPool(
@@ -1238,42 +1239,54 @@ func TestPool(t *testing.T) {
12381239
ctx, cancel := csot.MakeTimeoutContext(context.Background(), time.Second)
12391240
defer cancel()
12401241
_, err = conn.readWireMessage(ctx)
1241-
assert.ErrorContains(t, err,
1242-
"incomplete read of message header: context deadline exceeded: read unix ->./test.sock: i/o timeout")
1242+
regex := regexp.MustCompile(
1243+
`^connection\(.*\[-\d+\]\) incomplete read of message header: context deadline exceeded: read unix .*->\.\/test.sock: i\/o timeout$`,
1244+
)
1245+
assert.True(t, regex.MatchString(err.Error()), "mismatched err: %v", err)
12431246
err = p.checkIn(conn)
12441247
noerr(t, err)
12451248
wg.Wait()
12461249
p.close(context.Background())
1247-
assert.Len(t, gotErrs, 1)
1248-
for _, err = range gotErrs {
1249-
assert.ErrorContains(t, err,
1250-
"error reading message of 6: read unix ->./test.sock: i/o timeout")
1250+
errs := []*regexp.Regexp{
1251+
regexp.MustCompile(
1252+
`^error reading message of 6: read unix .*->\.\/test.sock: i\/o timeout$`,
1253+
),
1254+
}
1255+
for i := 0; true; i++ {
1256+
err, ok := <-errCh
1257+
if !ok {
1258+
if i != len(errs) {
1259+
assert.Fail(t, "expected more errors")
1260+
}
1261+
break
1262+
} else if i < len(errs) {
1263+
assert.True(t, errs[i].MatchString(err.Error()), "mismatched err: %v", err)
1264+
} else {
1265+
assert.Fail(t, "unexpected error", "got unexpected error: %v", err)
1266+
}
12511267
}
12521268
})
12531269
t.Run("timeout on reading the full message", func(t *testing.T) {
12541270
setup(t)
12551271
defer teardown(t)
12561272

12571273
wg := &sync.WaitGroup{}
1274+
wg.Add(1)
12581275
go func(t *testing.T) {
12591276
t.Helper()
12601277

1278+
defer wg.Done()
1279+
12611280
conn, err := socket.Accept()
12621281
noerr(t, err)
1263-
wg.Add(1)
1264-
go func(conn net.Conn) {
1265-
defer func() {
1266-
conn.Close()
1267-
wg.Done()
1268-
}()
1282+
defer conn.Close()
12691283

1270-
_, err = conn.Write([]byte{12, 0, 0, 0, 0, 0, 0, 0, 1})
1271-
noerr(t, err)
1272-
time.Sleep(1500 * time.Millisecond)
1273-
_, err = conn.Write([]byte{2, 3, 4})
1274-
noerr(t, err)
1275-
time.Sleep(1500 * time.Millisecond)
1276-
}(conn)
1284+
_, err = conn.Write([]byte{12, 0, 0, 0, 0, 0, 0, 0, 1})
1285+
noerr(t, err)
1286+
time.Sleep(1500 * time.Millisecond)
1287+
_, err = conn.Write([]byte{2, 3, 4})
1288+
noerr(t, err)
1289+
time.Sleep(1500 * time.Millisecond)
12771290
}(t)
12781291

12791292
p := newPool(
@@ -1291,19 +1304,32 @@ func TestPool(t *testing.T) {
12911304

12921305
conn, err := p.checkOut(context.Background())
12931306
noerr(t, err)
1294-
ctx, cancel := csot.MakeTimeoutContext(context.Background(), time.Second)
1307+
ctx, cancel := csot.MakeTimeoutContext(context.Background(), 1*time.Second)
12951308
defer cancel()
12961309
_, err = conn.readWireMessage(ctx)
1297-
assert.ErrorContains(t, err,
1298-
"incomplete read of full message: context deadline exceeded: read unix ->./test.sock: i/o timeout")
1310+
regex := regexp.MustCompile(
1311+
`^connection\(.*\[-\d+\]\) incomplete read of full message: context deadline exceeded: read unix .*->\.\/test.sock: i\/o timeout$`,
1312+
)
1313+
assert.True(t, regex.MatchString(err.Error()), "mismatched err: %v", err)
12991314
err = p.checkIn(conn)
13001315
noerr(t, err)
13011316
wg.Wait()
13021317
p.close(context.Background())
1303-
assert.Len(t, gotErrs, 1)
1304-
for _, err = range gotErrs {
1305-
assert.ErrorContains(t, err,
1306-
"error reading message of 3: EOF")
1318+
errs := []string{
1319+
"error reading message of 3: EOF",
1320+
}
1321+
for i := 0; true; i++ {
1322+
err, ok := <-errCh
1323+
if !ok {
1324+
if i != len(errs) {
1325+
assert.Fail(t, "expected more errors")
1326+
}
1327+
break
1328+
} else if i < len(errs) {
1329+
assert.EqualError(t, err, errs[i], "mismatched err: %v", err)
1330+
} else {
1331+
assert.Fail(t, "unexpected error", "got unexpected error: %v", err)
1332+
}
13071333
}
13081334
})
13091335
})

0 commit comments

Comments
 (0)