Skip to content

Commit e7a508f

Browse files
authored
Merge pull request #110 from arangodb/safe-resource-watcher
Safe resource watcher
2 parents 92416ac + c08dc0b commit e7a508f

File tree

6 files changed

+268
-175
lines changed

6 files changed

+268
-175
lines changed

pkg/deployment/informers.go

+103-102
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,13 @@ package deployment
2424

2525
import (
2626
"k8s.io/api/core/v1"
27-
"k8s.io/apimachinery/pkg/fields"
2827
"k8s.io/client-go/tools/cache"
28+
29+
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
2930
)
3031

3132
// listenForPodEvents keep listening for changes in pod until the given channel is closed.
3233
func (d *Deployment) listenForPodEvents(stopCh <-chan struct{}) {
33-
source := cache.NewListWatchFromClient(
34-
d.deps.KubeCli.CoreV1().RESTClient(),
35-
"pods",
36-
d.apiObject.GetNamespace(),
37-
fields.Everything())
38-
3934
getPod := func(obj interface{}) (*v1.Pod, bool) {
4035
pod, ok := obj.(*v1.Pod)
4136
if !ok {
@@ -49,35 +44,35 @@ func (d *Deployment) listenForPodEvents(stopCh <-chan struct{}) {
4944
return pod, true
5045
}
5146

52-
_, informer := cache.NewIndexerInformer(source, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
53-
AddFunc: func(obj interface{}) {
54-
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
55-
d.triggerInspection()
56-
}
57-
},
58-
UpdateFunc: func(oldObj, newObj interface{}) {
59-
if p, ok := getPod(newObj); ok && d.isOwnerOf(p) {
60-
d.triggerInspection()
61-
}
62-
},
63-
DeleteFunc: func(obj interface{}) {
64-
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
65-
d.triggerInspection()
66-
}
67-
},
68-
}, cache.Indexers{})
69-
70-
informer.Run(stopCh)
47+
rw := k8sutil.NewResourceWatcher(
48+
d.deps.Log,
49+
d.deps.KubeCli.CoreV1().RESTClient(),
50+
"pods",
51+
d.apiObject.GetNamespace(),
52+
&v1.Pod{},
53+
cache.ResourceEventHandlerFuncs{
54+
AddFunc: func(obj interface{}) {
55+
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
56+
d.triggerInspection()
57+
}
58+
},
59+
UpdateFunc: func(oldObj, newObj interface{}) {
60+
if p, ok := getPod(newObj); ok && d.isOwnerOf(p) {
61+
d.triggerInspection()
62+
}
63+
},
64+
DeleteFunc: func(obj interface{}) {
65+
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
66+
d.triggerInspection()
67+
}
68+
},
69+
})
70+
71+
rw.Run(stopCh)
7172
}
7273

7374
// listenForPVCEvents keep listening for changes in PVC's until the given channel is closed.
7475
func (d *Deployment) listenForPVCEvents(stopCh <-chan struct{}) {
75-
source := cache.NewListWatchFromClient(
76-
d.deps.KubeCli.CoreV1().RESTClient(),
77-
"persistentvolumeclaims",
78-
d.apiObject.GetNamespace(),
79-
fields.Everything())
80-
8176
getPVC := func(obj interface{}) (*v1.PersistentVolumeClaim, bool) {
8277
pvc, ok := obj.(*v1.PersistentVolumeClaim)
8378
if !ok {
@@ -91,35 +86,35 @@ func (d *Deployment) listenForPVCEvents(stopCh <-chan struct{}) {
9186
return pvc, true
9287
}
9388

94-
_, informer := cache.NewIndexerInformer(source, &v1.PersistentVolumeClaim{}, 0, cache.ResourceEventHandlerFuncs{
95-
AddFunc: func(obj interface{}) {
96-
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
97-
d.triggerInspection()
98-
}
99-
},
100-
UpdateFunc: func(oldObj, newObj interface{}) {
101-
if p, ok := getPVC(newObj); ok && d.isOwnerOf(p) {
102-
d.triggerInspection()
103-
}
104-
},
105-
DeleteFunc: func(obj interface{}) {
106-
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
107-
d.triggerInspection()
108-
}
109-
},
110-
}, cache.Indexers{})
111-
112-
informer.Run(stopCh)
89+
rw := k8sutil.NewResourceWatcher(
90+
d.deps.Log,
91+
d.deps.KubeCli.CoreV1().RESTClient(),
92+
"persistentvolumeclaims",
93+
d.apiObject.GetNamespace(),
94+
&v1.PersistentVolumeClaim{},
95+
cache.ResourceEventHandlerFuncs{
96+
AddFunc: func(obj interface{}) {
97+
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
98+
d.triggerInspection()
99+
}
100+
},
101+
UpdateFunc: func(oldObj, newObj interface{}) {
102+
if p, ok := getPVC(newObj); ok && d.isOwnerOf(p) {
103+
d.triggerInspection()
104+
}
105+
},
106+
DeleteFunc: func(obj interface{}) {
107+
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
108+
d.triggerInspection()
109+
}
110+
},
111+
})
112+
113+
rw.Run(stopCh)
113114
}
114115

