Skip to content

Commit f1ffb55

Browse files
Only check latencies once every 10 seconds with routeByLatency (#2795)
* Only check latencies once every 10 seconds with `routeByLatency` `routeByLatency` currently checks latencies any time a server returns a MOVED or READONLY reply. When a shard is down, the ClusterClient chooses to issue the request to a random server, which returns a MOVED reply. This causes a state refresh and a latency update on all servers. This can lead to significant ping load to clusters with a large number of clients. This introduces logic to ping only once every 10 seconds, only performing a latency update on a node during the `GC` function if the latency was set later than 10 seconds ago. Fixes #2782 * use UnixNano instead of Unix for better precision --------- Co-authored-by: ofekshenawa <[email protected]>
1 parent 080e051 commit f1ffb55

File tree

1 file changed

+24
-1
lines changed

1 file changed

+24
-1
lines changed

osscluster.go

+24-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ import (
2121
"github.com/redis/go-redis/v9/internal/rand"
2222
)
2323

24+
const (
25+
minLatencyMeasurementInterval = 10 * time.Second
26+
)
27+
2428
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
2529

2630
// ClusterOptions are used to configure a cluster client and should be
@@ -316,6 +320,10 @@ type clusterNode struct {
316320
latency uint32 // atomic
317321
generation uint32 // atomic
318322
failing uint32 // atomic
323+
324+
// last time the latency measurement was performed for the node, stored in nanoseconds
325+
// from epoch
326+
lastLatencyMeasurement int64 // atomic
319327
}
320328

321329
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
@@ -368,6 +376,7 @@ func (n *clusterNode) updateLatency() {
368376
latency = float64(dur) / float64(successes)
369377
}
370378
atomic.StoreUint32(&n.latency, uint32(latency+0.5))
379+
n.SetLastLatencyMeasurement(time.Now())
371380
}
372381

373382
func (n *clusterNode) Latency() time.Duration {
@@ -397,6 +406,10 @@ func (n *clusterNode) Generation() uint32 {
397406
return atomic.LoadUint32(&n.generation)
398407
}
399408

409+
func (n *clusterNode) LastLatencyMeasurement() int64 {
410+
return atomic.LoadInt64(&n.lastLatencyMeasurement)
411+
}
412+
400413
func (n *clusterNode) SetGeneration(gen uint32) {
401414
for {
402415
v := atomic.LoadUint32(&n.generation)
@@ -406,6 +419,15 @@ func (n *clusterNode) SetGeneration(gen uint32) {
406419
}
407420
}
408421

422+
func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
423+
for {
424+
v := atomic.LoadInt64(&n.lastLatencyMeasurement)
425+
if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
426+
break
427+
}
428+
}
429+
}
430+
409431
//------------------------------------------------------------------------------
410432

411433
type clusterNodes struct {
@@ -493,10 +515,11 @@ func (c *clusterNodes) GC(generation uint32) {
493515
c.mu.Lock()
494516

495517
c.activeAddrs = c.activeAddrs[:0]
518+
now := time.Now()
496519
for addr, node := range c.nodes {
497520
if node.Generation() >= generation {
498521
c.activeAddrs = append(c.activeAddrs, addr)
499-
if c.opt.RouteByLatency {
522+
if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
500523
go node.updateLatency()
501524
}
502525
continue

0 commit comments

Comments
 (0)