Skip to content

Commit 2b86891

Browse files
authored
Merge pull request #172 from arangodb/bugfix/status-resilience
Avoid overwriting status changes
2 parents 16ada24 + a618a72 commit 2b86891

23 files changed

+323
-162
lines changed

pkg/apis/deployment/v1alpha/deployment_status_members.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -75,23 +75,23 @@ func (ds DeploymentStatusMembers) ElementByID(id string) (MemberStatus, ServerGr
7575
// ForeachServerGroup calls the given callback for all server groups.
7676
// If the callback returns an error, this error is returned and the callback is
7777
// not called for the remaining groups.
78-
func (ds DeploymentStatusMembers) ForeachServerGroup(cb func(group ServerGroup, list *MemberStatusList) error) error {
79-
if err := cb(ServerGroupSingle, &ds.Single); err != nil {
78+
func (ds DeploymentStatusMembers) ForeachServerGroup(cb func(group ServerGroup, list MemberStatusList) error) error {
79+
if err := cb(ServerGroupSingle, ds.Single); err != nil {
8080
return maskAny(err)
8181
}
82-
if err := cb(ServerGroupAgents, &ds.Agents); err != nil {
82+
if err := cb(ServerGroupAgents, ds.Agents); err != nil {
8383
return maskAny(err)
8484
}
85-
if err := cb(ServerGroupDBServers, &ds.DBServers); err != nil {
85+
if err := cb(ServerGroupDBServers, ds.DBServers); err != nil {
8686
return maskAny(err)
8787
}
88-
if err := cb(ServerGroupCoordinators, &ds.Coordinators); err != nil {
88+
if err := cb(ServerGroupCoordinators, ds.Coordinators); err != nil {
8989
return maskAny(err)
9090
}
91-
if err := cb(ServerGroupSyncMasters, &ds.SyncMasters); err != nil {
91+
if err := cb(ServerGroupSyncMasters, ds.SyncMasters); err != nil {
9292
return maskAny(err)
9393
}
94-
if err := cb(ServerGroupSyncWorkers, &ds.SyncWorkers); err != nil {
94+
if err := cb(ServerGroupSyncWorkers, ds.SyncWorkers); err != nil {
9595
return maskAny(err)
9696
}
9797
return nil
@@ -190,8 +190,8 @@ func (ds *DeploymentStatusMembers) RemoveByID(id string, group ServerGroup) erro
190190

191191
// AllMembersReady returns true when all members are in the Ready state.
192192
func (ds DeploymentStatusMembers) AllMembersReady() bool {
193-
if err := ds.ForeachServerGroup(func(group ServerGroup, list *MemberStatusList) error {
194-
for _, x := range *list {
193+
if err := ds.ForeachServerGroup(func(group ServerGroup, list MemberStatusList) error {
194+
for _, x := range list {
195195
if !x.Conditions.IsTrue(ConditionTypeReady) {
196196
return fmt.Errorf("not ready")
197197
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
// Author Ewout Prangsma
21+
//
22+
23+
package v1alpha
24+
25+
import (
26+
"testing"
27+
28+
"github.com/stretchr/testify/assert"
29+
)
30+
31+
// TestMemberStatusList tests modifying a MemberStatusList.
32+
func TestMemberStatusList(t *testing.T) {
33+
list := &MemberStatusList{}
34+
m1 := MemberStatus{ID: "m1"}
35+
m2 := MemberStatus{ID: "m2"}
36+
m3 := MemberStatus{ID: "m3"}
37+
assert.Equal(t, 0, len(*list))
38+
39+
assert.NoError(t, list.Add(m1))
40+
assert.Equal(t, 1, len(*list))
41+
42+
assert.NoError(t, list.Add(m2))
43+
assert.NoError(t, list.Add(m3))
44+
assert.Equal(t, 3, len(*list))
45+
46+
assert.Error(t, list.Add(m2))
47+
assert.Equal(t, 3, len(*list))
48+
49+
assert.NoError(t, list.RemoveByID(m3.ID))
50+
assert.Equal(t, 2, len(*list))
51+
assert.False(t, list.ContainsID(m3.ID))
52+
assert.Equal(t, m1.ID, (*list)[0].ID)
53+
assert.Equal(t, m2.ID, (*list)[1].ID)
54+
55+
m2.PodName = "foo"
56+
assert.NoError(t, list.Update(m2))
57+
assert.Equal(t, 2, len(*list))
58+
assert.True(t, list.ContainsID(m2.ID))
59+
x, found := list.ElementByPodName("foo")
60+
assert.True(t, found)
61+
assert.Equal(t, "foo", x.PodName)
62+
assert.Equal(t, m2.ID, x.ID)
63+
64+
assert.NoError(t, list.Add(m3))
65+
assert.Equal(t, 3, len(*list))
66+
assert.Equal(t, m1.ID, (*list)[0].ID)
67+
assert.Equal(t, m2.ID, (*list)[1].ID)
68+
assert.Equal(t, m3.ID, (*list)[2].ID)
69+
}

pkg/deployment/cluster_scaling_integration.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct
7171
delay := time.Second * 2
7272

7373
// Is deployment in running state
74-
if ci.depl.status.Phase == api.DeploymentPhaseRunning {
74+
if ci.depl.GetPhase() == api.DeploymentPhaseRunning {
7575
// Update cluster with our state
7676
ctx := context.Background()
7777
safeToAskCluster, err := ci.updateClusterServerCount(ctx)

pkg/deployment/context_impl.go

+33-7
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ package deployment
2424

2525
import (
2626
"context"
27+
"fmt"
2728

2829
"github.com/arangodb/arangosync/client"
2930
"github.com/arangodb/arangosync/tasks"
@@ -63,20 +64,44 @@ func (d *Deployment) GetNamespace() string {
6364
return d.apiObject.GetNamespace()
6465
}
6566

67+
// GetPhase returns the current phase of the deployment
68+
func (d *Deployment) GetPhase() api.DeploymentPhase {
69+
return d.status.last.Phase
70+
}
71+
6672
// GetSpec returns the current specification
6773
func (d *Deployment) GetSpec() api.DeploymentSpec {
6874
return d.apiObject.Spec
6975
}
7076

7177
// GetStatus returns the current status of the deployment
72-
func (d *Deployment) GetStatus() api.DeploymentStatus {
73-
return d.status
78+
// together with the current version of that status.
79+
func (d *Deployment) GetStatus() (api.DeploymentStatus, int32) {
80+
d.status.mutex.Lock()
81+
defer d.status.mutex.Unlock()
82+
83+
version := d.status.version
84+
return *d.status.last.DeepCopy(), version
7485
}
7586

7687
// UpdateStatus replaces the status of the deployment with the given status and
7788
// updates the resources in k8s.
78-
func (d *Deployment) UpdateStatus(status api.DeploymentStatus, force ...bool) error {
79-
d.status = status
89+
// If the given last version does not match the actual last version of the status object,
90+
// an error is returned.
91+
func (d *Deployment) UpdateStatus(status api.DeploymentStatus, lastVersion int32, force ...bool) error {
92+
d.status.mutex.Lock()
93+
defer d.status.mutex.Unlock()
94+
95+
if d.status.version != lastVersion {
96+
// Status is obsolete
97+
d.deps.Log.Error().
98+
Int32("expected-version", lastVersion).
99+
Int32("actual-version", d.status.version).
100+
Msg("UpdateStatus version conflict error.")
101+
return maskAny(fmt.Errorf("Status conflict error. Expected version %d, got %d", lastVersion, d.status.version))
102+
}
103+
d.status.version++
104+
d.status.last = *status.DeepCopy()
80105
if err := d.updateCRStatus(force...); err != nil {
81106
return maskAny(err)
82107
}
@@ -105,7 +130,7 @@ func (d *Deployment) GetServerClient(ctx context.Context, group api.ServerGroup,
105130
// GetAgencyClients returns a client connection for every agency member.
106131
// If the given predicate is not nil, only agents are included where the given predicate returns true.
107132
func (d *Deployment) GetAgencyClients(ctx context.Context, predicate func(id string) bool) ([]driver.Connection, error) {
108-
agencyMembers := d.status.Members.Agents
133+
agencyMembers := d.status.last.Members.Agents
109134
result := make([]driver.Connection, 0, len(agencyMembers))
110135
for _, m := range agencyMembers {
111136
if predicate != nil && !predicate(m.ID) {
@@ -157,13 +182,14 @@ func (d *Deployment) GetSyncServerClient(ctx context.Context, group api.ServerGr
157182
// If ID is non-empty, it will be used, otherwise a new ID is created.
158183
func (d *Deployment) CreateMember(group api.ServerGroup, id string) error {
159184
log := d.deps.Log
160-
id, err := d.createMember(group, id, d.apiObject)
185+
status, lastVersion := d.GetStatus()
186+
id, err := createMember(log, &status, group, id, d.apiObject)
161187
if err != nil {
162188
log.Debug().Err(err).Str("group", group.AsRole()).Msg("Failed to create member")
163189
return maskAny(err)
164190
}
165191
// Save added member
166-
if err := d.updateCRStatus(); err != nil {
192+
if err := d.UpdateStatus(status, lastVersion); err != nil {
167193
log.Debug().Err(err).Msg("Updating CR status failed")
168194
return maskAny(err)
169195
}

pkg/deployment/deployment.go

+29-19
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ package deployment
2525
import (
2626
"fmt"
2727
"reflect"
28+
"sync"
2829
"sync/atomic"
2930
"time"
3031

@@ -83,9 +84,13 @@ const (
8384
// Deployment is the in process state of an ArangoDeployment.
8485
type Deployment struct {
8586
apiObject *api.ArangoDeployment // API object
86-
status api.DeploymentStatus // Internal status of the CR
87-
config Config
88-
deps Dependencies
87+
status struct {
88+
mutex sync.Mutex
89+
version int32
90+
last api.DeploymentStatus // Internal status copy of the CR
91+
}
92+
config Config
93+
deps Dependencies
8994

9095
eventCh chan *deploymentEvent
9196
stopCh chan struct{}
@@ -112,20 +117,20 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
112117
}
113118
d := &Deployment{
114119
apiObject: apiObject,
115-
status: *(apiObject.Status.DeepCopy()),
116120
config: config,
117121
deps: deps,
118122
eventCh: make(chan *deploymentEvent, deploymentEventQueueSize),
119123
stopCh: make(chan struct{}),
120124
eventsCli: deps.KubeCli.Core().Events(apiObject.GetNamespace()),
121125
clientCache: newClientCache(deps.KubeCli, apiObject),
122126
}
127+
d.status.last = *(apiObject.Status.DeepCopy())
123128
d.reconciler = reconcile.NewReconciler(deps.Log, d)
124129
d.resilience = resilience.NewResilience(deps.Log, d)
125130
d.resources = resources.NewResources(deps.Log, d)
126-
if d.status.AcceptedSpec == nil {
131+
if d.status.last.AcceptedSpec == nil {
127132
// We've validated the spec, so let's use it from now.
128-
d.status.AcceptedSpec = apiObject.Spec.DeepCopy()
133+
d.status.last.AcceptedSpec = apiObject.Spec.DeepCopy()
129134
}
130135

131136
go d.run()
@@ -185,7 +190,7 @@ func (d *Deployment) send(ev *deploymentEvent) {
185190
func (d *Deployment) run() {
186191
log := d.deps.Log
187192

188-
if d.status.Phase == api.DeploymentPhaseNone {
193+
if d.GetPhase() == api.DeploymentPhaseNone {
189194
// Create secrets
190195
if err := d.resources.EnsureSecrets(); err != nil {
191196
d.CreateEvent(k8sutil.NewErrorEvent("Failed to create secrets", err, d.GetAPIObject()))
@@ -211,8 +216,9 @@ func (d *Deployment) run() {
211216
d.CreateEvent(k8sutil.NewErrorEvent("Failed to create pods", err, d.GetAPIObject()))
212217
}
213218

214-
d.status.Phase = api.DeploymentPhaseRunning
215-
if err := d.updateCRStatus(); err != nil {
219+
status, lastVersion := d.GetStatus()
220+
status.Phase = api.DeploymentPhaseRunning
221+
if err := d.UpdateStatus(status, lastVersion); err != nil {
216222
log.Warn().Err(err).Msg("update initial CR status failed")
217223
}
218224
log.Info().Msg("start running...")
@@ -277,13 +283,14 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent() error {
277283
}
278284

279285
specBefore := d.apiObject.Spec
280-
if d.status.AcceptedSpec != nil {
281-
specBefore = *d.status.AcceptedSpec
286+
status := d.status.last
287+
if d.status.last.AcceptedSpec != nil {
288+
specBefore = *status.AcceptedSpec.DeepCopy()
282289
}
283290
newAPIObject := current.DeepCopy()
284291
newAPIObject.Spec.SetDefaultsFrom(specBefore)
285292
newAPIObject.Spec.SetDefaults(d.apiObject.GetName())
286-
newAPIObject.Status = d.status
293+
newAPIObject.Status = status
287294
resetFields := specBefore.ResetImmutableFields(&newAPIObject.Spec)
288295
if len(resetFields) > 0 {
289296
log.Debug().Strs("fields", resetFields).Msg("Found modified immutable fields")
@@ -309,9 +316,12 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent() error {
309316
return maskAny(fmt.Errorf("failed to update ArangoDeployment spec: %v", err))
310317
}
311318
// Save updated accepted spec
312-
d.status.AcceptedSpec = newAPIObject.Spec.DeepCopy()
313-
if err := d.updateCRStatus(); err != nil {
314-
return maskAny(fmt.Errorf("failed to update ArangoDeployment status: %v", err))
319+
{
320+
status, lastVersion := d.GetStatus()
321+
status.AcceptedSpec = newAPIObject.Spec.DeepCopy()
322+
if err := d.UpdateStatus(status, lastVersion); err != nil {
323+
return maskAny(fmt.Errorf("failed to update ArangoDeployment status: %v", err))
324+
}
315325
}
316326

317327
// Notify cluster of desired server count
@@ -351,7 +361,7 @@ func (d *Deployment) updateCRStatus(force ...bool) error {
351361
attempt := 0
352362
for {
353363
attempt++
354-
update.Status = d.status
364+
update.Status = d.status.last
355365
if update.GetDeletionTimestamp() == nil {
356366
ensureFinalizers(update)
357367
}
@@ -388,7 +398,7 @@ func (d *Deployment) updateCRSpec(newSpec api.DeploymentSpec) error {
388398
for {
389399
attempt++
390400
update.Spec = newSpec
391-
update.Status = d.status
401+
update.Status = d.status.last
392402
ns := d.apiObject.GetNamespace()
393403
newAPIObject, err := d.deps.DatabaseCRCli.DatabaseV1alpha().ArangoDeployments(ns).Update(update)
394404
if err == nil {
@@ -417,7 +427,7 @@ func (d *Deployment) updateCRSpec(newSpec api.DeploymentSpec) error {
417427
// Since there is no recovery from a failed deployment, use with care!
418428
func (d *Deployment) failOnError(err error, msg string) {
419429
log.Error().Err(err).Msg(msg)
420-
d.status.Reason = err.Error()
430+
d.status.last.Reason = err.Error()
421431
d.reportFailedStatus()
422432
}
423433

@@ -428,7 +438,7 @@ func (d *Deployment) reportFailedStatus() {
428438
log.Info().Msg("deployment failed. Reporting failed reason...")
429439

430440
op := func() error {
431-
d.status.Phase = api.DeploymentPhaseFailed
441+
d.status.last.Phase = api.DeploymentPhaseFailed
432442
err := d.updateCRStatus()
433443
if err == nil || k8sutil.IsNotFound(err) {
434444
// Status has been updated

pkg/deployment/deployment_inspector.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
6060
}
6161
} else {
6262
// Is the deployment in failed state, if so, give up.
63-
if d.status.Phase == api.DeploymentPhaseFailed {
63+
if d.GetPhase() == api.DeploymentPhaseFailed {
6464
log.Debug().Msg("Deployment is in Failed state.")
6565
return nextInterval
6666
}
@@ -72,7 +72,8 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
7272
}
7373

7474
// Is the deployment in a good state?
75-
if d.status.Conditions.IsTrue(api.ConditionTypeSecretsChanged) {
75+
status, _ := d.GetStatus()
76+
if status.Conditions.IsTrue(api.ConditionTypeSecretsChanged) {
7677
log.Debug().Msg("Condition SecretsChanged is true. Revert secrets before we can continue")
7778
return nextInterval
7879
}

pkg/deployment/images.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,15 @@ type imagesBuilder struct {
5555
// ensureImages creates pods needed to detect ImageID for specified images.
5656
// Returns: retrySoon, error
5757
func (d *Deployment) ensureImages(apiObject *api.ArangoDeployment) (bool, error) {
58+
status, lastVersion := d.GetStatus()
5859
ib := imagesBuilder{
5960
APIObject: apiObject,
6061
Spec: apiObject.Spec,
61-
Status: d.status,
62+
Status: status,
6263
Log: d.deps.Log,
6364
KubeCli: d.deps.KubeCli,
6465
UpdateCRStatus: func(status api.DeploymentStatus) error {
65-
d.status = status
66-
if err := d.updateCRStatus(); err != nil {
66+
if err := d.UpdateStatus(status, lastVersion); err != nil {
6767
return maskAny(err)
6868
}
6969
return nil

0 commit comments

Comments
 (0)