115116
// listenForSecretEvents keep listening for changes in Secrets's until the given channel is closed.
116117
func (d *Deployment) listenForSecretEvents(stopCh <-chan struct{}) {
117-
source := cache.NewListWatchFromClient(
118-
d.deps.KubeCli.CoreV1().RESTClient(),
119-
"secrets",
120-
d.apiObject.GetNamespace(),
121-
fields.Everything())
122-
123118
getSecret := func(obj interface{}) (*v1.Secret, bool) {
124119
secret, ok := obj.(*v1.Secret)
125120
if !ok {
@@ -133,36 +128,36 @@ func (d *Deployment) listenForSecretEvents(stopCh <-chan struct{}) {
133128
return secret, true
134129
}
135130

136-
_, informer := cache.NewIndexerInformer(source, &v1.Secret{}, 0, cache.ResourceEventHandlerFuncs{
137-
// Note: For secrets we look at all of them because they do not have to be owned by this deployment.
138-
AddFunc: func(obj interface{}) {
139-
if _, ok := getSecret(obj); ok {
140-
d.triggerInspection()
141-
}
142-
},
143-
UpdateFunc: func(oldObj, newObj interface{}) {
144-
if _, ok := getSecret(newObj); ok {
145-
d.triggerInspection()
146-
}
147-
},
148-
DeleteFunc: func(obj interface{}) {
149-
if _, ok := getSecret(obj); ok {
150-
d.triggerInspection()
151-
}
152-
},
153-
}, cache.Indexers{})
154-
155-
informer.Run(stopCh)
131+
rw := k8sutil.NewResourceWatcher(
132+
d.deps.Log,
133+
d.deps.KubeCli.CoreV1().RESTClient(),
134+
"secrets",
135+
d.apiObject.GetNamespace(),
136+
&v1.Secret{},
137+
cache.ResourceEventHandlerFuncs{
138+
// Note: For secrets we look at all of them because they do not have to be owned by this deployment.
139+
AddFunc: func(obj interface{}) {
140+
if _, ok := getSecret(obj); ok {
141+
d.triggerInspection()
142+
}
143+
},
144+
UpdateFunc: func(oldObj, newObj interface{}) {
145+
if _, ok := getSecret(newObj); ok {
146+
d.triggerInspection()
147+
}
148+
},
149+
DeleteFunc: func(obj interface{}) {
150+
if _, ok := getSecret(obj); ok {
151+
d.triggerInspection()
152+
}
153+
},
154+
})
155+
156+
rw.Run(stopCh)
156157
}
157158

158159
// listenForServiceEvents keep listening for changes in Service's until the given channel is closed.
159160
func (d *Deployment) listenForServiceEvents(stopCh <-chan struct{}) {
160-
source := cache.NewListWatchFromClient(
161-
d.deps.KubeCli.CoreV1().RESTClient(),
162-
"services",
163-
d.apiObject.GetNamespace(),
164-
fields.Everything())
165-
166161
getService := func(obj interface{}) (*v1.Service, bool) {
167162
service, ok := obj.(*v1.Service)
168163
if !ok {
@@ -176,23 +171,29 @@ func (d *Deployment) listenForServiceEvents(stopCh <-chan struct{}) {
176171
return service, true
177172
}
178173

179-
_, informer := cache.NewIndexerInformer(source, &v1.Service{}, 0, cache.ResourceEventHandlerFuncs{
180-
AddFunc: func(obj interface{}) {
181-
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
182-
d.triggerInspection()
183-
}
184-
},
185-
UpdateFunc: func(oldObj, newObj interface{}) {
186-
if s, ok := getService(newObj); ok && d.isOwnerOf(s) {
187-
d.triggerInspection()
188-
}
189-
},
190-
DeleteFunc: func(obj interface{}) {
191-
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
192-
d.triggerInspection()
193-
}
194-
},
195-
}, cache.Indexers{})
196-
197-
informer.Run(stopCh)
174+
rw := k8sutil.NewResourceWatcher(
175+
d.deps.Log,
176+
d.deps.KubeCli.CoreV1().RESTClient(),
177+
"services",
178+
d.apiObject.GetNamespace(),
179+
&v1.Service{},
180+
cache.ResourceEventHandlerFuncs{
181+
AddFunc: func(obj interface{}) {
182+
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
183+
d.triggerInspection()
184+
}
185+
},
186+
UpdateFunc: func(oldObj, newObj interface{}) {
187+
if s, ok := getService(newObj); ok && d.isOwnerOf(s) {
188+
d.triggerInspection()
189+
}
190+
},
191+
DeleteFunc: func(obj interface{}) {
192+
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
193+
d.triggerInspection()
194+
}
195+
},
196+
})
197+
198+
rw.Run(stopCh)
198199
}

