Skip to content

Avoid overwriting status changes #172

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions pkg/apis/deployment/v1alpha/deployment_status_members.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,23 +75,23 @@ func (ds DeploymentStatusMembers) ElementByID(id string) (MemberStatus, ServerGr
// ForeachServerGroup calls the given callback for all server groups.
// If the callback returns an error, this error is returned and the callback is
// not called for the remaining groups.
func (ds DeploymentStatusMembers) ForeachServerGroup(cb func(group ServerGroup, list *MemberStatusList) error) error {
if err := cb(ServerGroupSingle, &ds.Single); err != nil {
func (ds DeploymentStatusMembers) ForeachServerGroup(cb func(group ServerGroup, list MemberStatusList) error) error {
if err := cb(ServerGroupSingle, ds.Single); err != nil {
return maskAny(err)
}
if err := cb(ServerGroupAgents, &ds.Agents); err != nil {
if err := cb(ServerGroupAgents, ds.Agents); err != nil {
return maskAny(err)
}
if err := cb(ServerGroupDBServers, &ds.DBServers); err != nil {
if err := cb(ServerGroupDBServers, ds.DBServers); err != nil {
return maskAny(err)
}
if err := cb(ServerGroupCoordinators, &ds.Coordinators); err != nil {
if err := cb(ServerGroupCoordinators, ds.Coordinators); err != nil {
return maskAny(err)
}
if err := cb(ServerGroupSyncMasters, &ds.SyncMasters); err != nil {
if err := cb(ServerGroupSyncMasters, ds.SyncMasters); err != nil {
return maskAny(err)
}
if err := cb(ServerGroupSyncWorkers, &ds.SyncWorkers); err != nil {
if err := cb(ServerGroupSyncWorkers, ds.SyncWorkers); err != nil {
return maskAny(err)
}
return nil
Expand Down Expand Up @@ -190,8 +190,8 @@ func (ds *DeploymentStatusMembers) RemoveByID(id string, group ServerGroup) erro

// AllMembersReady returns true when all members are in the Ready state.
func (ds DeploymentStatusMembers) AllMembersReady() bool {
if err := ds.ForeachServerGroup(func(group ServerGroup, list *MemberStatusList) error {
for _, x := range *list {
if err := ds.ForeachServerGroup(func(group ServerGroup, list MemberStatusList) error {
for _, x := range list {
if !x.Conditions.IsTrue(ConditionTypeReady) {
return fmt.Errorf("not ready")
}
Expand Down
69 changes: 69 additions & 0 deletions pkg/apis/deployment/v1alpha/member_status_list_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//

package v1alpha

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestMemberStatusList tests modifying a MemberStatusList.
func TestMemberStatusList(t *testing.T) {
list := &MemberStatusList{}
m1 := MemberStatus{ID: "m1"}
m2 := MemberStatus{ID: "m2"}
m3 := MemberStatus{ID: "m3"}
assert.Equal(t, 0, len(*list))

assert.NoError(t, list.Add(m1))
assert.Equal(t, 1, len(*list))

assert.NoError(t, list.Add(m2))
assert.NoError(t, list.Add(m3))
assert.Equal(t, 3, len(*list))

assert.Error(t, list.Add(m2))
assert.Equal(t, 3, len(*list))

assert.NoError(t, list.RemoveByID(m3.ID))
assert.Equal(t, 2, len(*list))
assert.False(t, list.ContainsID(m3.ID))
assert.Equal(t, m1.ID, (*list)[0].ID)
assert.Equal(t, m2.ID, (*list)[1].ID)

m2.PodName = "foo"
assert.NoError(t, list.Update(m2))
assert.Equal(t, 2, len(*list))
assert.True(t, list.ContainsID(m2.ID))
x, found := list.ElementByPodName("foo")
assert.True(t, found)
assert.Equal(t, "foo", x.PodName)
assert.Equal(t, m2.ID, x.ID)

assert.NoError(t, list.Add(m3))
assert.Equal(t, 3, len(*list))
assert.Equal(t, m1.ID, (*list)[0].ID)
assert.Equal(t, m2.ID, (*list)[1].ID)
assert.Equal(t, m3.ID, (*list)[2].ID)
}
2 changes: 1 addition & 1 deletion pkg/deployment/cluster_scaling_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct
delay := time.Second * 2

// Is deployment in running state
if ci.depl.status.Phase == api.DeploymentPhaseRunning {
if ci.depl.GetPhase() == api.DeploymentPhaseRunning {
// Update cluster with our state
ctx := context.Background()
safeToAskCluster, err := ci.updateClusterServerCount(ctx)
Expand Down
40 changes: 33 additions & 7 deletions pkg/deployment/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package deployment

import (
"context"
"fmt"

"github.com/arangodb/arangosync/client"
"github.com/arangodb/arangosync/tasks"
Expand Down Expand Up @@ -63,20 +64,44 @@ func (d *Deployment) GetNamespace() string {
return d.apiObject.GetNamespace()
}

// GetPhase returns the current phase of the deployment
func (d *Deployment) GetPhase() api.DeploymentPhase {
return d.status.last.Phase
}

// GetSpec returns the current specification
func (d *Deployment) GetSpec() api.DeploymentSpec {
return d.apiObject.Spec
}

// GetStatus returns the current status of the deployment
func (d *Deployment) GetStatus() api.DeploymentStatus {
return d.status
// together with the current version of that status.
func (d *Deployment) GetStatus() (api.DeploymentStatus, int32) {
d.status.mutex.Lock()
defer d.status.mutex.Unlock()

version := d.status.version
return *d.status.last.DeepCopy(), version
}

// UpdateStatus replaces the status of the deployment with the given status and
// updates the resources in k8s.
func (d *Deployment) UpdateStatus(status api.DeploymentStatus, force ...bool) error {
d.status = status
// If the given last version does not match the actual last version of the status object,
// an error is returned.
func (d *Deployment) UpdateStatus(status api.DeploymentStatus, lastVersion int32, force ...bool) error {
d.status.mutex.Lock()
defer d.status.mutex.Unlock()

if d.status.version != lastVersion {
// Status is obsolete
d.deps.Log.Error().
Int32("expected-version", lastVersion).
Int32("actual-version", d.status.version).
Msg("UpdateStatus version conflict error.")
return maskAny(fmt.Errorf("Status conflict error. Expected version %d, got %d", lastVersion, d.status.version))
}
d.status.version++
d.status.last = *status.DeepCopy()
if err := d.updateCRStatus(force...); err != nil {
return maskAny(err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this fails because of network failure, then the version is already incremented. Any retry will now fail with a version mismatch. If we do not retry, we have a changed in memory state which is not written to the API service. Maybe we have to rollback the change in memory to allow for a retry?
The current behaviour might or might not be what we want. If we simply move on then the in memory state is different from the one in the API server. I cannot judge the consequences.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is a retry higher up, it MUST call GetStatus() first to get the latest status and version.
So while a network error will cause a state diff between in-memory & persistent in kube-apiserver, this will not affect future updates.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

}
Expand Down Expand Up @@ -105,7 +130,7 @@ func (d *Deployment) GetServerClient(ctx context.Context, group api.ServerGroup,
// GetAgencyClients returns a client connection for every agency member.
// If the given predicate is not nil, only agents are included where the given predicate returns true.
func (d *Deployment) GetAgencyClients(ctx context.Context, predicate func(id string) bool) ([]driver.Connection, error) {
agencyMembers := d.status.Members.Agents
agencyMembers := d.status.last.Members.Agents
result := make([]driver.Connection, 0, len(agencyMembers))
for _, m := range agencyMembers {
if predicate != nil && !predicate(m.ID) {
Expand Down Expand Up @@ -157,13 +182,14 @@ func (d *Deployment) GetSyncServerClient(ctx context.Context, group api.ServerGr
// If ID is non-empty, it will be used, otherwise a new ID is created.
func (d *Deployment) CreateMember(group api.ServerGroup, id string) error {
log := d.deps.Log
id, err := d.createMember(group, id, d.apiObject)
status, lastVersion := d.GetStatus()
id, err := createMember(log, &status, group, id, d.apiObject)
if err != nil {
log.Debug().Err(err).Str("group", group.AsRole()).Msg("Failed to create member")
return maskAny(err)
}
// Save added member
if err := d.updateCRStatus(); err != nil {
if err := d.UpdateStatus(status, lastVersion); err != nil {
log.Debug().Err(err).Msg("Updating CR status failed")
return maskAny(err)
}
Expand Down
48 changes: 29 additions & 19 deletions pkg/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package deployment
import (
"fmt"
"reflect"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -83,9 +84,13 @@ const (
// Deployment is the in process state of an ArangoDeployment.
type Deployment struct {
apiObject *api.ArangoDeployment // API object
status api.DeploymentStatus // Internal status of the CR
config Config
deps Dependencies
status struct {
mutex sync.Mutex
version int32
last api.DeploymentStatus // Internal status copy of the CR
}
config Config
deps Dependencies

eventCh chan *deploymentEvent
stopCh chan struct{}
Expand All @@ -112,20 +117,20 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
}
d := &Deployment{
apiObject: apiObject,
status: *(apiObject.Status.DeepCopy()),
config: config,
deps: deps,
eventCh: make(chan *deploymentEvent, deploymentEventQueueSize),
stopCh: make(chan struct{}),
eventsCli: deps.KubeCli.Core().Events(apiObject.GetNamespace()),
clientCache: newClientCache(deps.KubeCli, apiObject),
}
d.status.last = *(apiObject.Status.DeepCopy())
d.reconciler = reconcile.NewReconciler(deps.Log, d)
d.resilience = resilience.NewResilience(deps.Log, d)
d.resources = resources.NewResources(deps.Log, d)
if d.status.AcceptedSpec == nil {
if d.status.last.AcceptedSpec == nil {
// We've validated the spec, so let's use it from now.
d.status.AcceptedSpec = apiObject.Spec.DeepCopy()
d.status.last.AcceptedSpec = apiObject.Spec.DeepCopy()
}

go d.run()
Expand Down Expand Up @@ -185,7 +190,7 @@ func (d *Deployment) send(ev *deploymentEvent) {
func (d *Deployment) run() {
log := d.deps.Log

if d.status.Phase == api.DeploymentPhaseNone {
if d.GetPhase() == api.DeploymentPhaseNone {
// Create secrets
if err := d.resources.EnsureSecrets(); err != nil {
d.CreateEvent(k8sutil.NewErrorEvent("Failed to create secrets", err, d.GetAPIObject()))
Expand All @@ -211,8 +216,9 @@ func (d *Deployment) run() {
d.CreateEvent(k8sutil.NewErrorEvent("Failed to create pods", err, d.GetAPIObject()))
}

d.status.Phase = api.DeploymentPhaseRunning
if err := d.updateCRStatus(); err != nil {
status, lastVersion := d.GetStatus()
status.Phase = api.DeploymentPhaseRunning
if err := d.UpdateStatus(status, lastVersion); err != nil {
log.Warn().Err(err).Msg("update initial CR status failed")
}
log.Info().Msg("start running...")
Expand Down Expand Up @@ -277,13 +283,14 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent() error {
}

specBefore := d.apiObject.Spec
if d.status.AcceptedSpec != nil {
specBefore = *d.status.AcceptedSpec
status := d.status.last
if d.status.last.AcceptedSpec != nil {
specBefore = *status.AcceptedSpec.DeepCopy()
}
newAPIObject := current.DeepCopy()
newAPIObject.Spec.SetDefaultsFrom(specBefore)
newAPIObject.Spec.SetDefaults(d.apiObject.GetName())
newAPIObject.Status = d.status
newAPIObject.Status = status
resetFields := specBefore.ResetImmutableFields(&newAPIObject.Spec)
if len(resetFields) > 0 {
log.Debug().Strs("fields", resetFields).Msg("Found modified immutable fields")
Expand All @@ -309,9 +316,12 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent() error {
return maskAny(fmt.Errorf("failed to update ArangoDeployment spec: %v", err))
}
// Save updated accepted spec
d.status.AcceptedSpec = newAPIObject.Spec.DeepCopy()
if err := d.updateCRStatus(); err != nil {
return maskAny(fmt.Errorf("failed to update ArangoDeployment status: %v", err))
{
status, lastVersion := d.GetStatus()
status.AcceptedSpec = newAPIObject.Spec.DeepCopy()
if err := d.UpdateStatus(status, lastVersion); err != nil {
return maskAny(fmt.Errorf("failed to update ArangoDeployment status: %v", err))
}
}

// Notify cluster of desired server count
Expand Down Expand Up @@ -351,7 +361,7 @@ func (d *Deployment) updateCRStatus(force ...bool) error {
attempt := 0
for {
attempt++
update.Status = d.status
update.Status = d.status.last
if update.GetDeletionTimestamp() == nil {
ensureFinalizers(update)
}
Expand Down Expand Up @@ -388,7 +398,7 @@ func (d *Deployment) updateCRSpec(newSpec api.DeploymentSpec) error {
for {
attempt++
update.Spec = newSpec
update.Status = d.status
update.Status = d.status.last
ns := d.apiObject.GetNamespace()
newAPIObject, err := d.deps.DatabaseCRCli.DatabaseV1alpha().ArangoDeployments(ns).Update(update)
if err == nil {
Expand Down Expand Up @@ -417,7 +427,7 @@ func (d *Deployment) updateCRSpec(newSpec api.DeploymentSpec) error {
// Since there is no recovery from a failed deployment, use with care!
func (d *Deployment) failOnError(err error, msg string) {
log.Error().Err(err).Msg(msg)
d.status.Reason = err.Error()
d.status.last.Reason = err.Error()
d.reportFailedStatus()
}

Expand All @@ -428,7 +438,7 @@ func (d *Deployment) reportFailedStatus() {
log.Info().Msg("deployment failed. Reporting failed reason...")

op := func() error {
d.status.Phase = api.DeploymentPhaseFailed
d.status.last.Phase = api.DeploymentPhaseFailed
err := d.updateCRStatus()
if err == nil || k8sutil.IsNotFound(err) {
// Status has been updated
Expand Down
5 changes: 3 additions & 2 deletions pkg/deployment/deployment_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
}
} else {
// Is the deployment in failed state, if so, give up.
if d.status.Phase == api.DeploymentPhaseFailed {
if d.GetPhase() == api.DeploymentPhaseFailed {
log.Debug().Msg("Deployment is in Failed state.")
return nextInterval
}
Expand All @@ -72,7 +72,8 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
}

// Is the deployment in a good state?
if d.status.Conditions.IsTrue(api.ConditionTypeSecretsChanged) {
status, _ := d.GetStatus()
if status.Conditions.IsTrue(api.ConditionTypeSecretsChanged) {
log.Debug().Msg("Condition SecretsChanged is true. Revert secrets before we can continue")
return nextInterval
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/deployment/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ type imagesBuilder struct {
// ensureImages creates pods needed to detect ImageID for specified images.
// Returns: retrySoon, error
func (d *Deployment) ensureImages(apiObject *api.ArangoDeployment) (bool, error) {
status, lastVersion := d.GetStatus()
ib := imagesBuilder{
APIObject: apiObject,
Spec: apiObject.Spec,
Status: d.status,
Status: status,
Log: d.deps.Log,
KubeCli: d.deps.KubeCli,
UpdateCRStatus: func(status api.DeploymentStatus) error {
d.status = status
if err := d.updateCRStatus(); err != nil {
if err := d.UpdateStatus(status, lastVersion); err != nil {
return maskAny(err)
}
return nil
Expand Down
Loading