Skip to content

Commit bb57fc5

Browse files
authored
Merge pull request #67 from arangodb/cluster-scaling-fix
Fixed behavior for scaling UI integration wrt startup of the cluster
2 parents da9e477 + 1e51a90 commit bb57fc5

File tree

4 files changed

+197
-170
lines changed

4 files changed

+197
-170
lines changed

pkg/deployment/cluster_informer.go

-104
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
// Author Ewout Prangsma
21+
//
22+
23+
package deployment
24+
25+
import (
26+
"context"
27+
"sync"
28+
"time"
29+
30+
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
31+
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
32+
"github.com/rs/zerolog"
33+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34+
)
35+
36+
// clusterScalingIntegration is a helper to communicate with the clusters
37+
// scaling UI.
38+
type clusterScalingIntegration struct {
39+
log zerolog.Logger
40+
depl *Deployment
41+
pendingUpdate struct {
42+
mutex sync.Mutex
43+
spec *api.DeploymentSpec
44+
}
45+
lastNumberOfServers struct {
46+
arangod.NumberOfServers
47+
mutex sync.Mutex
48+
}
49+
}
50+
51+
// newClusterScalingIntegration creates a new clusterScalingIntegration.
52+
func newClusterScalingIntegration(depl *Deployment) *clusterScalingIntegration {
53+
return &clusterScalingIntegration{
54+
log: depl.deps.Log,
55+
depl: depl,
56+
}
57+
}
58+
59+
// SendUpdateToCluster records the given spec to be sended to the cluster.
60+
func (ci *clusterScalingIntegration) SendUpdateToCluster(spec api.DeploymentSpec) {
61+
ci.pendingUpdate.mutex.Lock()
62+
defer ci.pendingUpdate.mutex.Unlock()
63+
ci.pendingUpdate.spec = &spec
64+
}
65+
66+
// listenForClusterEvents keep listening for changes entered in the UI of the cluster.
67+
func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct{}) {
68+
for {
69+
delay := time.Second * 2
70+
71+
// Is deployment in running state
72+
if ci.depl.status.State == api.DeploymentStateRunning {
73+
// Update cluster with our state
74+
ctx := context.Background()
75+
safeToAskCluster, err := ci.updateClusterServerCount(ctx)
76+
if err != nil {
77+
ci.log.Debug().Err(err).Msg("Cluster update failed")
78+
} else if safeToAskCluster {
79+
// Inspect once
80+
if err := ci.inspectCluster(ctx); err != nil {
81+
ci.log.Debug().Err(err).Msg("Cluster inspection failed")
82+
}
83+
}
84+
}
85+
86+
select {
87+
case <-time.After(delay):
88+
// Continue
89+
case <-stopCh:
90+
// We're done
91+
return
92+
}
93+
}
94+
}
95+
96+
// Perform a single inspection of the cluster
97+
func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context) error {
98+
log := ci.log
99+
c, err := ci.depl.clientCache.GetDatabase(ctx)
100+
if err != nil {
101+
return maskAny(err)
102+
}
103+
req, err := arangod.GetNumberOfServers(ctx, c.Connection())
104+
if err != nil {
105+
log.Debug().Err(err).Msg("Failed to get number of servers")
106+
return maskAny(err)
107+
}
108+
if req.Coordinators == nil && req.DBServers == nil {
109+
// Nothing to check
110+
return nil
111+
}
112+
coordinatorsChanged := false
113+
dbserversChanged := false
114+
ci.lastNumberOfServers.mutex.Lock()
115+
defer ci.lastNumberOfServers.mutex.Unlock()
116+
desired := ci.lastNumberOfServers.NumberOfServers
117+
if req.Coordinators != nil && desired.Coordinators != nil && req.GetCoordinators() != desired.GetCoordinators() {
118+
// #Coordinator has changed
119+
coordinatorsChanged = true
120+
}
121+
if req.DBServers != nil && desired.DBServers != nil && req.GetDBServers() != desired.GetDBServers() {
122+
// #DBServers has changed
123+
dbserversChanged = true
124+
}
125+
if !coordinatorsChanged && !dbserversChanged {
126+
// Nothing has changed
127+
return nil
128+
}
129+
// Let's update the spec
130+
apiObject := ci.depl.apiObject
131+
current, err := ci.depl.deps.DatabaseCRCli.DatabaseV1alpha().ArangoDeployments(apiObject.Namespace).Get(apiObject.Name, metav1.GetOptions{})
132+
if err != nil {
133+
log.Debug().Err(err).Msg("Failed to get current deployment")
134+
return maskAny(err)
135+
}
136+
if coordinatorsChanged {
137+
current.Spec.Coordinators.Count = req.GetCoordinators()
138+
}
139+
if dbserversChanged {
140+
current.Spec.DBServers.Count = req.GetDBServers()
141+
}
142+
if err := ci.depl.updateCRSpec(current.Spec); err != nil {
143+
log.Warn().Err(err).Msg("Failed to update current deployment")
144+
return maskAny(err)
145+
}
146+
return nil
147+
}
148+
149+
// updateClusterServerCount updates the intended number of servers of the cluster.
150+
// Returns true when it is safe to ask the cluster for updates.
151+
func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Context) (bool, error) {
152+
// Any update needed?
153+
ci.pendingUpdate.mutex.Lock()
154+
spec := ci.pendingUpdate.spec
155+
ci.pendingUpdate.mutex.Unlock()
156+
if spec == nil {
157+
// Nothing pending
158+
return true, nil
159+
}
160+
161+
log := ci.log
162+
c, err := ci.depl.clientCache.GetDatabase(ctx)
163+
if err != nil {
164+
return false, maskAny(err)
165+
}
166+
coordinatorCount := spec.Coordinators.Count
167+
dbserverCount := spec.DBServers.Count
168+
if err := arangod.SetNumberOfServers(ctx, c.Connection(), coordinatorCount, dbserverCount); err != nil {
169+
log.Debug().Err(err).Msg("Failed to set number of servers")
170+
return false, maskAny(err)
171+
}
172+
173+
// Success, now update internal state
174+
safeToAskCluster := false
175+
ci.pendingUpdate.mutex.Lock()
176+
if spec == ci.pendingUpdate.spec {
177+
ci.pendingUpdate.spec = nil
178+
safeToAskCluster = true
179+
}
180+
ci.pendingUpdate.mutex.Unlock()
181+
182+
ci.lastNumberOfServers.mutex.Lock()
183+
defer ci.lastNumberOfServers.mutex.Unlock()
184+
185+
ci.lastNumberOfServers.Coordinators = &coordinatorCount
186+
ci.lastNumberOfServers.DBServers = &dbserverCount
187+
return safeToAskCluster, nil
188+
}

pkg/deployment/cluster_updater.go

-50
This file was deleted.

0 commit comments

Comments
 (0)