Skip to content

Commit e5a674b

Browse files
authored
Merge pull request #15 from nginxinc/updated-tests
Improve test coverage
2 parents fdf272b + f3373ad commit e5a674b

19 files changed

+1358
-291
lines changed

cmd/nginx-k8s-edge-controller/main.go

+18-6
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/sirupsen/logrus"
1515
"k8s.io/client-go/kubernetes"
1616
"k8s.io/client-go/rest"
17+
"k8s.io/client-go/util/workqueue"
1718
)
1819

1920
func main() {
@@ -42,20 +43,24 @@ func run() error {
4243
return fmt.Errorf(`error occurred initializing settings: %w`, err)
4344
}
4445

45-
synchronizer, err := synchronization.NewSynchronizer(settings)
46+
synchronizerWorkqueue, err := buildWorkQueue(settings.Synchronizer.WorkQueueSettings)
4647
if err != nil {
47-
return fmt.Errorf(`error initializing synchronizer: %w`, err)
48+
return fmt.Errorf(`error occurred building a workqueue: %w`, err)
4849
}
4950

50-
err = synchronizer.Initialize()
51+
synchronizer, err := synchronization.NewSynchronizer(settings, synchronizerWorkqueue)
5152
if err != nil {
5253
return fmt.Errorf(`error initializing synchronizer: %w`, err)
5354
}
5455

55-
handler := observation.NewHandler(synchronizer)
56-
handler.Initialize()
56+
handlerWorkqueue, err := buildWorkQueue(settings.Synchronizer.WorkQueueSettings)
57+
if err != nil {
58+
return fmt.Errorf(`error occurred building a workqueue: %w`, err)
59+
}
60+
61+
handler := observation.NewHandler(settings, synchronizer, handlerWorkqueue)
5762

58-
watcher, err := observation.NewWatcher(ctx, handler, k8sClient)
63+
watcher, err := observation.NewWatcher(settings, handler)
5964
if err != nil {
6065
return fmt.Errorf(`error occurred creating a watcher: %w`, err)
6166
}
@@ -97,3 +102,10 @@ func buildKubernetesClient() (*kubernetes.Clientset, error) {
97102

98103
return client, nil
99104
}
105+
106+
func buildWorkQueue(settings configuration.WorkQueueSettings) (workqueue.RateLimitingInterface, error) {
107+
logrus.Debug("Watcher::buildSynchronizerWorkQueue")
108+
109+
rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(settings.RateLimiterBase, settings.RateLimiterMax)
110+
return workqueue.NewNamedRateLimitingQueue(rateLimiter, settings.Name), nil
111+
}

internal/configuration/settings.go

+64-10
Original file line numberDiff line numberDiff line change
@@ -14,28 +14,82 @@ import (
1414
"k8s.io/client-go/kubernetes"
1515
"k8s.io/client-go/tools/cache"
1616
"strings"
17+
"time"
1718
)
1819

1920
const (
2021
ConfigMapsNamespace = "nkl"
2122
ResyncPeriod = 0
2223
)
2324

25+
type WorkQueueSettings struct {
26+
Name string
27+
RateLimiterBase time.Duration
28+
RateLimiterMax time.Duration
29+
}
30+
31+
type HandlerSettings struct {
32+
RetryCount int
33+
Threads int
34+
WorkQueueSettings WorkQueueSettings
35+
}
36+
37+
type WatcherSettings struct {
38+
NginxIngressNamespace string
39+
ResyncPeriod time.Duration
40+
}
41+
42+
type SynchronizerSettings struct {
43+
MaxMillisecondsJitter int
44+
MinMillisecondsJitter int
45+
RetryCount int
46+
Threads int
47+
WorkQueueSettings WorkQueueSettings
48+
}
49+
2450
type Settings struct {
25-
ctx context.Context
51+
Context context.Context
2652
NginxPlusHosts []string
27-
k8sClient *kubernetes.Clientset
53+
K8sClient *kubernetes.Clientset
2854
informer cache.SharedInformer
2955
eventHandlerRegistration cache.ResourceEventHandlerRegistration
56+
57+
Handler HandlerSettings
58+
Synchronizer SynchronizerSettings
59+
Watcher WatcherSettings
3060
}
3161

3262
func NewSettings(ctx context.Context, k8sClient *kubernetes.Clientset) (*Settings, error) {
33-
config := new(Settings)
34-
35-
config.k8sClient = k8sClient
36-
config.ctx = ctx
63+
settings := &Settings{
64+
Context: ctx,
65+
K8sClient: k8sClient,
66+
Handler: HandlerSettings{
67+
RetryCount: 5,
68+
Threads: 1,
69+
WorkQueueSettings: WorkQueueSettings{
70+
RateLimiterBase: time.Second * 2,
71+
RateLimiterMax: time.Second * 60,
72+
Name: "nkl-handler",
73+
},
74+
},
75+
Synchronizer: SynchronizerSettings{
76+
MaxMillisecondsJitter: 750,
77+
MinMillisecondsJitter: 250,
78+
RetryCount: 5,
79+
Threads: 1,
80+
WorkQueueSettings: WorkQueueSettings{
81+
RateLimiterBase: time.Second * 2,
82+
RateLimiterMax: time.Second * 60,
83+
Name: "nkl-synchronizer",
84+
},
85+
},
86+
Watcher: WatcherSettings{
87+
NginxIngressNamespace: "nginx-ingress",
88+
ResyncPeriod: 0,
89+
},
90+
}
3791

38-
return config, nil
92+
return settings, nil
3993
}
4094

4195
func (s *Settings) Initialize() error {
@@ -63,14 +117,14 @@ func (s *Settings) Run() {
63117

64118
defer utilruntime.HandleCrash()
65119

66-
go s.informer.Run(s.ctx.Done())
120+
go s.informer.Run(s.Context.Done())
67121

68-
<-s.ctx.Done()
122+
<-s.Context.Done()
69123
}
70124

71125
func (s *Settings) buildInformer() (cache.SharedInformer, error) {
72126
options := informers.WithNamespace(ConfigMapsNamespace)
73-
factory := informers.NewSharedInformerFactoryWithOptions(s.k8sClient, ResyncPeriod, options)
127+
factory := informers.NewSharedInformerFactoryWithOptions(s.K8sClient, ResyncPeriod, options)
74128
informer := factory.Core().V1().ConfigMaps().Informer()
75129

76130
return informer, nil

internal/core/events_test.go

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package core
2+
3+
import (
4+
nginxClient "github.com/nginxinc/nginx-plus-go-client/client"
5+
"testing"
6+
)
7+
8+
func TestServerUpdateEventWithIdAndHost(t *testing.T) {
9+
event := NewServerUpdateEvent(Created, "upstream", []nginxClient.StreamUpstreamServer{})
10+
11+
if event.Id != "" {
12+
t.Errorf("expected empty Id, got %s", event.Id)
13+
}
14+
15+
if event.NginxHost != "" {
16+
t.Errorf("expected empty NginxHost, got %s", event.NginxHost)
17+
}
18+
19+
eventWithIdAndHost := ServerUpdateEventWithIdAndHost(event, "id", "host")
20+
21+
if eventWithIdAndHost.Id != "id" {
22+
t.Errorf("expected Id to be 'id', got %s", eventWithIdAndHost.Id)
23+
}
24+
25+
if eventWithIdAndHost.NginxHost != "host" {
26+
t.Errorf("expected NginxHost to be 'host', got %s", eventWithIdAndHost.NginxHost)
27+
}
28+
}
29+
30+
func TestTypeNameCreated(t *testing.T) {
31+
event := NewServerUpdateEvent(Created, "upstream", []nginxClient.StreamUpstreamServer{})
32+
33+
if event.TypeName() != "Created" {
34+
t.Errorf("expected 'Created', got %s", event.TypeName())
35+
}
36+
}
37+
38+
func TestTypeNameUpdated(t *testing.T) {
39+
event := NewServerUpdateEvent(Updated, "upstream", []nginxClient.StreamUpstreamServer{})
40+
41+
if event.TypeName() != "Updated" {
42+
t.Errorf("expected 'Updated', got %s", event.TypeName())
43+
}
44+
}
45+
46+
func TestTypeNameDeleted(t *testing.T) {
47+
event := NewServerUpdateEvent(Deleted, "upstream", []nginxClient.StreamUpstreamServer{})
48+
49+
if event.TypeName() != "Deleted" {
50+
t.Errorf("expected 'Deleted', got %s", event.TypeName())
51+
}
52+
}
53+
54+
func TestTypeNameUnknown(t *testing.T) {
55+
event := NewServerUpdateEvent(EventType(100), "upstream", []nginxClient.StreamUpstreamServer{})
56+
57+
if event.TypeName() != "Unknown" {
58+
t.Errorf("expected 'Unknown', got %s", event.TypeName())
59+
}
60+
}

internal/observation/handler.go

+13-15
Original file line numberDiff line numberDiff line change
@@ -6,28 +6,31 @@ package observation
66

77
import (
88
"fmt"
9+
"github.com/nginxinc/kubernetes-nginx-ingress/internal/configuration"
910
"github.com/nginxinc/kubernetes-nginx-ingress/internal/core"
1011
"github.com/nginxinc/kubernetes-nginx-ingress/internal/synchronization"
1112
"github.com/nginxinc/kubernetes-nginx-ingress/internal/translation"
1213
"github.com/sirupsen/logrus"
1314
"k8s.io/apimachinery/pkg/util/wait"
1415
"k8s.io/client-go/util/workqueue"
15-
"time"
1616
)
1717

18-
const RateLimiterBase = time.Second * 2
19-
const RateLimiterMax = time.Second * 60
20-
const RetryCount = 5
21-
const Threads = 1
22-
const WatcherQueueName = `nkl-handler`
18+
type HandlerInterface interface {
19+
AddRateLimitedEvent(event *core.Event)
20+
Run(stopCh <-chan struct{})
21+
ShutDown()
22+
}
2323

2424
type Handler struct {
2525
eventQueue workqueue.RateLimitingInterface
26-
synchronizer *synchronization.Synchronizer
26+
settings *configuration.Settings
27+
synchronizer synchronization.Interface
2728
}
2829

29-
func NewHandler(synchronizer *synchronization.Synchronizer) *Handler {
30+
func NewHandler(settings *configuration.Settings, synchronizer synchronization.Interface, eventQueue workqueue.RateLimitingInterface) *Handler {
3031
return &Handler{
32+
eventQueue: eventQueue,
33+
settings: settings,
3134
synchronizer: synchronizer,
3235
}
3336
}
@@ -37,15 +40,10 @@ func (h *Handler) AddRateLimitedEvent(event *core.Event) {
3740
h.eventQueue.AddRateLimited(event)
3841
}
3942

40-
func (h *Handler) Initialize() {
41-
rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(RateLimiterBase, RateLimiterMax)
42-
h.eventQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, WatcherQueueName)
43-
}
44-
4543
func (h *Handler) Run(stopCh <-chan struct{}) {
4644
logrus.Debug("Handler::Run")
4745

48-
for i := 0; i < Threads; i++ {
46+
for i := 0; i < h.settings.Handler.Threads; i++ {
4947
go wait.Until(h.worker, 0, stopCh)
5048
}
5149

@@ -97,7 +95,7 @@ func (h *Handler) withRetry(err error, event *core.Event) {
9795
logrus.Debug("Handler::withRetry")
9896
if err != nil {
9997
// TODO: Add Telemetry
100-
if h.eventQueue.NumRequeues(event) < RetryCount { // TODO: Make this configurable
98+
if h.eventQueue.NumRequeues(event) < h.settings.Handler.RetryCount {
10199
h.eventQueue.AddRateLimited(event)
102100
logrus.Infof(`Handler::withRetry: requeued event: %#v; error: %v`, event, err)
103101
} else {

internal/observation/handler_test.go

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package observation
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/nginxinc/kubernetes-nginx-ingress/internal/configuration"
7+
"github.com/nginxinc/kubernetes-nginx-ingress/internal/core"
8+
"github.com/nginxinc/kubernetes-nginx-ingress/test/mocks"
9+
v1 "k8s.io/api/core/v1"
10+
"k8s.io/client-go/util/workqueue"
11+
"testing"
12+
)
13+
14+
func TestHandler_AddsEventToSynchronizer(t *testing.T) {
15+
_, _, synchronizer, handler, err := buildHandler()
16+
if err != nil {
17+
t.Errorf(`should have been no error, %v`, err)
18+
}
19+
20+
event := &core.Event{
21+
Type: core.Created,
22+
Service: &v1.Service{
23+
Spec: v1.ServiceSpec{
24+
Ports: []v1.ServicePort{
25+
{
26+
Name: "nkl-back",
27+
},
28+
},
29+
},
30+
},
31+
}
32+
33+
handler.AddRateLimitedEvent(event)
34+
35+
handler.handleNextEvent()
36+
37+
if len(synchronizer.Events) != 1 {
38+
t.Errorf(`handler.AddRateLimitedEvent did not add the event to the queue`)
39+
}
40+
}
41+
42+
func buildHandler() (*configuration.Settings, workqueue.RateLimitingInterface, *mocks.MockSynchronizer, *Handler, error) {
43+
settings, err := configuration.NewSettings(context.Background(), nil)
44+
if err != nil {
45+
return nil, nil, nil, nil, fmt.Errorf(`should have been no error, %v`, err)
46+
}
47+
48+
eventQueue := &mocks.MockRateLimiter{}
49+
synchronizer := &mocks.MockSynchronizer{}
50+
51+
handler := NewHandler(settings, synchronizer, eventQueue)
52+
53+
return settings, eventQueue, synchronizer, handler, nil
54+
}

0 commit comments

Comments
 (0)