Skip to content

Commit 0909a77

Browse files
committed
Merged in master
2 parents 3ed5c77 + bb57fc5 commit 0909a77

File tree

4 files changed

+199
-172
lines changed

4 files changed

+199
-172
lines changed

pkg/deployment/cluster_informer.go

-106
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
// Author Ewout Prangsma
21+
//
22+
23+
package deployment
24+
25+
import (
26+
"context"
27+
"sync"
28+
"time"
29+
30+
"github.com/rs/zerolog"
31+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32+
33+
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
34+
"github.com/arangodb/kube-arangodb/pkg/util"
35+
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
36+
)
37+
38+
// clusterScalingIntegration is a helper to communicate with the clusters
39+
// scaling UI.
40+
type clusterScalingIntegration struct {
41+
log zerolog.Logger
42+
depl *Deployment
43+
pendingUpdate struct {
44+
mutex sync.Mutex
45+
spec *api.DeploymentSpec
46+
}
47+
lastNumberOfServers struct {
48+
arangod.NumberOfServers
49+
mutex sync.Mutex
50+
}
51+
}
52+
53+
// newClusterScalingIntegration creates a new clusterScalingIntegration.
54+
func newClusterScalingIntegration(depl *Deployment) *clusterScalingIntegration {
55+
return &clusterScalingIntegration{
56+
log: depl.deps.Log,
57+
depl: depl,
58+
}
59+
}
60+
61+
// SendUpdateToCluster records the given spec to be sended to the cluster.
62+
func (ci *clusterScalingIntegration) SendUpdateToCluster(spec api.DeploymentSpec) {
63+
ci.pendingUpdate.mutex.Lock()
64+
defer ci.pendingUpdate.mutex.Unlock()
65+
ci.pendingUpdate.spec = &spec
66+
}
67+
68+
// listenForClusterEvents keep listening for changes entered in the UI of the cluster.
69+
func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct{}) {
70+
for {
71+
delay := time.Second * 2
72+
73+
// Is deployment in running state
74+
if ci.depl.status.State == api.DeploymentStateRunning {
75+
// Update cluster with our state
76+
ctx := context.Background()
77+
safeToAskCluster, err := ci.updateClusterServerCount(ctx)
78+
if err != nil {
79+
ci.log.Debug().Err(err).Msg("Cluster update failed")
80+
} else if safeToAskCluster {
81+
// Inspect once
82+
if err := ci.inspectCluster(ctx); err != nil {
83+
ci.log.Debug().Err(err).Msg("Cluster inspection failed")
84+
}
85+
}
86+
}
87+
88+
select {
89+
case <-time.After(delay):
90+
// Continue
91+
case <-stopCh:
92+
// We're done
93+
return
94+
}
95+
}
96+
}
97+
98+
// Perform a single inspection of the cluster
99+
func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context) error {
100+
log := ci.log
101+
c, err := ci.depl.clientCache.GetDatabase(ctx)
102+
if err != nil {
103+
return maskAny(err)
104+
}
105+
req, err := arangod.GetNumberOfServers(ctx, c.Connection())
106+
if err != nil {
107+
log.Debug().Err(err).Msg("Failed to get number of servers")
108+
return maskAny(err)
109+
}
110+
if req.Coordinators == nil && req.DBServers == nil {
111+
// Nothing to check
112+
return nil
113+
}
114+
coordinatorsChanged := false
115+
dbserversChanged := false
116+
ci.lastNumberOfServers.mutex.Lock()
117+
defer ci.lastNumberOfServers.mutex.Unlock()
118+
desired := ci.lastNumberOfServers.NumberOfServers
119+
if req.Coordinators != nil && desired.Coordinators != nil && req.GetCoordinators() != desired.GetCoordinators() {
120+
// #Coordinator has changed
121+
coordinatorsChanged = true
122+
}
123+
if req.DBServers != nil && desired.DBServers != nil && req.GetDBServers() != desired.GetDBServers() {
124+
// #DBServers has changed
125+
dbserversChanged = true
126+
}
127+
if !coordinatorsChanged && !dbserversChanged {
128+
// Nothing has changed
129+
return nil
130+
}
131+
// Let's update the spec
132+
apiObject := ci.depl.apiObject
133+
current, err := ci.depl.deps.DatabaseCRCli.DatabaseV1alpha().ArangoDeployments(apiObject.Namespace).Get(apiObject.Name, metav1.GetOptions{})
134+
if err != nil {
135+
log.Debug().Err(err).Msg("Failed to get current deployment")
136+
return maskAny(err)
137+
}
138+
if coordinatorsChanged {
139+
current.Spec.Coordinators.Count = util.NewInt(req.GetCoordinators())
140+
}
141+
if dbserversChanged {
142+
current.Spec.DBServers.Count = util.NewInt(req.GetDBServers())
143+
}
144+
if err := ci.depl.updateCRSpec(current.Spec); err != nil {
145+
log.Warn().Err(err).Msg("Failed to update current deployment")
146+
return maskAny(err)
147+
}
148+
return nil
149+
}
150+
151+
// updateClusterServerCount updates the intended number of servers of the cluster.
152+
// Returns true when it is safe to ask the cluster for updates.
153+
func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Context) (bool, error) {
154+
// Any update needed?
155+
ci.pendingUpdate.mutex.Lock()
156+
spec := ci.pendingUpdate.spec
157+
ci.pendingUpdate.mutex.Unlock()
158+
if spec == nil {
159+
// Nothing pending
160+
return true, nil
161+
}
162+
163+
log := ci.log
164+
c, err := ci.depl.clientCache.GetDatabase(ctx)
165+
if err != nil {
166+
return false, maskAny(err)
167+
}
168+
coordinatorCount := spec.Coordinators.GetCount()
169+
dbserverCount := spec.DBServers.GetCount()
170+
if err := arangod.SetNumberOfServers(ctx, c.Connection(), coordinatorCount, dbserverCount); err != nil {
171+
log.Debug().Err(err).Msg("Failed to set number of servers")
172+
return false, maskAny(err)
173+
}
174+
175+
// Success, now update internal state
176+
safeToAskCluster := false
177+
ci.pendingUpdate.mutex.Lock()
178+
if spec == ci.pendingUpdate.spec {
179+
ci.pendingUpdate.spec = nil
180+
safeToAskCluster = true
181+
}
182+
ci.pendingUpdate.mutex.Unlock()
183+
184+
ci.lastNumberOfServers.mutex.Lock()
185+
defer ci.lastNumberOfServers.mutex.Unlock()
186+
187+
ci.lastNumberOfServers.Coordinators = &coordinatorCount
188+
ci.lastNumberOfServers.DBServers = &dbserverCount
189+
return safeToAskCluster, nil
190+
}

pkg/deployment/cluster_updater.go

-50
This file was deleted.

0 commit comments

Comments
 (0)