Skip to content

Commit cf338bd

Browse files
authored
Merge pull request #201 from arangodb/bugfix/operator-ready-state
All operator Pods will now reach the Ready state.
2 parents a924758 + b415c7c commit cf338bd

File tree

7 files changed

+147
-11
lines changed

7 files changed

+147
-11
lines changed

main.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ import (
3636
"github.com/rs/zerolog"
3737
"github.com/spf13/cobra"
3838
flag "github.com/spf13/pflag"
39+
appsv1beta2 "k8s.io/api/apps/v1beta2"
3940
"k8s.io/api/core/v1"
4041
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
42+
"k8s.io/apimachinery/pkg/runtime"
4143
"k8s.io/client-go/kubernetes"
4244
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
4345
"k8s.io/client-go/tools/record"
@@ -271,5 +273,9 @@ func createRecorder(log zerolog.Logger, kubecli kubernetes.Interface, name, name
271273
log.Info().Msgf(format, args...)
272274
})
273275
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubecli.Core().RESTClient()).Events(namespace)})
274-
return eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: name})
276+
combinedScheme := runtime.NewScheme()
277+
scheme.AddToScheme(combinedScheme)
278+
v1.AddToScheme(combinedScheme)
279+
appsv1beta2.AddToScheme(combinedScheme)
280+
return eventBroadcaster.NewRecorder(combinedScheme, v1.EventSource{Component: name})
275281
}

manifests/templates/deployment-replication/rbac.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ rules:
3333
resources: ["nodes"]
3434
verbs: ["get"]
3535
- apiGroups: ["apps"]
36-
resources: ["deployments"]
37-
verbs: ["*"]
36+
resources: ["deployments", "replicasets"]
37+
verbs: ["get"]
3838

3939
---
4040

manifests/templates/deployment/rbac.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ rules:
3030
resources: ["nodes"]
3131
verbs: ["get"]
3232
- apiGroups: ["apps"]
33-
resources: ["deployments"]
34-
verbs: ["*"]
33+
resources: ["deployments", "replicasets"]
34+
verbs: ["get"]
3535
- apiGroups: ["storage.k8s.io"]
3636
resources: ["storageclasses"]
3737
verbs: ["get", "list"]

manifests/templates/storage/rbac.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ rules:
3333
- apiGroups: ["apps"]
3434
resources: ["daemonsets"]
3535
verbs: ["*"]
36+
- apiGroups: ["apps"]
37+
resources: ["deployments", "replicasets"]
38+
verbs: ["get"]
3639
- apiGroups: ["storage.k8s.io"]
3740
resources: ["storageclasses"]
3841
verbs: ["*"]

