
Commit 1d4d025

code cleanup; channel instead of ctx; log additions
1 parent 304b34c commit 1d4d025

12 files changed: +148 −147 lines changed


internal/mode/static/handler.go

+28-15
@@ -182,17 +182,40 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
 
 	// TODO(sberman): if nginx Deployment is scaled down, we should remove the pod from the ConnectionsTracker
 	// and Deployment.
-	// If fully deleted, then delete the deployment from the Store
-	var configApplied bool
-	deployment := h.cfg.nginxDeployments.GetOrStore(deploymentName, broadcast.NewDeploymentBroadcaster(ctx))
+	// If fully deleted, then delete the deployment from the Store and close the stopCh.
+	stopCh := make(chan struct{})
+	deployment := h.cfg.nginxDeployments.GetOrStore(deploymentName, broadcast.NewDeploymentBroadcaster(stopCh))
 	if deployment == nil {
 		panic("expected deployment, got nil")
 	}
 
+	configApplied := h.processStateAndBuildConfig(ctx, logger, gr, changeType, deployment)
+
+	configErr := deployment.GetLatestConfigError()
+	upstreamErr := deployment.GetLatestUpstreamError()
+	err := errors.Join(configErr, upstreamErr)
+
+	if configApplied || err != nil {
+		obj := &status.QueueObject{
+			Error:      err,
+			Deployment: deploymentName,
+		}
+		h.cfg.statusQueue.Enqueue(obj)
+	}
+}
+
+func (h *eventHandlerImpl) processStateAndBuildConfig(
+	ctx context.Context,
+	logger logr.Logger,
+	gr *graph.Graph,
+	changeType state.ChangeType,
+	deployment *agent.Deployment,
+) bool {
+	var configApplied bool
 	switch changeType {
 	case state.NoChange:
 		logger.Info("Handling events didn't result into NGINX configuration changes")
-		return
+		return false
 	case state.EndpointsOnlyChange:
 		h.version++
 		cfg := dataplane.BuildConfiguration(ctx, gr, h.cfg.serviceResolver, h.version)
@@ -227,17 +250,7 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
 		deployment.Lock.Unlock()
 	}
 
-	configErr := deployment.GetLatestConfigError()
-	upstreamErr := deployment.GetLatestUpstreamError()
-	err := errors.Join(configErr, upstreamErr)
-
-	if configApplied || err != nil {
-		obj := &status.QueueObject{
-			Error:      err,
-			Deployment: deploymentName,
-		}
-		h.cfg.statusQueue.Enqueue(obj)
-	}
+	return configApplied
 }
 
 func (h *eventHandlerImpl) waitForStatusUpdates(ctx context.Context) {
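
For context on the enqueue logic above: errors.Join (standard library, Go 1.20+) returns nil only when every argument is nil, so a status update is enqueued only when configuration was actually applied or at least one of the config/upstream errors is set. A minimal, standalone illustration of that behavior (not project code):

package main

import (
	"errors"
	"fmt"
)

func main() {
	// With no errors, errors.Join returns nil, so only configApplied
	// would trigger the status-queue enqueue in the handler above.
	var configErr, upstreamErr error
	fmt.Println(errors.Join(configErr, upstreamErr) == nil) // true

	// Any non-nil input produces a non-nil joined error, which forces
	// an enqueue even when the configuration was not applied.
	upstreamErr = errors.New("upstream update failed")
	err := errors.Join(configErr, upstreamErr)
	fmt.Println(err != nil, err) // true upstream update failed
}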

internal/mode/static/nginx/agent/agent.go

+4-20
@@ -87,16 +87,10 @@ func (n *NginxUpdaterImpl) UpdateConfig(
 ) bool {
 	n.logger.Info("Sending nginx configuration to agent")
 
-	// reset the latest error to nil now that we're applying new config
-	deployment.SetLatestConfigError(nil)
-
 	msg := deployment.SetFiles(files)
 	applied := deployment.GetBroadcaster().Send(msg)
 
-	latestStatus := deployment.GetConfigurationStatus()
-	if latestStatus != nil {
-		deployment.SetLatestConfigError(latestStatus)
-	}
+	deployment.SetLatestConfigError(deployment.GetConfigurationStatus())
 
 	return applied
 }
@@ -131,18 +125,6 @@ func (n *NginxUpdaterImpl) UpdateUpstreamServers(
 			},
 		}
 		actions = append(actions, action)
-
-		msg := broadcast.NginxAgentMessage{
-			Type:            broadcast.APIRequest,
-			NGINXPlusAction: action,
-		}
-
-		requestApplied, err := n.sendRequest(broadcaster, msg, deployment)
-		if err != nil {
-			errs = append(errs, fmt.Errorf(
-				"couldn't update upstream %q via the API: %w", upstream.Name, deployment.GetConfigurationStatus()))
-		}
-		applied = applied || requestApplied
 	}
 
 	for _, upstream := range conf.StreamUpstreams {
@@ -152,7 +134,9 @@ func (n *NginxUpdaterImpl) UpdateUpstreamServers(
 			},
 		}
 		actions = append(actions, action)
+	}
 
+	for _, action := range actions {
 		msg := broadcast.NginxAgentMessage{
 			Type:            broadcast.APIRequest,
 			NGINXPlusAction: action,
@@ -161,7 +145,7 @@ func (n *NginxUpdaterImpl) UpdateUpstreamServers(
 		requestApplied, err := n.sendRequest(broadcaster, msg, deployment)
 		if err != nil {
 			errs = append(errs, fmt.Errorf(
-				"couldn't update upstream %q via the API: %w", upstream.Name, deployment.GetConfigurationStatus()))
+				"couldn't update upstream via the API: %w", deployment.GetConfigurationStatus()))
 		}
 		applied = applied || requestApplied
 	}

internal/mode/static/nginx/agent/agent_test.go

+2-2
@@ -227,8 +227,8 @@ func TestUpdateUpstreamServers(t *testing.T) {
 
 			if test.expErr {
 				expErr := errors.Join(
-					fmt.Errorf("couldn't update upstream \"test-upstream\" via the API: %w", testErr),
-					fmt.Errorf("couldn't update upstream \"test-stream-upstream\" via the API: %w", testErr),
+					fmt.Errorf("couldn't update upstream via the API: %w", testErr),
+					fmt.Errorf("couldn't update upstream via the API: %w", testErr),
 				)
 
 				g.Expect(deployment.GetLatestUpstreamError()).To(Equal(expErr))

internal/mode/static/nginx/agent/broadcast/broadcast.go

+5-6
@@ -1,7 +1,6 @@
 package broadcast
 
 import (
-	"context"
 	"sync"
 
 	pb "github.com/nginx/agent/v3/api/grpc/mpi/v1"
@@ -49,15 +48,15 @@ type DeploymentBroadcaster struct {
 }
 
 // NewDeploymentBroadcaster returns a new instance of a DeploymentBroadcaster.
-func NewDeploymentBroadcaster(ctx context.Context) *DeploymentBroadcaster {
+func NewDeploymentBroadcaster(stopCh chan struct{}) *DeploymentBroadcaster {
 	broadcaster := &DeploymentBroadcaster{
 		listeners: make(map[string]storedChannels),
 		publishCh: make(chan NginxAgentMessage),
 		subCh:     make(chan storedChannels),
 		unsubCh:   make(chan string),
 		doneCh:    make(chan struct{}),
 	}
-	go broadcaster.run(ctx)
+	go broadcaster.run(stopCh)
 
 	return broadcaster
 }
@@ -99,14 +98,14 @@ func (b *DeploymentBroadcaster) CancelSubscription(id string) {
 }
 
 // run starts the broadcaster loop. It handles the following events:
-// - if context is canceled, return.
+// - if stopCh is closed, return.
 // - if receiving a new subscriber, add it to the subscriber list.
 // - if receiving a canceled subscription, remove it from the subscriber list.
 // - if receiving a message to publish, send it to all subscribers.
-func (b *DeploymentBroadcaster) run(ctx context.Context) {
+func (b *DeploymentBroadcaster) run(stopCh chan struct{}) {
 	for {
 		select {
-		case <-ctx.Done():
+		case <-stopCh:
 			return
 		case channels := <-b.subCh:
 			b.listeners[channels.id] = channels
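
The commit's core change is visible here: the broadcaster's run loop is now stopped by closing a caller-owned channel instead of cancelling a context. A minimal, self-contained sketch of the stop-channel pattern (simplified; not the project's broadcaster):

package main

import "fmt"

// run is a stripped-down stand-in for the broadcaster loop above: it exits when
// stopCh is closed and otherwise handles published messages.
func run(stopCh <-chan struct{}, msgCh <-chan string) {
	for {
		select {
		case <-stopCh: // receiving from a closed channel never blocks, so the loop exits
			fmt.Println("broadcaster stopped")
			return
		case msg := <-msgCh:
			fmt.Println("publishing:", msg)
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	msgCh := make(chan string)
	done := make(chan struct{})

	go func() {
		run(stopCh, msgCh)
		close(done)
	}()

	msgCh <- "config v1" // delivered before shutdown because the send blocks until received
	close(stopCh)        // the owner, not a context, decides when the loop ends
	<-done               // wait until the loop has observed the close and returned
}

Closing the channel (rather than sending on it) is what makes this work: every goroutine selecting on stopCh unblocks at once, which is the same broadcast semantics a cancelled context provides, but with the lifetime owned by the caller.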

internal/mode/static/nginx/agent/broadcast/broadcast_test.go

+12-13
@@ -1,7 +1,6 @@
 package broadcast_test
 
 import (
-	"context"
 	"testing"
 
 	. "github.com/onsi/gomega"
@@ -13,10 +12,10 @@ func TestSubscribe(t *testing.T) {
 	t.Parallel()
 	g := NewWithT(t)
 
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
+	stopCh := make(chan struct{})
+	defer close(stopCh)
 
-	broadcaster := broadcast.NewDeploymentBroadcaster(ctx)
+	broadcaster := broadcast.NewDeploymentBroadcaster(stopCh)
 
 	subscriber := broadcaster.Subscribe()
 	g.Expect(subscriber.ID).NotTo(BeEmpty())
@@ -38,10 +37,10 @@ func TestSubscribe_MultipleListeners(t *testing.T) {
 	t.Parallel()
 	g := NewWithT(t)
 
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
+	stopCh := make(chan struct{})
+	defer close(stopCh)
 
-	broadcaster := broadcast.NewDeploymentBroadcaster(ctx)
+	broadcaster := broadcast.NewDeploymentBroadcaster(stopCh)
 
 	subscriber1 := broadcaster.Subscribe()
 	subscriber2 := broadcaster.Subscribe()
@@ -67,10 +66,10 @@ func TestSubscribe_NoListeners(t *testing.T) {
 	t.Parallel()
 	g := NewWithT(t)
 
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
+	stopCh := make(chan struct{})
+	defer close(stopCh)
 
-	broadcaster := broadcast.NewDeploymentBroadcaster(ctx)
+	broadcaster := broadcast.NewDeploymentBroadcaster(stopCh)
 
 	message := broadcast.NginxAgentMessage{
 		ConfigVersion: "v1",
@@ -85,10 +84,10 @@ func TestCancelSubscription(t *testing.T) {
 	t.Parallel()
 	g := NewWithT(t)
 
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
+	stopCh := make(chan struct{})
+	defer close(stopCh)
 
-	broadcaster := broadcast.NewDeploymentBroadcaster(ctx)
+	broadcaster := broadcast.NewDeploymentBroadcaster(stopCh)
 
 	subscriber := broadcaster.Subscribe()
 

internal/mode/static/nginx/agent/command.go

+8-4
@@ -93,6 +93,7 @@ func (cs *commandService) CreateConnection(
 			Error: err.Error(),
 		},
 	}
+	cs.logger.Error(err, "error getting pod owner")
 	return response, grpcStatus.Errorf(codes.Internal, "error getting pod owner %s", err.Error())
 }
 
@@ -145,12 +146,12 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
 
 		return err
 	}
+	deployment.Lock.RUnlock()
 
 	// subscribe to the deployment broadcaster to get file updates
 	broadcaster := deployment.GetBroadcaster()
 	channels := broadcaster.Subscribe()
 	defer broadcaster.CancelSubscription(channels.ID)
-	deployment.Lock.RUnlock()
 
 	for {
 		select {
@@ -175,7 +176,10 @@ func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error
 				return grpcStatus.Error(codes.Internal, err.Error())
 			}
 		case err = <-msgr.Errors():
-			cs.logger.Error(err, "connection error")
+			cs.logger.Error(err, "connection error", "pod", conn.PodName)
+			deployment.SetPodErrorStatus(conn.PodName, err)
+			channels.ResponseCh <- struct{}{}
+
 			if errors.Is(err, io.EOF) {
 				return grpcStatus.Error(codes.Aborted, err.Error())
 			}
@@ -214,7 +218,7 @@ func (cs *commandService) waitForConnection(
 		case <-timer.C:
 			return nil, nil, err
 		case <-ticker.C:
-			if conn, ok := cs.connTracker.ConnectionIsReady(gi.IPAddress); ok {
+			if conn, ok := cs.connTracker.Ready(gi.IPAddress); ok {
 				// connection has been established, now ensure that the deployment exists in the store
 				if deployment := cs.nginxDeployments.Get(conn.Parent); deployment != nil {
 					return &conn, deployment, nil
@@ -332,7 +336,7 @@ func (cs *commandService) logAndSendErrorStatus(deployment *Deployment, conn *ag
 	if err != nil {
 		cs.logger.Error(err, "error sending request to agent")
 	} else {
-		cs.logger.Info(fmt.Sprintf("Successfully configured nginx for new subscription %q", conn.PodName))
+		cs.logger.Info("Successfully configured nginx for new subscription", "pod", conn.PodName)
 	}
 	deployment.SetPodErrorStatus(conn.PodName, err)
 

internal/mode/static/nginx/agent/command_test.go

+2-2
@@ -295,7 +295,7 @@ func TestSubscribe(t *testing.T) {
 		PodName:    "nginx-pod",
 		InstanceID: "nginx-id",
 	}
-	connTracker.ConnectionIsReadyReturns(conn, true)
+	connTracker.ReadyReturns(conn, true)
 
 	cs := newCommandService(
 		logr.Discard(),
@@ -438,7 +438,7 @@ func TestSubscribe_Errors(t *testing.T) {
 			cs *commandService,
 			ct *agentgrpcfakes.FakeConnectionsTracker,
 		) {
-			ct.ConnectionIsReadyReturns(agentgrpc.Connection{}, true)
+			ct.ReadyReturns(agentgrpc.Connection{}, true)
 			cs.connectionTimeout = 1100 * time.Millisecond
 		},
 		errString: "timed out waiting for nginx deployment to be added to store",

internal/mode/static/nginx/agent/deployment.go

+1
@@ -164,6 +164,7 @@ func (d *Deployment) SetNGINXPlusActions(actions []*pb.NGINXPlusAction) {
 }
 
 // SetPodErrorStatus sets the error status of a Pod in this Deployment if applying the config failed.
+// Caller MUST lock the deployment before calling this function.
 func (d *Deployment) SetPodErrorStatus(pod string, err error) {
 	d.podStatuses[pod] = err
 }
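
The added comment documents a locking contract rather than a behavior change: SetPodErrorStatus writes the pod-status map without taking the lock itself, so the caller must already hold the deployment's lock. A hypothetical, simplified illustration of that contract (the real Deployment type has more fields; the names below are illustrative only):

package main

import (
	"errors"
	"fmt"
	"sync"
)

// deployment mimics only the shape relevant to SetPodErrorStatus:
// a pod->error map guarded by a mutex that the *caller* is expected to hold.
type deployment struct {
	Lock        sync.RWMutex
	podStatuses map[string]error
}

// setPodErrorStatus writes the map without synchronizing, mirroring the
// "caller MUST lock" contract documented in the diff above.
func (d *deployment) setPodErrorStatus(pod string, err error) {
	d.podStatuses[pod] = err
}

func main() {
	d := &deployment{podStatuses: make(map[string]error)}

	// Correct usage: the caller takes the write lock around the unguarded setter.
	d.Lock.Lock()
	d.setPodErrorStatus("nginx-pod-1", errors.New("config apply failed"))
	d.Lock.Unlock()

	d.Lock.RLock()
	fmt.Println(d.podStatuses["nginx-pod-1"])
	d.Lock.RUnlock()
}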

internal/mode/static/nginx/agent/grpc/connections.go

+2-2
@@ -15,7 +15,7 @@ import (
 type ConnectionsTracker interface {
 	Track(key string, conn Connection)
 	GetConnection(key string) Connection
-	ConnectionIsReady(key string) (Connection, bool)
+	Ready(key string) (Connection, bool)
 	SetInstanceID(key, id string)
 	UntrackConnectionsForParent(parent types.NamespacedName)
 }
@@ -63,7 +63,7 @@ func (c *AgentConnectionsTracker) GetConnection(key string) Connection {
 
 // ConnectionIsReady returns if the connection is ready to be used. In other words, agent
 // has registered itself and an nginx instance with the control plane.
-func (c *AgentConnectionsTracker) ConnectionIsReady(key string) (Connection, bool) {
+func (c *AgentConnectionsTracker) Ready(key string) (Connection, bool) {
 	c.lock.RLock()
 	defer c.lock.RUnlock()
 

internal/mode/static/nginx/agent/grpc/connections_test.go

+5-5
@@ -29,7 +29,7 @@ func TestGetConnection(t *testing.T) {
 	g.Expect(nonExistent).To(Equal(agentgrpc.Connection{}))
 }
 
-func TestConnectionIsReady(t *testing.T) {
+func TestReady(t *testing.T) {
 	t.Parallel()
 	g := NewWithT(t)
 
@@ -42,7 +42,7 @@ func TestConnectionIsReady(t *testing.T) {
 	}
 	tracker.Track("key1", conn)
 
-	trackedConn, ready := tracker.ConnectionIsReady("key1")
+	trackedConn, ready := tracker.Ready("key1")
 	g.Expect(ready).To(BeTrue())
 	g.Expect(trackedConn).To(Equal(conn))
 }
@@ -59,7 +59,7 @@ func TestConnectionIsNotReady(t *testing.T) {
 	}
 	tracker.Track("key1", conn)
 
-	_, ready := tracker.ConnectionIsReady("key1")
+	_, ready := tracker.Ready("key1")
 	g.Expect(ready).To(BeFalse())
 }
 
@@ -74,12 +74,12 @@ func TestSetInstanceID(t *testing.T) {
 	}
 	tracker.Track("key1", conn)
 
-	_, ready := tracker.ConnectionIsReady("key1")
+	_, ready := tracker.Ready("key1")
 	g.Expect(ready).To(BeFalse())
 
 	tracker.SetInstanceID("key1", "instance1")
 
-	trackedConn, ready := tracker.ConnectionIsReady("key1")
+	trackedConn, ready := tracker.Ready("key1")
 	g.Expect(ready).To(BeTrue())
 	g.Expect(trackedConn.InstanceID).To(Equal("instance1"))
 }

internal/mode/static/nginx/agent/grpc/grpc.go

+1
@@ -82,6 +82,7 @@ func (g *Server) Start(ctx context.Context) error {
 	go func() {
 		<-ctx.Done()
 		g.logger.Info("Shutting down GRPC Server")
+		// Since we use a long-lived stream, GracefulStop does not terminate. Therefore we use Stop.
 		server.Stop()
 	}()
 
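
The new comment explains the shutdown choice: grpc.Server's GracefulStop waits for all active RPCs to finish, and an agent subscription is a long-lived stream that never finishes on its own, so graceful shutdown would hang; Stop closes connections immediately. A rough sketch of that pattern using the standard google.golang.org/grpc API (illustrative only; not the project's Server type):

package main

import (
	"context"
	"errors"
	"log"
	"net"
	"os"
	"os/signal"

	"google.golang.org/grpc"
)

func start(ctx context.Context) error {
	lis, err := net.Listen("tcp", ":8443")
	if err != nil {
		return err
	}

	server := grpc.NewServer()

	go func() {
		<-ctx.Done()
		// GracefulStop would wait for every active RPC, including long-lived
		// subscription streams that never return, so Stop is used instead.
		server.Stop()
	}()

	return server.Serve(lis)
}

func main() {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	// Serve returns grpc.ErrServerStopped after Stop, which is expected here.
	if err := start(ctx); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
		log.Fatal(err)
	}
}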
