@@ -32,6 +32,7 @@ import (
32
32
"sort"
33
33
"strconv"
34
34
"strings"
35
+ "time"
35
36
36
37
"k8s.io/apimachinery/pkg/api/resource"
37
38
@@ -41,6 +42,8 @@ import (
41
42
42
43
api "github.com/arangodb/kube-arangodb/pkg/apis/storage/v1alpha"
43
44
"github.com/arangodb/kube-arangodb/pkg/storage/provisioner"
45
+ "github.com/arangodb/kube-arangodb/pkg/util/constants"
46
+ "github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
44
47
)
45
48
46
49
const (
@@ -72,7 +75,24 @@ func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLoca
72
75
clients [i ], clients [j ] = clients [j ], clients [i ]
73
76
})
74
77
78
+ var nodeClientMap map [string ]provisioner.API
75
79
for i , claim := range unboundClaims {
80
+ // Find deployment name & role in the claim (if any)
81
+ deplName , role , enforceAniAffinity := getDeploymentInfo (claim )
82
+ allowedClients := clients
83
+ if enforceAniAffinity && deplName != "" {
84
+ // Select nodes to choose from such that no volume in group lands on the same node
85
+ if nodeClientMap == nil {
86
+ nodeClientMap = createNodeClientMap (ctx , clients )
87
+ }
88
+ var err error
89
+ allowedClients , err = ls .filterAllowedNodes (nodeClientMap , deplName , role )
90
+ if err != nil {
91
+ log .Warn ().Err (err ).Msg ("Failed to filter allowed nodes" )
92
+ continue // We'll try this claim again later
93
+ }
94
+ }
95
+
76
96
// Find size of PVC
77
97
volSize := defaultVolumeSize
78
98
if reqStorage := claim .Spec .Resources .Requests .StorageEphemeral (); reqStorage != nil {
@@ -81,7 +101,7 @@ func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLoca
81
101
}
82
102
}
83
103
// Create PV
84
- if err := ls .createPV (ctx , apiObject , clients , i , volSize ); err != nil {
104
+ if err := ls .createPV (ctx , apiObject , allowedClients , i , volSize , claim , deplName , role ); err != nil {
85
105
log .Error ().Err (err ).Msg ("Failed to create PersistentVolume" )
86
106
}
87
107
}
@@ -90,7 +110,7 @@ func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLoca
90
110
}
91
111
92
112
// createPV creates a PersistentVolume.
93
- func (ls * LocalStorage ) createPV (ctx context.Context , apiObject * api.ArangoLocalStorage , clients []provisioner.API , clientsOffset int , volSize int64 ) error {
113
+ func (ls * LocalStorage ) createPV (ctx context.Context , apiObject * api.ArangoLocalStorage , clients []provisioner.API , clientsOffset int , volSize int64 , claim v1. PersistentVolumeClaim , deploymentName , role string ) error {
94
114
log := ls .deps .Log
95
115
// Try clients
96
116
for clientIdx := 0 ; clientIdx < len (clients ); clientIdx ++ {
@@ -131,6 +151,10 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
131
151
v1 .AlphaStorageNodeAffinityAnnotation : nodeAff ,
132
152
nodeNameAnnotation : info .NodeName ,
133
153
},
154
+ Labels : map [string ]string {
155
+ k8sutil .LabelKeyArangoDeployment : deploymentName ,
156
+ k8sutil .LabelKeyRole : role ,
157
+ },
134
158
},
135
159
Spec : v1.PersistentVolumeSpec {
136
160
Capacity : v1.ResourceList {
@@ -147,6 +171,13 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
147
171
},
148
172
StorageClassName : apiObject .Spec .StorageClass .Name ,
149
173
VolumeMode : & volumeMode ,
174
+ ClaimRef : & v1.ObjectReference {
175
+ Kind : "PersistentVolumeClaim" ,
176
+ APIVersion : "" ,
177
+ Name : claim .GetName (),
178
+ Namespace : claim .GetNamespace (),
179
+ UID : claim .GetUID (),
180
+ },
150
181
},
151
182
}
152
183
// Attach PV to ArangoLocalStorage
@@ -159,6 +190,16 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
159
190
Str ("name" , pvName ).
160
191
Str ("node-name" , info .NodeName ).
161
192
Msg ("Created PersistentVolume" )
193
+
194
+ // Bind claim to volume
195
+ if err := ls .bindClaimToVolume (claim , pv .GetName ()); err != nil {
196
+ // Try to delete the PV now
197
+ if err := ls .deps .KubeCli .CoreV1 ().PersistentVolumes ().Delete (pv .GetName (), & metav1.DeleteOptions {}); err != nil {
198
+ log .Error ().Err (err ).Msg ("Failed to delete PV after binding PVC failed" )
199
+ }
200
+ return maskAny (err )
201
+ }
202
+
162
203
return nil
163
204
}
164
205
}
@@ -204,3 +245,90 @@ func createNodeAffinity(nodeName string) (string, error) {
204
245
}
205
246
return string (encoded ), nil
206
247
}
248
+
249
+ // createNodeClientMap creates a map from node name to API.
250
+ // Clients that do not respond properly on a GetNodeInfo request are
251
+ // ignored.
252
+ func createNodeClientMap (ctx context.Context , clients []provisioner.API ) map [string ]provisioner.API {
253
+ result := make (map [string ]provisioner.API )
254
+ for _ , c := range clients {
255
+ if info , err := c .GetNodeInfo (ctx ); err == nil {
256
+ result [info .NodeName ] = c
257
+ }
258
+ }
259
+ return result
260
+ }
261
+
262
+ // getDeploymentInfo returns the name of the deployment that created the given claim,
263
+ // the role of the server that the claim is used for and the value for `enforceAntiAffinity`.
264
+ // If not found, empty strings are returned.
265
+ // Returns deploymentName, role, enforceAntiAffinity.
266
+ func getDeploymentInfo (pvc v1.PersistentVolumeClaim ) (string , string , bool ) {
267
+ deploymentName := pvc .GetLabels ()[k8sutil .LabelKeyArangoDeployment ]
268
+ role := pvc .GetLabels ()[k8sutil .LabelKeyRole ]
269
+ enforceAntiAffinity , _ := strconv .ParseBool (pvc .GetAnnotations ()[constants .AnnotationEnforceAntiAffinity ]) // If annotation empty, this will yield false.
270
+ return deploymentName , role , enforceAntiAffinity
271
+ }
272
+
273
+ // filterAllowedNodes returns those clients that do not yet have a volume for the given deployment name & role.
274
+ func (ls * LocalStorage ) filterAllowedNodes (clients map [string ]provisioner.API , deploymentName , role string ) ([]provisioner.API , error ) {
275
+ // Find all PVs for given deployment & role
276
+ list , err := ls .deps .KubeCli .CoreV1 ().PersistentVolumes ().List (metav1.ListOptions {
277
+ LabelSelector : fmt .Sprintf ("%s=%s,%s=%s" , k8sutil .LabelKeyArangoDeployment , deploymentName , k8sutil .LabelKeyRole , role ),
278
+ })
279
+ if err != nil {
280
+ return nil , maskAny (err )
281
+ }
282
+ excludedNodes := make (map [string ]struct {})
283
+ for _ , pv := range list .Items {
284
+ nodeName := pv .GetAnnotations ()[nodeNameAnnotation ]
285
+ excludedNodes [nodeName ] = struct {}{}
286
+ }
287
+ result := make ([]provisioner.API , 0 , len (clients ))
288
+ for nodeName , c := range clients {
289
+ if _ , found := excludedNodes [nodeName ]; ! found {
290
+ result = append (result , c )
291
+ }
292
+ }
293
+ return result , nil
294
+ }
295
+
296
+ // bindClaimToVolume tries to bind the given claim to the volume with given name.
297
+ // If the claim has been updated, the function retries several times.
298
+ func (ls * LocalStorage ) bindClaimToVolume (claim v1.PersistentVolumeClaim , volumeName string ) error {
299
+ log := ls .deps .Log .With ().Str ("pvc-name" , claim .GetName ()).Str ("volume-name" , volumeName ).Logger ()
300
+ pvcs := ls .deps .KubeCli .CoreV1 ().PersistentVolumeClaims (claim .GetNamespace ())
301
+
302
+ for attempt := 0 ; attempt < 10 ; attempt ++ {
303
+ // Backoff if needed
304
+ time .Sleep (time .Millisecond * time .Duration (10 * attempt ))
305
+
306
+ // Fetch latest version of claim
307
+ updated , err := pvcs .Get (claim .GetName (), metav1.GetOptions {})
308
+ if k8sutil .IsNotFound (err ) {
309
+ return maskAny (err )
310
+ } else if err != nil {
311
+ log .Warn ().Err (err ).Msg ("Failed to load updated PersistentVolumeClaim" )
312
+ continue
313
+ }
314
+
315
+ // Check claim. If already bound, bail out
316
+ if ! pvcNeedsVolume (* updated ) {
317
+ return maskAny (fmt .Errorf ("PersistentVolumeClaim '%s' no longer needs a volume" , claim .GetName ()))
318
+ }
319
+
320
+ // Try to bind
321
+ updated .Spec .VolumeName = volumeName
322
+ if _ , err := pvcs .Update (updated ); k8sutil .IsConflict (err ) {
323
+ // Claim modified already, retry
324
+ log .Debug ().Err (err ).Msg ("PersistentVolumeClaim has been modified. Retrying." )
325
+ } else if err != nil {
326
+ log .Error ().Err (err ).Msg ("Failed to bind PVC to volume" )
327
+ return maskAny (err )
328
+ }
329
+ log .Debug ().Msg ("Bound volume to PersistentVolumeClaim" )
330
+ return nil
331
+ }
332
+ log .Error ().Msg ("All attempts to bind PVC to volume failed" )
333
+ return maskAny (fmt .Errorf ("All attempts to bind PVC to volume failed" ))
334
+ }
0 commit comments