pkg/operator/operator.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,13 @@ func NewOperator(config Config, deps Dependencies) (*Operator, error) {
105105
// Run the operator
106106
func (o *Operator) Run() {
107107
if o.Config.EnableDeployment {
108-
go o.runLeaderElection("arango-deployment-operator", o.onStartDeployment)
108+
go o.runLeaderElection("arango-deployment-operator", o.onStartDeployment, o.Dependencies.DeploymentProbe)
109109
}
110110
if o.Config.EnableDeploymentReplication {
111-
go o.runLeaderElection("arango-deployment-replication-operator", o.onStartDeploymentReplication)
111+
go o.runLeaderElection("arango-deployment-replication-operator", o.onStartDeploymentReplication, o.Dependencies.DeploymentReplicationProbe)
112112
}
113113
if o.Config.EnableStorage {
114-
go o.runLeaderElection("arango-storage-operator", o.onStartStorage)
114+
go o.runLeaderElection("arango-storage-operator", o.onStartStorage, o.Dependencies.StorageProbe)
115115
}
116116
// Wait until process terminates
117117
<-context.TODO().Done()

pkg/operator/operator_leader.go

+68-3
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,37 @@
2323
package operator
2424

2525
import (
26+
"fmt"
27+
"os"
2628
"time"
2729

30+
"github.com/rs/zerolog"
31+
"k8s.io/api/core/v1"
32+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33+
"k8s.io/apimachinery/pkg/runtime"
2834
"k8s.io/client-go/tools/leaderelection"
2935
"k8s.io/client-go/tools/leaderelection/resourcelock"
36+
37+
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
38+
"github.com/arangodb/kube-arangodb/pkg/util/probe"
3039
)
3140

32-
func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan struct{})) {
41+
// runLeaderElection performs a leader election on a lock with given name in
42+
// the namespace that the operator is deployed in.
43+
// When the leader election is won, the given callback is called.
44+
// When the leader election was won once, but the leadership is then lost, the process is killed.
45+
// The given ready probe is set, as soon as this process became the leader, or a new leader
46+
// is detected.
47+
func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan struct{}), readyProbe *probe.ReadyProbe) {
3348
namespace := o.Config.Namespace
3449
kubecli := o.Dependencies.KubeCli
3550
log := o.log.With().Str("lock-name", lockName).Logger()
51+
eventTarget := o.getLeaderElectionEventTarget(log)
52+
recordEvent := func(reason, message string) {
53+
if eventTarget != nil {
54+
o.Dependencies.EventRecorder.Event(eventTarget, v1.EventTypeNormal, reason, message)
55+
}
56+
}
3657
rl, err := resourcelock.New(resourcelock.EndpointsResourceLock,
3758
namespace,
3859
lockName,
@@ -51,10 +72,54 @@ func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan s
5172
RenewDeadline: 10 * time.Second,
5273
RetryPeriod: 2 * time.Second,
5374
Callbacks: leaderelection.LeaderCallbacks{
54-
OnStartedLeading: onStart,
75+
OnStartedLeading: func(stop <-chan struct{}) {
76+
recordEvent("Leader Election Won", fmt.Sprintf("Pod %s is running as leader", o.Config.PodName))
77+
readyProbe.SetReady()
78+
onStart(stop)
79+
},
5580
OnStoppedLeading: func() {
56-
log.Info().Msg("Leader election lost")
81+
recordEvent("Stop Leading", fmt.Sprintf("Pod %s is stopping to run as leader", o.Config.PodName))
82+
log.Info().Msg("Stop leading. Terminating process")
83+
os.Exit(1)
84+
},
85+
OnNewLeader: func(identity string) {
86+
log.Info().Str("identity", identity).Msg("New leader detected")
87+
readyProbe.SetReady()
5788
},
5889
},
5990
})
6091
}
92+
93+
// getLeaderElectionEventTarget returns the object that leader election related
94+
// events will be added to.
95+
func (o *Operator) getLeaderElectionEventTarget(log zerolog.Logger) runtime.Object {
96+
ns := o.Config.Namespace
97+
kubecli := o.Dependencies.KubeCli
98+
pods := kubecli.CoreV1().Pods(ns)
99+
log = log.With().Str("pod-name", o.Config.PodName).Logger()
100+
pod, err := pods.Get(o.Config.PodName, metav1.GetOptions{})
101+
if err != nil {
102+
log.Error().Err(err).Msg("Cannot find Pod containing this operator")
103+
return nil
104+
}
105+
rSet, err := k8sutil.GetPodOwner(kubecli, pod, ns)
106+
if err != nil {
107+
log.Error().Err(err).Msg("Cannot find ReplicaSet owning the Pod containing this operator")
108+
return pod
109+
}
110+
if rSet == nil {
111+
log.Error().Msg("Pod containing this operator has no ReplicaSet owner")
112+
return pod
113+
}
114+
log = log.With().Str("replicaSet-name", rSet.Name).Logger()
115+
depl, err := k8sutil.GetReplicaSetOwner(kubecli, rSet, ns)
116+
if err != nil {
117+
log.Error().Err(err).Msg("Cannot find Deployment owning the ReplicataSet that owns the Pod containing this operator")
118+
return rSet
119+
}
120+
if rSet == nil {
121+
log.Error().Msg("ReplicaSet that owns the Pod containing this operator has no Deployment owner")
122+
return rSet
123+
}
124+
return depl
125+
}

pkg/util/k8sutil/owner.go

+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
// Author Ewout Prangsma
21+
//
22+
23+
package k8sutil
24+
25+
import (
26+
"k8s.io/api/apps/v1beta2"
27+
"k8s.io/api/core/v1"
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
"k8s.io/client-go/kubernetes"
30+
)
31+
32+
// GetPodOwner returns the ReplicaSet that owns the given Pod.
33+
// If the Pod has no owner of the owner is not a ReplicaSet, nil is returned.
34+
func GetPodOwner(kubecli kubernetes.Interface, pod *v1.Pod, ns string) (*v1beta2.ReplicaSet, error) {
35+
for _, ref := range pod.GetOwnerReferences() {
36+
if ref.Kind == "ReplicaSet" {
37+
rSets := kubecli.AppsV1beta2().ReplicaSets(pod.GetNamespace())
38+
rSet, err := rSets.Get(ref.Name, metav1.GetOptions{})
39+
if err != nil {
40+
return nil, maskAny(err)
41+
}
42+
return rSet, nil
43+
}
44+
}
45+
return nil, nil
46+
}
47+
48+
// GetReplicaSetOwner returns the Deployment that owns the given ReplicaSet.
49+
// If the ReplicaSet has no owner of the owner is not a Deployment, nil is returned.
50+
func GetReplicaSetOwner(kubecli kubernetes.Interface, rSet *v1beta2.ReplicaSet, ns string) (*v1beta2.Deployment, error) {
51+
for _, ref := range rSet.GetOwnerReferences() {
52+
if ref.Kind == "Deployment" {
53+
depls := kubecli.AppsV1beta2().Deployments(rSet.GetNamespace())
54+
depl, err := depls.Get(ref.Name, metav1.GetOptions{})
55+
if err != nil {
56+
return nil, maskAny(err)
57+
}
58+
return depl, nil
59+
}
60+
}
61+
return nil, nil
62+
}

0 commit comments

Comments
 (0)