Skip to content

Resilience improvements #246

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Sep 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,056 changes: 1,056 additions & 0 deletions examples/metrics/dashboard.json

Large diffs are not rendered by default.

24 changes: 6 additions & 18 deletions examples/metrics/deployment-operator-servicemonitor.yaml
Original file line number Diff line number Diff line change
@@ -1,34 +1,22 @@
# This example shows how to integrate with the Prometheus Operator
# to bring metrics from kube-arangodb to Prometheus.

apiVersion: v1
kind: Service
metadata:
name: arango-deployment-operator
labels:
app: arango-deployment-operator
spec:
selector:
app: arango-deployment-operator
ports:
- name: metrics
port: 8528

---

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: arango-deployment-operator
namespace: monitoring
labels:
team: frontend
prometheus: kube-prometheus
spec:
selector:
matchLabels:
app: arango-deployment-operator
namespaceSelector:
matchNames:
- default
endpoints:
- port: metrics
- port: server
scheme: https
tlsConfig:
insecureSkipVerify: true

2 changes: 2 additions & 0 deletions lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,6 @@ func cmdLifecycleCopyRun(cmd *cobra.Command, args []string) {
if err := os.Chmod(targetPath, 0755); err != nil {
cliLog.Fatal().Err(err).Msg("Failed to chmod")
}

cliLog.Info().Msgf("Executable copied to %s", targetPath)
}
4 changes: 4 additions & 0 deletions pkg/apis/deployment/v1alpha/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ const (
// ConditionTypeCleanedOut indicates that the member (dbserver) has been cleaned out.
// Always check in combination with ConditionTypeTerminated.
ConditionTypeCleanedOut ConditionType = "CleanedOut"
// ConditionTypeAgentRecoveryNeeded indicates that the member (agent) will no
// longer recover from its current volume and has to be rebuilt
// using the recovery procedure.
ConditionTypeAgentRecoveryNeeded ConditionType = "AgentRecoveryNeeded"
// ConditionTypePodSchedulingFailure indicates that one or more pods belonging to the deployment cannot be scheduled.
ConditionTypePodSchedulingFailure ConditionType = "PodSchedulingFailure"
// ConditionTypeSecretsChanged indicates that the value of one or more secrets used by
Expand Down
5 changes: 4 additions & 1 deletion pkg/apis/deployment/v1alpha/member_status_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package v1alpha

import (
"math/rand"
"sort"

"github.com/pkg/errors"
)
Expand Down Expand Up @@ -83,7 +84,9 @@ func (l *MemberStatusList) add(m MemberStatus) error {
return maskAny(errors.Wrapf(AlreadyExistsError, "Member '%s' already exists", m.ID))
}
}
*l = append(src, m)
newList := append(src, m)
sort.Slice(newList, func(i, j int) bool { return newList[i].ID < newList[j].ID })
*l = newList
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/deployment/access_package.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,15 @@ func (d *Deployment) ensureAccessPackage(apSecretName string) error {

// Fetch client authentication CA
clientAuthSecretName := spec.Sync.Authentication.GetClientCASecretName()
clientAuthCert, clientAuthKey, _, err := k8sutil.GetCASecret(d.deps.KubeCli.CoreV1(), clientAuthSecretName, ns, nil)
clientAuthCert, clientAuthKey, _, err := k8sutil.GetCASecret(secrets, clientAuthSecretName, nil)
if err != nil {
log.Debug().Err(err).Msg("Failed to get client-auth CA secret")
return maskAny(err)
}

// Fetch TLS CA public key
tlsCASecretName := spec.Sync.TLS.GetCASecretName()
tlsCACert, err := k8sutil.GetCACertficateSecret(d.deps.KubeCli.CoreV1(), tlsCASecretName, ns)
tlsCACert, err := k8sutil.GetCACertficateSecret(secrets, tlsCASecretName)
if err != nil {
log.Debug().Err(err).Msg("Failed to get TLS CA secret")
return maskAny(err)
Expand Down
9 changes: 6 additions & 3 deletions pkg/deployment/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,9 @@ func (d *Deployment) GetSyncServerClient(ctx context.Context, group api.ServerGr
log := d.deps.Log
kubecli := d.deps.KubeCli
ns := d.apiObject.GetNamespace()
secrets := kubecli.CoreV1().Secrets(ns)
secretName := d.apiObject.Spec.Sync.Monitoring.GetTokenSecretName()
monitoringToken, err := k8sutil.GetTokenSecret(kubecli.CoreV1(), secretName, ns)
monitoringToken, err := k8sutil.GetTokenSecret(secrets, secretName)
if err != nil {
log.Debug().Err(err).Str("secret-name", secretName).Msg("Failed to get sync monitoring secret")
return nil, maskAny(err)
Expand Down Expand Up @@ -331,7 +332,8 @@ func (d *Deployment) GetPvc(pvcName string) (*v1.PersistentVolumeClaim, error) {
func (d *Deployment) GetTLSKeyfile(group api.ServerGroup, member api.MemberStatus) (string, error) {
secretName := k8sutil.CreateTLSKeyfileSecretName(d.apiObject.GetName(), group.AsRole(), member.ID)
ns := d.apiObject.GetNamespace()
result, err := k8sutil.GetTLSKeyfileSecret(d.deps.KubeCli.CoreV1(), secretName, ns)
secrets := d.deps.KubeCli.CoreV1().Secrets(ns)
result, err := k8sutil.GetTLSKeyfileSecret(secrets, secretName)
if err != nil {
return "", maskAny(err)
}
Expand All @@ -353,8 +355,9 @@ func (d *Deployment) DeleteTLSKeyfile(group api.ServerGroup, member api.MemberSt
// Returns: publicKey, privateKey, ownedByDeployment, error
func (d *Deployment) GetTLSCA(secretName string) (string, string, bool, error) {
ns := d.apiObject.GetNamespace()
secrets := d.deps.KubeCli.CoreV1().Secrets(ns)
owner := d.apiObject.AsOwner()
cert, priv, isOwned, err := k8sutil.GetCASecret(d.deps.KubeCli.CoreV1(), secretName, ns, &owner)
cert, priv, isOwned, err := k8sutil.GetCASecret(secrets, secretName, &owner)
if err != nil {
return "", "", false, maskAny(err)
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/arangodb/kube-arangodb/pkg/deployment/resilience"
"github.com/arangodb/kube-arangodb/pkg/deployment/resources"
"github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/arangodb/kube-arangodb/pkg/util/retry"
"github.com/arangodb/kube-arangodb/pkg/util/trigger"
Expand Down Expand Up @@ -78,8 +79,8 @@ type deploymentEvent struct {

const (
deploymentEventQueueSize = 256
minInspectionInterval = time.Second // Shortest periodic inspection interval; the run loop resets to this value when the deployment is updated
maxInspectionInterval = time.Minute // Longest periodic inspection interval; the backoff between inspections is capped at this value
minInspectionInterval = util.Interval(time.Second) // Shortest periodic inspection interval; the run loop resets to this value when the deployment is updated
maxInspectionInterval = util.Interval(time.Minute) // Longest periodic inspection interval; the backoff between inspections is capped at this value
)

// Deployment is the in process state of an ArangoDeployment.
Expand Down Expand Up @@ -140,6 +141,7 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
ci := newClusterScalingIntegration(d)
d.clusterScalingIntegration = ci
go ci.ListenForClusterEvents(d.stopCh)
go d.resources.RunDeploymentHealthLoop(d.stopCh)
}
if config.AllowChaos {
d.chaosMonkey = chaos.NewMonkey(deps.Log, d)
Expand Down Expand Up @@ -247,21 +249,21 @@ func (d *Deployment) run() {
}

case <-d.inspectTrigger.Done():
log.Debug().Msg("Inspect deployment...")
inspectionInterval = d.inspectDeployment(inspectionInterval)
log.Debug().Str("interval", inspectionInterval.String()).Msg("...inspected deployment")

case <-d.updateDeploymentTrigger.Done():
inspectionInterval = minInspectionInterval
if err := d.handleArangoDeploymentUpdatedEvent(); err != nil {
d.CreateEvent(k8sutil.NewErrorEvent("Failed to handle deployment update", err, d.GetAPIObject()))
}

case <-time.After(inspectionInterval):
case <-inspectionInterval.After():
// Trigger inspection
d.inspectTrigger.Trigger()
// Backoff with next interval
inspectionInterval = time.Duration(float64(inspectionInterval) * 1.5)
if inspectionInterval > maxInspectionInterval {
inspectionInterval = maxInspectionInterval
}
inspectionInterval = inspectionInterval.Backoff(1.5, maxInspectionInterval)
}
}
}
Expand Down
30 changes: 21 additions & 9 deletions pkg/deployment/deployment_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,35 @@ import (
"time"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/metrics"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
// inspectDeploymentDurationGauges holds the gauges that record how long a
// single inspection of a deployment takes (in sec), labelled by deployment name.
inspectDeploymentDurationGauges = metrics.MustRegisterGaugeVec(metricsComponent, "inspect_deployment_duration", "Amount of time taken by a single inspection of a deployment (in sec)", metrics.DeploymentName)
)

// inspectDeployment inspects the entire deployment, creates
// a plan to update if needed and inspects underlying resources.
// This function should be called when:
// - the deployment has changed
// - any of the underlying resources has changed
// - once in a while
// Returns the delay until this function should be called again.
func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration {
func (d *Deployment) inspectDeployment(lastInterval util.Interval) util.Interval {
log := d.deps.Log
start := time.Now()

nextInterval := lastInterval
hasError := false
ctx := context.Background()
deploymentName := d.apiObject.GetName()
defer metrics.SetDuration(inspectDeploymentDurationGauges.WithLabelValues(deploymentName), start)

// Check deployment still exists
updated, err := d.deps.DatabaseCRCli.DatabaseV1alpha().ArangoDeployments(d.apiObject.GetNamespace()).Get(d.apiObject.GetName(), metav1.GetOptions{})
updated, err := d.deps.DatabaseCRCli.DatabaseV1alpha().ArangoDeployments(d.apiObject.GetNamespace()).Get(deploymentName, metav1.GetOptions{})
if k8sutil.IsNotFound(err) {
// Deployment is gone
log.Info().Msg("Deployment is gone")
Expand Down Expand Up @@ -87,13 +96,17 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
}

// Inspection of generated resources needed
if err := d.resources.InspectPods(ctx); err != nil {
if x, err := d.resources.InspectPods(ctx); err != nil {
hasError = true
d.CreateEvent(k8sutil.NewErrorEvent("Pod inspection failed", err, d.apiObject))
} else {
nextInterval = nextInterval.ReduceTo(x)
}
if err := d.resources.InspectPVCs(ctx); err != nil {
if x, err := d.resources.InspectPVCs(ctx); err != nil {
hasError = true
d.CreateEvent(k8sutil.NewErrorEvent("PVC inspection failed", err, d.apiObject))
} else {
nextInterval = nextInterval.ReduceTo(x)
}

// Check members for resilience
Expand Down Expand Up @@ -149,9 +162,11 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
}

// At the end of the inspect, we cleanup terminated pods.
if err := d.resources.CleanupTerminatedPods(); err != nil {
if x, err := d.resources.CleanupTerminatedPods(); err != nil {
hasError = true
d.CreateEvent(k8sutil.NewErrorEvent("Pod cleanup failed", err, d.apiObject))
} else {
nextInterval = nextInterval.ReduceTo(x)
}
}

Expand All @@ -164,10 +179,7 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
} else {
d.recentInspectionErrors = 0
}
if nextInterval > maxInspectionInterval {
nextInterval = maxInspectionInterval
}
return nextInterval
return nextInterval.ReduceTo(maxInspectionInterval)
}

// triggerInspection ensures that an inspection is run soon.
Expand Down
28 changes: 28 additions & 0 deletions pkg/deployment/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//

package deployment

const (
// metricsComponent is the component name used for the metrics of this package.
metricsComponent = "deployment"
)
15 changes: 14 additions & 1 deletion pkg/deployment/reconcile/plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject,
status.Members.ForeachServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error {
for _, m := range members {
if m.Phase == api.MemberPhaseFailed && len(plan) == 0 {
log.Debug().
Str("id", m.ID).
Str("role", group.AsRole()).
Msg("Creating member replacement plan because member has failed")
newID := ""
if group == api.ServerGroupAgents {
newID = m.ID // Agents cannot (yet) be replaced with new IDs
Expand All @@ -117,6 +121,10 @@ func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject,
// Check for cleaned out dbserver in created state
for _, m := range status.Members.DBServers {
if len(plan) == 0 && m.Phase == api.MemberPhaseCreated && m.Conditions.IsTrue(api.ConditionTypeCleanedOut) {
log.Debug().
Str("id", m.ID).
Str("role", api.ServerGroupDBServers.AsRole()).
Msg("Creating dbserver replacement plan because server is cleanout in created phase")
plan = append(plan,
api.NewAction(api.ActionTypeRemoveMember, api.ServerGroupDBServers, m.ID),
api.NewAction(api.ActionTypeAddMember, api.ServerGroupDBServers, ""),
Expand Down Expand Up @@ -398,13 +406,18 @@ func createRotateMemberPlan(log zerolog.Logger, member api.MemberStatus,
// member.
func createUpgradeMemberPlan(log zerolog.Logger, member api.MemberStatus,
group api.ServerGroup, reason string, imageName string, status api.DeploymentStatus) api.Plan {
upgradeAction := api.ActionTypeUpgradeMember
if group.IsStateless() {
upgradeAction = api.ActionTypeRotateMember
}
log.Debug().
Str("id", member.ID).
Str("role", group.AsRole()).
Str("reason", reason).
Str("action", string(upgradeAction)).
Msg("Creating upgrade plan")
plan := api.Plan{
api.NewAction(api.ActionTypeUpgradeMember, group, member.ID, reason),
api.NewAction(upgradeAction, group, member.ID, reason),
api.NewAction(api.ActionTypeWaitForMemberUp, group, member.ID),
}
if status.CurrentImage == nil || status.CurrentImage.Image != imageName {
Expand Down
10 changes: 5 additions & 5 deletions pkg/deployment/resources/certificates_client_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (

// createClientAuthCACertificate creates a client authentication CA certificate and stores it in a secret with name
// specified in the given spec.
func createClientAuthCACertificate(log zerolog.Logger, cli v1.CoreV1Interface, spec api.SyncAuthenticationSpec, deploymentName, namespace string, ownerRef *metav1.OwnerReference) error {
func createClientAuthCACertificate(log zerolog.Logger, secrets k8sutil.SecretInterface, spec api.SyncAuthenticationSpec, deploymentName string, ownerRef *metav1.OwnerReference) error {
log = log.With().Str("secret", spec.GetClientCASecretName()).Logger()
options := certificates.CreateCertificateOptions{
CommonName: fmt.Sprintf("%s Client Authentication Root Certificate", deploymentName),
Expand All @@ -57,7 +57,7 @@ func createClientAuthCACertificate(log zerolog.Logger, cli v1.CoreV1Interface, s
log.Debug().Err(err).Msg("Failed to create CA certificate")
return maskAny(err)
}
if err := k8sutil.CreateCASecret(cli, spec.GetClientCASecretName(), namespace, cert, priv, ownerRef); err != nil {
if err := k8sutil.CreateCASecret(secrets, spec.GetClientCASecretName(), cert, priv, ownerRef); err != nil {
if k8sutil.IsAlreadyExists(err) {
log.Debug().Msg("CA Secret already exists")
} else {
Expand All @@ -71,10 +71,10 @@ func createClientAuthCACertificate(log zerolog.Logger, cli v1.CoreV1Interface, s

// createClientAuthCertificateKeyfile creates a client authentication certificate for a specific user and stores
// it in a secret with the given name.
func createClientAuthCertificateKeyfile(log zerolog.Logger, cli v1.CoreV1Interface, commonName string, ttl time.Duration, spec api.SyncAuthenticationSpec, secretName, namespace string, ownerRef *metav1.OwnerReference) error {
func createClientAuthCertificateKeyfile(log zerolog.Logger, secrets v1.SecretInterface, commonName string, ttl time.Duration, spec api.SyncAuthenticationSpec, secretName string, ownerRef *metav1.OwnerReference) error {
log = log.With().Str("secret", secretName).Logger()
// Load CA certificate
caCert, caKey, _, err := k8sutil.GetCASecret(cli, spec.GetClientCASecretName(), namespace, nil)
caCert, caKey, _, err := k8sutil.GetCASecret(secrets, spec.GetClientCASecretName(), nil)
if err != nil {
log.Debug().Err(err).Msg("Failed to load CA certificate")
return maskAny(err)
Expand All @@ -100,7 +100,7 @@ func createClientAuthCertificateKeyfile(log zerolog.Logger, cli v1.CoreV1Interfa
}
keyfile := strings.TrimSpace(cert) + "\n" +
strings.TrimSpace(priv)
if err := k8sutil.CreateTLSKeyfileSecret(cli, secretName, namespace, keyfile, ownerRef); err != nil {
if err := k8sutil.CreateTLSKeyfileSecret(secrets, secretName, keyfile, ownerRef); err != nil {
if k8sutil.IsAlreadyExists(err) {
log.Debug().Msg("Server Secret already exists")
} else {
Expand Down
Loading