Skip to content

Commit 096d6a9

Browse files
committed
Ready connection; broadcaster initialization
1 parent 1d4d025 commit 096d6a9

File tree

9 files changed

+39
-122
lines changed

9 files changed

+39
-122
lines changed

internal/mode/static/handler.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
ngfConfig "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/config"
2323
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/licensing"
2424
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent"
25-
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/broadcast"
2625
ngxConfig "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config"
2726
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state"
2827
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/dataplane"
@@ -184,7 +183,7 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
184183
// and Deployment.
185184
// If fully deleted, then delete the deployment from the Store and close the stopCh.
186185
stopCh := make(chan struct{})
187-
deployment := h.cfg.nginxDeployments.GetOrStore(deploymentName, broadcast.NewDeploymentBroadcaster(stopCh))
186+
deployment := h.cfg.nginxDeployments.GetOrStore(deploymentName, stopCh)
188187
if deployment == nil {
189188
panic("expected deployment, got nil")
190189
}

internal/mode/static/nginx/agent/command.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ func (cs *commandService) waitForConnection(
218218
case <-timer.C:
219219
return nil, nil, err
220220
case <-ticker.C:
221-
if conn, ok := cs.connTracker.Ready(gi.IPAddress); ok {
221+
if conn := cs.connTracker.GetConnection(gi.IPAddress); conn.Ready() {
222222
// connection has been established, now ensure that the deployment exists in the store
223223
if deployment := cs.nginxDeployments.Get(conn.Parent); deployment != nil {
224224
return &conn, deployment, nil

internal/mode/static/nginx/agent/command_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -295,12 +295,13 @@ func TestSubscribe(t *testing.T) {
295295
PodName: "nginx-pod",
296296
InstanceID: "nginx-id",
297297
}
298-
connTracker.ReadyReturns(conn, true)
298+
connTracker.GetConnectionReturns(conn)
299299

300+
store := NewDeploymentStore(&connTracker)
300301
cs := newCommandService(
301302
logr.Discard(),
302303
fake.NewFakeClient(),
303-
NewDeploymentStore(&connTracker),
304+
store,
304305
&connTracker,
305306
status.NewQueue(),
306307
)
@@ -315,7 +316,7 @@ func TestSubscribe(t *testing.T) {
315316
broadcaster.SubscribeReturns(subChannels)
316317

317318
// set the initial files and actions to be applied by the Subscription
318-
deployment := cs.nginxDeployments.GetOrStore(conn.Parent, broadcaster)
319+
deployment := store.StoreWithBroadcaster(conn.Parent, broadcaster)
319320
files := []File{
320321
{
321322
Meta: &pb.FileMeta{
@@ -438,7 +439,7 @@ func TestSubscribe_Errors(t *testing.T) {
438439
cs *commandService,
439440
ct *agentgrpcfakes.FakeConnectionsTracker,
440441
) {
441-
ct.ReadyReturns(agentgrpc.Connection{}, true)
442+
ct.GetConnectionReturns(agentgrpc.Connection{InstanceID: "nginx-id"})
442443
cs.connectionTimeout = 1100 * time.Millisecond
443444
},
444445
errString: "timed out waiting for nginx deployment to be added to store",
@@ -581,8 +582,7 @@ func TestSetInitialConfig_Errors(t *testing.T) {
581582
InstanceID: "nginx-id",
582583
}
583584

584-
broadcaster := &broadcastfakes.FakeBroadcaster{}
585-
deployment := cs.nginxDeployments.GetOrStore(conn.Parent, broadcaster)
585+
deployment := newDeployment(&broadcastfakes.FakeBroadcaster{})
586586

587587
if test.setup != nil {
588588
test.setup(msgr, deployment)

internal/mode/static/nginx/agent/deployment.go

+14-2
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ type Deployment struct {
5858
Lock sync.RWMutex
5959
}
6060

61-
// newDeployment returns a new deployment object.
61+
// newDeployment returns a new Deployment object.
6262
func newDeployment(broadcaster broadcast.Broadcaster) *Deployment {
6363
return &Deployment{
6464
broadcaster: broadcaster,
@@ -227,11 +227,23 @@ func (d *DeploymentStore) Get(nsName types.NamespacedName) *Deployment {
227227

228228
// GetOrStore returns the existing value for the key if present.
229229
// Otherwise, it stores and returns the given value.
230-
func (d *DeploymentStore) GetOrStore(nsName types.NamespacedName, broadcaster broadcast.Broadcaster) *Deployment {
230+
func (d *DeploymentStore) GetOrStore(nsName types.NamespacedName, stopCh chan struct{}) *Deployment {
231231
if deployment := d.Get(nsName); deployment != nil {
232232
return deployment
233233
}
234234

235+
deployment := newDeployment(broadcast.NewDeploymentBroadcaster(stopCh))
236+
d.deployments.Store(nsName, deployment)
237+
238+
return deployment
239+
}
240+
241+
// StoreWithBroadcaster creates a new Deployment with the supplied broadcaster and stores it.
242+
// Used in unit tests to provide a mock broadcaster.
243+
func (d *DeploymentStore) StoreWithBroadcaster(
244+
nsName types.NamespacedName,
245+
broadcaster broadcast.Broadcaster,
246+
) *Deployment {
235247
deployment := newDeployment(broadcaster)
236248
d.deployments.Store(nsName, deployment)
237249

internal/mode/static/nginx/agent/deployment_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -122,13 +122,13 @@ func TestDeploymentStore(t *testing.T) {
122122

123123
nsName := types.NamespacedName{Namespace: "default", Name: "test-deployment"}
124124

125-
deployment := store.GetOrStore(nsName, &broadcastfakes.FakeBroadcaster{})
125+
deployment := store.GetOrStore(nsName, nil)
126126
g.Expect(deployment).ToNot(BeNil())
127127

128128
fetchedDeployment := store.Get(nsName)
129129
g.Expect(fetchedDeployment).To(Equal(deployment))
130130

131-
deployment = store.GetOrStore(nsName, &broadcastfakes.FakeBroadcaster{})
131+
deployment = store.GetOrStore(nsName, nil)
132132
g.Expect(fetchedDeployment).To(Equal(deployment))
133133

134134
store.Remove(nsName)

internal/mode/static/nginx/agent/file_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"google.golang.org/grpc/status"
1212
"k8s.io/apimachinery/pkg/types"
1313

14-
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/broadcast/broadcastfakes"
1514
agentgrpc "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc"
1615
grpcContext "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/context"
1716
agentgrpcfakes "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/grpcfakes"
@@ -32,7 +31,7 @@ func TestGetFile(t *testing.T) {
3231
connTracker.GetConnectionReturns(conn)
3332

3433
depStore := NewDeploymentStore(connTracker)
35-
dep := depStore.GetOrStore(deploymentName, &broadcastfakes.FakeBroadcaster{})
34+
dep := depStore.GetOrStore(deploymentName, nil)
3635

3736
fileMeta := &pb.FileMeta{
3837
Name: "test.conf",
@@ -155,7 +154,7 @@ func TestGetFile_FileNotFound(t *testing.T) {
155154
connTracker.GetConnectionReturns(conn)
156155

157156
depStore := NewDeploymentStore(connTracker)
158-
depStore.GetOrStore(deploymentName, &broadcastfakes.FakeBroadcaster{})
157+
depStore.GetOrStore(deploymentName, nil)
159158

160159
fs := newFileService(logr.Discard(), depStore, connTracker)
161160

internal/mode/static/nginx/agent/grpc/connections.go

+6-11
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
type ConnectionsTracker interface {
1616
Track(key string, conn Connection)
1717
GetConnection(key string) Connection
18-
Ready(key string) (Connection, bool)
1918
SetInstanceID(key, id string)
2019
UntrackConnectionsForParent(parent types.NamespacedName)
2120
}
@@ -27,6 +26,12 @@ type Connection struct {
2726
Parent types.NamespacedName
2827
}
2928

29+
// Ready returns if the connection is ready to be used. In other words, agent
30+
// has registered itself and an nginx instance with the control plane.
31+
func (c *Connection) Ready() bool {
32+
return c.InstanceID != ""
33+
}
34+
3035
// AgentConnectionsTracker keeps track of all connections between the control plane and nginx agents.
3136
type AgentConnectionsTracker struct {
3237
// connections contains a map of all IP addresses that have connected and their connection info.
@@ -61,16 +66,6 @@ func (c *AgentConnectionsTracker) GetConnection(key string) Connection {
6166
return c.connections[key]
6267
}
6368

64-
// ConnectionIsReady returns if the connection is ready to be used. In other words, agent
65-
// has registered itself and an nginx instance with the control plane.
66-
func (c *AgentConnectionsTracker) Ready(key string) (Connection, bool) {
67-
c.lock.RLock()
68-
defer c.lock.RUnlock()
69-
70-
conn, ok := c.connections[key]
71-
return conn, ok && conn.InstanceID != ""
72-
}
73-
7469
// SetInstanceID sets the nginx instanceID for a connection.
7570
func (c *AgentConnectionsTracker) SetInstanceID(key, id string) {
7671
c.lock.Lock()

internal/mode/static/nginx/agent/grpc/connections_test.go

+7-16
Original file line numberDiff line numberDiff line change
@@ -29,38 +29,29 @@ func TestGetConnection(t *testing.T) {
2929
g.Expect(nonExistent).To(Equal(agentgrpc.Connection{}))
3030
}
3131

32-
func TestReady(t *testing.T) {
32+
func TestConnectionIsReady(t *testing.T) {
3333
t.Parallel()
3434
g := NewWithT(t)
3535

36-
tracker := agentgrpc.NewConnectionsTracker()
37-
3836
conn := agentgrpc.Connection{
3937
PodName: "pod1",
4038
InstanceID: "instance1",
4139
Parent: types.NamespacedName{Namespace: "default", Name: "parent1"},
4240
}
43-
tracker.Track("key1", conn)
4441

45-
trackedConn, ready := tracker.Ready("key1")
46-
g.Expect(ready).To(BeTrue())
47-
g.Expect(trackedConn).To(Equal(conn))
42+
g.Expect(conn.Ready()).To(BeTrue())
4843
}
4944

5045
func TestConnectionIsNotReady(t *testing.T) {
5146
t.Parallel()
5247
g := NewWithT(t)
5348

54-
tracker := agentgrpc.NewConnectionsTracker()
55-
5649
conn := agentgrpc.Connection{
5750
PodName: "pod1",
5851
Parent: types.NamespacedName{Namespace: "default", Name: "parent1"},
5952
}
60-
tracker.Track("key1", conn)
6153

62-
_, ready := tracker.Ready("key1")
63-
g.Expect(ready).To(BeFalse())
54+
g.Expect(conn.Ready()).To(BeFalse())
6455
}
6556

6657
func TestSetInstanceID(t *testing.T) {
@@ -74,13 +65,13 @@ func TestSetInstanceID(t *testing.T) {
7465
}
7566
tracker.Track("key1", conn)
7667

77-
_, ready := tracker.Ready("key1")
78-
g.Expect(ready).To(BeFalse())
68+
trackedConn := tracker.GetConnection("key1")
69+
g.Expect(trackedConn.Ready()).To(BeFalse())
7970

8071
tracker.SetInstanceID("key1", "instance1")
8172

82-
trackedConn, ready := tracker.Ready("key1")
83-
g.Expect(ready).To(BeTrue())
73+
trackedConn = tracker.GetConnection("key1")
74+
g.Expect(trackedConn.Ready()).To(BeTrue())
8475
g.Expect(trackedConn.InstanceID).To(Equal("instance1"))
8576
}
8677

internal/mode/static/nginx/agent/grpc/grpcfakes/fake_connections_tracker.go

-79
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)