pkg/operator/operator_deployment.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ import (
2626
"fmt"
2727

2828
"github.com/pkg/errors"
29-
"k8s.io/apimachinery/pkg/fields"
3029
kwatch "k8s.io/apimachinery/pkg/watch"
3130
"k8s.io/client-go/tools/cache"
3231

3332
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
3433
"github.com/arangodb/kube-arangodb/pkg/deployment"
3534
"github.com/arangodb/kube-arangodb/pkg/metrics"
35+
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
3636
)
3737

3838
var (
@@ -46,20 +46,20 @@ var (
4646
// run the deployments part of the operator.
4747
// This registers a listener and waits until the process stops.
4848
func (o *Operator) runDeployments(stop <-chan struct{}) {
49-
source := cache.NewListWatchFromClient(
49+
rw := k8sutil.NewResourceWatcher(
50+
o.log,
5051
o.Dependencies.CRCli.DatabaseV1alpha().RESTClient(),
5152
api.ArangoDeploymentResourcePlural,
5253
o.Config.Namespace,
53-
fields.Everything())
54-
55-
_, informer := cache.NewIndexerInformer(source, &api.ArangoDeployment{}, 0, cache.ResourceEventHandlerFuncs{
56-
AddFunc: o.onAddArangoDeployment,
57-
UpdateFunc: o.onUpdateArangoDeployment,
58-
DeleteFunc: o.onDeleteArangoDeployment,
59-
}, cache.Indexers{})
54+
&api.ArangoDeployment{},
55+
cache.ResourceEventHandlerFuncs{
56+
AddFunc: o.onAddArangoDeployment,
57+
UpdateFunc: o.onUpdateArangoDeployment,
58+
DeleteFunc: o.onDeleteArangoDeployment,
59+
})
6060

6161
o.Dependencies.DeploymentProbe.SetReady()
62-
informer.Run(stop)
62+
rw.Run(stop)
6363
}
6464

6565
// onAddArangoDeployment deployment addition callback

pkg/operator/operator_local_storage.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ import (
2626
"fmt"
2727

2828
"github.com/pkg/errors"
29-
"k8s.io/apimachinery/pkg/fields"
3029
kwatch "k8s.io/apimachinery/pkg/watch"
3130
"k8s.io/client-go/tools/cache"
3231

3332
api "github.com/arangodb/kube-arangodb/pkg/apis/storage/v1alpha"
3433
"github.com/arangodb/kube-arangodb/pkg/metrics"
3534
"github.com/arangodb/kube-arangodb/pkg/storage"
35+
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
3636
)
3737

3838
var (
@@ -46,20 +46,20 @@ var (
4646
// run the local storages part of the operator.
4747
// This registers a listener and waits until the process stops.
4848
func (o *Operator) runLocalStorages(stop <-chan struct{}) {
49-
source := cache.NewListWatchFromClient(
49+
rw := k8sutil.NewResourceWatcher(
50+
o.log,
5051
o.Dependencies.CRCli.StorageV1alpha().RESTClient(),
5152
api.ArangoLocalStorageResourcePlural,
5253
"", //o.Config.Namespace,
53-
fields.Everything())
54-
55-
_, informer := cache.NewIndexerInformer(source, &api.ArangoLocalStorage{}, 0, cache.ResourceEventHandlerFuncs{
56-
AddFunc: o.onAddArangoLocalStorage,
57-
UpdateFunc: o.onUpdateArangoLocalStorage,
58-
DeleteFunc: o.onDeleteArangoLocalStorage,
59-
}, cache.Indexers{})
54+
&api.ArangoLocalStorage{},
55+
cache.ResourceEventHandlerFuncs{
56+
AddFunc: o.onAddArangoLocalStorage,
57+
UpdateFunc: o.onUpdateArangoLocalStorage,
58+
DeleteFunc: o.onDeleteArangoLocalStorage,
59+
})
6060

6161
o.Dependencies.StorageProbe.SetReady()
62-
informer.Run(stop)
62+
rw.Run(stop)
6363
}
6464

6565
// onAddArangoLocalStorage local storage addition callback

0 commit comments

Comments
 (0)