Skip to content

Commit 4273ec3

Browse files
committed
CP/DP Split: track agent connections (#2970)
Added the following: - middleware to extract IP address of agent and store it in the grpc context - link the agent's hostname to its IP address when connecting and track it - use this linkage to pause the Subscription until the agent registers itself, then proceeding This logic is subject to change as we enhance this (like tracking auth token instead of IP address).
1 parent 895bec7 commit 4273ec3

File tree

12 files changed

+392
-115
lines changed

12 files changed

+392
-115
lines changed

internal/mode/static/manager.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ import (
5151
"github.com/nginx/nginx-gateway-fabric/internal/mode/static/licensing"
5252
"github.com/nginx/nginx-gateway-fabric/internal/mode/static/metrics/collectors"
5353
"github.com/nginx/nginx-gateway-fabric/internal/mode/static/nginx/agent"
54+
agentgrpc "github.com/nginx/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc"
5455
ngxcfg "github.com/nginx/nginx-gateway-fabric/internal/mode/static/nginx/config"
5556
"github.com/nginx/nginx-gateway-fabric/internal/mode/static/nginx/config/policies"
5657
"github.com/nginx/nginx-gateway-fabric/internal/mode/static/nginx/config/policies/clientsettings"
@@ -182,14 +183,14 @@ func StartManager(cfg config.Config) error {
182183

183184
nginxUpdater := agent.NewNginxUpdater(cfg.Logger.WithName("nginxUpdater"), cfg.Plus)
184185

185-
grpcServer := &agent.GRPCServer{
186-
Logger: cfg.Logger.WithName("agentGRPCServer"),
187-
RegisterServices: []func(*grpc.Server){
186+
grpcServer := agentgrpc.NewServer(
187+
cfg.Logger.WithName("agentGRPCServer"),
188+
grpcServerPort,
189+
[]func(*grpc.Server){
188190
nginxUpdater.CommandService.Register,
189191
nginxUpdater.FileService.Register,
190192
},
191-
Port: grpcServerPort,
192-
}
193+
)
193194

194195
if err = mgr.Add(&runnables.LeaderOrNonLeader{Runnable: grpcServer}); err != nil {
195196
return fmt.Errorf("cannot register grpc server: %w", err)

internal/mode/static/nginx/agent/agent.go

+10-9
Original file line numberDiff line numberDiff line change
@@ -18,30 +18,31 @@ type NginxUpdater interface {
1818
type NginxUpdaterImpl struct {
1919
CommandService *commandService
2020
FileService *fileService
21-
Logger logr.Logger
22-
Plus bool
21+
logger logr.Logger
22+
plus bool
2323
}
2424

25+
// NewNginxUpdater returns a new NginxUpdaterImpl instance.
2526
func NewNginxUpdater(logger logr.Logger, plus bool) *NginxUpdaterImpl {
2627
return &NginxUpdaterImpl{
27-
Logger: logger,
28-
Plus: plus,
29-
CommandService: newCommandService(),
30-
FileService: newFileService(),
28+
logger: logger,
29+
plus: plus,
30+
CommandService: newCommandService(logger.WithName("commandService")),
31+
FileService: newFileService(logger.WithName("fileService")),
3132
}
3233
}
3334

3435
// UpdateConfig sends the nginx configuration to the agent.
3536
func (n *NginxUpdaterImpl) UpdateConfig(files int) {
36-
n.Logger.Info("Sending nginx configuration to agent", "numFiles", files)
37+
n.logger.Info("Sending nginx configuration to agent", "numFiles", files)
3738
}
3839

3940
// UpdateUpstreamServers sends an APIRequest to the agent to update upstream servers using the NGINX Plus API.
4041
// Only applicable when using NGINX Plus.
4142
func (n *NginxUpdaterImpl) UpdateUpstreamServers() {
42-
if !n.Plus {
43+
if !n.plus {
4344
return
4445
}
4546

46-
n.Logger.Info("Updating upstream servers using NGINX Plus API")
47+
n.logger.Info("Updating upstream servers using NGINX Plus API")
4748
}

internal/mode/static/nginx/agent/command.go

+94-26
Original file line numberDiff line numberDiff line change
@@ -6,32 +6,51 @@ import (
66
"fmt"
77
"time"
88

9+
"github.com/go-logr/logr"
910
pb "github.com/nginx/agent/v3/api/grpc/mpi/v1"
1011
"google.golang.org/grpc"
12+
13+
agentgrpc "github.com/nginx/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc"
14+
grpcContext "github.com/nginx/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/context"
1115
)
1216

13-
// commandService handles the connection and subscription to the agent.
17+
// commandService handles the connection and subscription to the data plane agent.
1418
type commandService struct {
1519
pb.CommandServiceServer
20+
connTracker *agentgrpc.ConnectionsTracker
21+
// TODO(sberman): all logs are at Info level right now. Adjust appropriately.
22+
logger logr.Logger
1623
}
1724

18-
func newCommandService() *commandService {
19-
return &commandService{}
25+
func newCommandService(logger logr.Logger) *commandService {
26+
return &commandService{
27+
logger: logger,
28+
connTracker: agentgrpc.NewConnectionsTracker(),
29+
}
2030
}
2131

2232
func (cs *commandService) Register(server *grpc.Server) {
2333
pb.RegisterCommandServiceServer(server, cs)
2434
}
2535

36+
// CreateConnection registers a data plane agent with the control plane.
2637
func (cs *commandService) CreateConnection(
27-
_ context.Context,
38+
ctx context.Context,
2839
req *pb.CreateConnectionRequest,
2940
) (*pb.CreateConnectionResponse, error) {
3041
if req == nil {
3142
return nil, errors.New("empty connection request")
3243
}
3344

34-
fmt.Printf("Creating connection for nginx pod: %s\n", req.GetResource().GetContainerInfo().GetHostname())
45+
gi, ok := grpcContext.GrpcInfoFromContext(ctx)
46+
if !ok {
47+
return nil, agentgrpc.ErrStatusInvalidConnection
48+
}
49+
50+
podName := req.GetResource().GetContainerInfo().GetHostname()
51+
52+
cs.logger.Info(fmt.Sprintf("Creating connection for nginx pod: %s", podName))
53+
cs.connTracker.Track(gi.IPAddress, podName)
3554

3655
return &pb.CreateConnectionResponse{
3756
Response: &pb.CommandResponse{
@@ -40,50 +59,99 @@ func (cs *commandService) CreateConnection(
4059
}, nil
4160
}
4261

62+
// Subscribe is a decoupled communication mechanism between the data plane agent and control plane.
4363
func (cs *commandService) Subscribe(in pb.CommandService_SubscribeServer) error {
44-
fmt.Println("Received subscribe request")
45-
4664
ctx := in.Context()
4765

66+
gi, ok := grpcContext.GrpcInfoFromContext(ctx)
67+
if !ok {
68+
return agentgrpc.ErrStatusInvalidConnection
69+
}
70+
71+
cs.logger.Info(fmt.Sprintf("Received subscribe request from %q", gi.IPAddress))
72+
73+
go cs.listenForDataPlaneResponse(ctx, in)
74+
75+
// wait for the agent to report itself
76+
podName, err := cs.waitForConnection(ctx, gi)
77+
if err != nil {
78+
cs.logger.Error(err, "error waiting for connection")
79+
return err
80+
}
81+
82+
cs.logger.Info(fmt.Sprintf("Handling subscription for %s/%s", podName, gi.IPAddress))
4883
for {
4984
select {
5085
case <-ctx.Done():
5186
return ctx.Err()
5287
case <-time.After(1 * time.Minute):
5388
dummyRequest := &pb.ManagementPlaneRequest{
54-
Request: &pb.ManagementPlaneRequest_StatusRequest{
55-
StatusRequest: &pb.StatusRequest{},
89+
Request: &pb.ManagementPlaneRequest_HealthRequest{
90+
HealthRequest: &pb.HealthRequest{},
5691
},
5792
}
58-
if err := in.Send(dummyRequest); err != nil { // will likely need retry logic
59-
fmt.Printf("ERROR: %v\n", err)
93+
if err := in.Send(dummyRequest); err != nil { // TODO(sberman): will likely need retry logic
94+
cs.logger.Error(err, "error sending request to agent")
6095
}
6196
}
6297
}
6398
}
6499

65-
func (cs *commandService) UpdateDataPlaneStatus(
66-
_ context.Context,
67-
req *pb.UpdateDataPlaneStatusRequest,
68-
) (*pb.UpdateDataPlaneStatusResponse, error) {
69-
fmt.Println("Updating data plane status")
100+
// TODO(sberman): current issue: when control plane restarts, agent doesn't re-establish a CreateConnection call,
101+
// so this fails.
102+
func (cs *commandService) waitForConnection(ctx context.Context, gi grpcContext.GrpcInfo) (string, error) {
103+
var podName string
104+
ticker := time.NewTicker(time.Second)
105+
defer ticker.Stop()
70106

71-
if req == nil {
72-
return nil, errors.New("empty update data plane status request")
107+
timer := time.NewTimer(30 * time.Second)
108+
defer timer.Stop()
109+
110+
for {
111+
select {
112+
case <-ctx.Done():
113+
return "", ctx.Err()
114+
case <-timer.C:
115+
return "", errors.New("timed out waiting for agent connection")
116+
case <-ticker.C:
117+
if podName = cs.connTracker.GetConnection(gi.IPAddress); podName != "" {
118+
return podName, nil
119+
}
120+
}
73121
}
122+
}
74123

75-
return &pb.UpdateDataPlaneStatusResponse{}, nil
124+
func (cs *commandService) listenForDataPlaneResponse(ctx context.Context, in pb.CommandService_SubscribeServer) {
125+
for {
126+
select {
127+
case <-ctx.Done():
128+
return
129+
default:
130+
dataPlaneResponse, err := in.Recv()
131+
cs.logger.Info(fmt.Sprintf("Received data plane response: %v", dataPlaneResponse))
132+
if err != nil {
133+
cs.logger.Error(err, "failed to receive data plane response")
134+
return
135+
}
136+
}
137+
}
76138
}
77139

140+
// UpdateDataPlaneHealth includes full health information about the data plane as reported by the agent.
141+
// TODO(sberman): Is health monitoring the data planes something useful for us to do?
78142
func (cs *commandService) UpdateDataPlaneHealth(
79143
_ context.Context,
80-
req *pb.UpdateDataPlaneHealthRequest,
144+
_ *pb.UpdateDataPlaneHealthRequest,
81145
) (*pb.UpdateDataPlaneHealthResponse, error) {
82-
fmt.Println("Updating data plane health")
83-
84-
if req == nil {
85-
return nil, errors.New("empty update dataplane health request")
86-
}
87-
88146
return &pb.UpdateDataPlaneHealthResponse{}, nil
89147
}
148+
149+
// UpdateDataPlaneStatus is called by agent on startup and upon any change in agent metadata,
150+
// instance metadata, or configurations. Since directly changing nginx configuration on the instance
151+
// is not supported, this is a no-op for NGF.
152+
func (cs *commandService) UpdateDataPlaneStatus(
153+
_ context.Context,
154+
_ *pb.UpdateDataPlaneStatusRequest,
155+
) (*pb.UpdateDataPlaneStatusResponse, error) {
156+
return &pb.UpdateDataPlaneStatusResponse{}, nil
157+
}

internal/mode/static/nginx/agent/file.go

+22-16
Original file line numberDiff line numberDiff line change
@@ -4,59 +4,65 @@ import (
44
"context"
55
"fmt"
66

7+
"github.com/go-logr/logr"
78
pb "github.com/nginx/agent/v3/api/grpc/mpi/v1"
89
"google.golang.org/grpc"
910
)
1011

1112
// fileService handles file management between the control plane and the agent.
1213
type fileService struct {
1314
pb.FileServiceServer
15+
// TODO(sberman): all logs are at Info level right now. Adjust appropriately.
16+
logger logr.Logger
1417
}
1518

16-
func newFileService() *fileService {
17-
return &fileService{}
19+
func newFileService(logger logr.Logger) *fileService {
20+
return &fileService{logger: logger}
1821
}
1922

2023
func (fs *fileService) Register(server *grpc.Server) {
2124
pb.RegisterFileServiceServer(server, fs)
2225
}
2326

27+
// GetOverview gets the overview of files for a particular configuration version of an instance.
28+
// Agent calls this if it's missing an overview when a ConfigApplyRequest is called by the control plane.
2429
func (fs *fileService) GetOverview(
2530
_ context.Context,
2631
_ *pb.GetOverviewRequest,
2732
) (*pb.GetOverviewResponse, error) {
28-
fmt.Println("Get overview request")
33+
fs.logger.Info("Get overview request")
2934

3035
return &pb.GetOverviewResponse{
3136
Overview: &pb.FileOverview{},
3237
}, nil
3338
}
3439

35-
func (fs *fileService) UpdateOverview(
36-
_ context.Context,
37-
_ *pb.UpdateOverviewRequest,
38-
) (*pb.UpdateOverviewResponse, error) {
39-
fmt.Println("Update overview request")
40-
41-
return &pb.UpdateOverviewResponse{}, nil
42-
}
43-
40+
// GetFile is called by the agent when it needs to download a file for a ConfigApplyRequest.
4441
func (fs *fileService) GetFile(
4542
_ context.Context,
4643
req *pb.GetFileRequest,
4744
) (*pb.GetFileResponse, error) {
4845
filename := req.GetFileMeta().GetName()
4946
hash := req.GetFileMeta().GetHash()
50-
fmt.Printf("Getting file: %s, %s\n", filename, hash)
47+
fs.logger.Info(fmt.Sprintf("Getting file: %s, %s", filename, hash))
5148

5249
return &pb.GetFileResponse{}, nil
5350
}
5451

52+
// UpdateOverview is called by agent on startup and whenever any files change on the instance.
53+
// Since directly changing nginx configuration on the instance is not supported, this is a no-op for NGF.
54+
func (fs *fileService) UpdateOverview(
55+
_ context.Context,
56+
_ *pb.UpdateOverviewRequest,
57+
) (*pb.UpdateOverviewResponse, error) {
58+
return &pb.UpdateOverviewResponse{}, nil
59+
}
60+
61+
// UpdateFile is called by agent whenever any files change on the instance.
62+
// Since directly changing nginx configuration on the instance is not supported, this is a no-op for NGF.
5563
func (fs *fileService) UpdateFile(
5664
_ context.Context,
57-
req *pb.UpdateFileRequest,
65+
_ *pb.UpdateFileRequest,
5866
) (*pb.UpdateFileResponse, error) {
59-
fmt.Println("Update file request for: ", req.GetFile().GetFileMeta().GetName())
60-
6167
return &pb.UpdateFileResponse{}, nil
6268
}

internal/mode/static/nginx/agent/grpc.go

-59
This file was deleted.

0 commit comments

Comments
 (0)