Skip to content

Commit 99bdb94

Browse files
GODRIVER-2810 Switch to polling monitoring when running within a FaaS environment (#1376)
1 parent 53450c7 commit 99bdb94

File tree

24 files changed

+1390
-213
lines changed

24 files changed

+1390
-213
lines changed

Dockerfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ RUN export DEBIAN_FRONTEND=noninteractive && \
3131
software-properties-common \
3232
gpg \
3333
apt-utils \
34+
libc6-dev \
35+
gcc \
3436
make && \
3537
sudo update-ca-certificates && \
3638
rm -rf /var/lib/apt/lists/*

internal/driverutil/hello.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Copyright (C) MongoDB, Inc. 2023-present.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
// not use this file except in compliance with the License. You may obtain
5+
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
package driverutil
8+
9+
import (
10+
"os"
11+
"strings"
12+
)
13+
14+
const AwsLambdaPrefix = "AWS_Lambda_"
15+
16+
const (
17+
// FaaS environment variable names
18+
19+
// EnvVarAWSExecutionEnv is the AWS Execution environment variable.
20+
EnvVarAWSExecutionEnv = "AWS_EXECUTION_ENV"
21+
// EnvVarAWSLambdaRuntimeAPI is the AWS Lambda runtime API variable.
22+
EnvVarAWSLambdaRuntimeAPI = "AWS_LAMBDA_RUNTIME_API"
23+
// EnvVarFunctionsWorkerRuntime is the functions worker runtime variable.
24+
EnvVarFunctionsWorkerRuntime = "FUNCTIONS_WORKER_RUNTIME"
25+
// EnvVarKService is the K Service variable.
26+
EnvVarKService = "K_SERVICE"
27+
// EnvVarFunctionName is the function name variable.
28+
EnvVarFunctionName = "FUNCTION_NAME"
29+
// EnvVarVercel is the Vercel variable.
30+
EnvVarVercel = "VERCEL"
31+
// EnvVarK8s is the K8s veriable.
32+
EnvVarK8s = "KUBERNETES_SERVICE_HOST"
33+
)
34+
35+
const (
36+
// FaaS environment variable names
37+
38+
// EnvVarAWSRegion is the AWS region variable.
39+
EnvVarAWSRegion = "AWS_REGION"
40+
// EnvVarAWSLambdaFunctionMemorySize is the AWS Lambda function memory size variable.
41+
EnvVarAWSLambdaFunctionMemorySize = "AWS_LAMBDA_FUNCTION_MEMORY_SIZE"
42+
// EnvVarFunctionMemoryMB is the function memory in megabytes variable.
43+
EnvVarFunctionMemoryMB = "FUNCTION_MEMORY_MB"
44+
// EnvVarFunctionTimeoutSec is the function timeout in seconds variable.
45+
EnvVarFunctionTimeoutSec = "FUNCTION_TIMEOUT_SEC"
46+
// EnvVarFunctionRegion is the function region variable.
47+
EnvVarFunctionRegion = "FUNCTION_REGION"
48+
// EnvVarVercelRegion is the Vercel region variable.
49+
EnvVarVercelRegion = "VERCEL_REGION"
50+
)
51+
52+
const (
53+
// FaaS environment names used by the client
54+
55+
// EnvNameAWSLambda is the AWS Lambda environment name.
56+
EnvNameAWSLambda = "aws.lambda"
57+
// EnvNameAzureFunc is the Azure Function environment name.
58+
EnvNameAzureFunc = "azure.func"
59+
// EnvNameGCPFunc is the Google Cloud Function environment name.
60+
EnvNameGCPFunc = "gcp.func"
61+
// EnvNameVercel is the Vercel environment name.
62+
EnvNameVercel = "vercel"
63+
)
64+
65+
// GetFaasEnvName parses the FaaS environment variable name and returns the
66+
// corresponding name used by the client. If none of the variables or variables
67+
// for multiple names are populated the client.env value MUST be entirely
68+
// omitted. When variables for multiple "client.env.name" values are present,
69+
// "vercel" takes precedence over "aws.lambda"; any other combination MUST cause
70+
// "client.env" to be entirely omitted.
71+
func GetFaasEnvName() string {
72+
envVars := []string{
73+
EnvVarAWSExecutionEnv,
74+
EnvVarAWSLambdaRuntimeAPI,
75+
EnvVarFunctionsWorkerRuntime,
76+
EnvVarKService,
77+
EnvVarFunctionName,
78+
EnvVarVercel,
79+
}
80+
81+
// If none of the variables are populated the client.env value MUST be
82+
// entirely omitted.
83+
names := make(map[string]struct{})
84+
85+
for _, envVar := range envVars {
86+
val := os.Getenv(envVar)
87+
if val == "" {
88+
continue
89+
}
90+
91+
var name string
92+
93+
switch envVar {
94+
case EnvVarAWSExecutionEnv:
95+
if !strings.HasPrefix(val, AwsLambdaPrefix) {
96+
continue
97+
}
98+
99+
name = EnvNameAWSLambda
100+
case EnvVarAWSLambdaRuntimeAPI:
101+
name = EnvNameAWSLambda
102+
case EnvVarFunctionsWorkerRuntime:
103+
name = EnvNameAzureFunc
104+
case EnvVarKService, EnvVarFunctionName:
105+
name = EnvNameGCPFunc
106+
case EnvVarVercel:
107+
// "vercel" takes precedence over "aws.lambda".
108+
delete(names, EnvNameAWSLambda)
109+
110+
name = EnvNameVercel
111+
}
112+
113+
names[name] = struct{}{}
114+
if len(names) > 1 {
115+
// If multiple names are populated the client.env value
116+
// MUST be entirely omitted.
117+
names = nil
118+
119+
break
120+
}
121+
}
122+
123+
for name := range names {
124+
return name
125+
}
126+
127+
return ""
128+
}
File renamed without changes.

internal/test/faas/awslambda/mongodb/main.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,12 @@ const timeout = 60 * time.Second
2727
// event durations, as well as the number of heartbeats, commands, and open
2828
// connections.
2929
type eventListener struct {
30-
commandCount int
31-
commandDuration int64
32-
heartbeatCount int
33-
heartbeatDuration int64
34-
openConnections int
30+
commandCount int
31+
commandDuration int64
32+
heartbeatAwaitedCount int
33+
heartbeatCount int
34+
heartbeatDuration int64
35+
openConnections int
3536
}
3637

3738
// commandMonitor initializes an event.CommandMonitor that will count the number
@@ -61,11 +62,19 @@ func (listener *eventListener) serverMonitor() *event.ServerMonitor {
6162
succeeded := func(e *event.ServerHeartbeatSucceededEvent) {
6263
listener.heartbeatCount++
6364
listener.heartbeatDuration += e.DurationNanos
65+
66+
if e.Awaited {
67+
listener.heartbeatAwaitedCount++
68+
}
6469
}
6570

6671
failed := func(e *event.ServerHeartbeatFailedEvent) {
6772
listener.heartbeatCount++
6873
listener.heartbeatDuration += e.DurationNanos
74+
75+
if e.Awaited {
76+
listener.heartbeatAwaitedCount++
77+
}
6978
}
7079

7180
return &event.ServerMonitor{
@@ -150,6 +159,12 @@ func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events
150159
return gateway500(), fmt.Errorf("failed to delete: %w", err)
151160
}
152161

162+
// Driver must switch to polling monitoring when running within a FaaS
163+
// environment.
164+
if listener.heartbeatAwaitedCount > 0 {
165+
return gateway500(), fmt.Errorf("FaaS environment fialed to switch to polling")
166+
}
167+
153168
var avgCmdDur float64
154169
if count := listener.commandCount; count != 0 {
155170
avgCmdDur = float64(listener.commandDuration) / float64(count)

mongo/integration/handshake_test.go

Lines changed: 42 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -55,31 +55,18 @@ func TestHandshakeProse(t *testing.T) {
5555
return elems
5656
}
5757

58-
const (
59-
envVarAWSExecutionEnv = "AWS_EXECUTION_ENV"
60-
envVarAWSRegion = "AWS_REGION"
61-
envVarAWSLambdaFunctionMemorySize = "AWS_LAMBDA_FUNCTION_MEMORY_SIZE"
62-
envVarFunctionsWorkerRuntime = "FUNCTIONS_WORKER_RUNTIME"
63-
envVarKService = "K_SERVICE"
64-
envVarFunctionMemoryMB = "FUNCTION_MEMORY_MB"
65-
envVarFunctionTimeoutSec = "FUNCTION_TIMEOUT_SEC"
66-
envVarFunctionRegion = "FUNCTION_REGION"
67-
envVarVercel = "VERCEL"
68-
envVarVercelRegion = "VERCEL_REGION"
69-
)
70-
7158
// Reset the environment variables to avoid environment namespace
7259
// collision.
73-
t.Setenv(envVarAWSExecutionEnv, "")
74-
t.Setenv(envVarFunctionsWorkerRuntime, "")
75-
t.Setenv(envVarKService, "")
76-
t.Setenv(envVarVercel, "")
77-
t.Setenv(envVarAWSRegion, "")
78-
t.Setenv(envVarAWSLambdaFunctionMemorySize, "")
79-
t.Setenv(envVarFunctionMemoryMB, "")
80-
t.Setenv(envVarFunctionTimeoutSec, "")
81-
t.Setenv(envVarFunctionRegion, "")
82-
t.Setenv(envVarVercelRegion, "")
60+
t.Setenv("AWS_EXECUTION_ENV", "")
61+
t.Setenv("FUNCTIONS_WORKER_RUNTIME", "")
62+
t.Setenv("K_SERVICE", "")
63+
t.Setenv("VERCEL", "")
64+
t.Setenv("AWS_REGION", "")
65+
t.Setenv("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", "")
66+
t.Setenv("FUNCTION_MEMORY_MB", "")
67+
t.Setenv("FUNCTION_TIMEOUT_SEC", "")
68+
t.Setenv("FUNCTION_REGION", "")
69+
t.Setenv("VERCEL_REGION", "")
8370

8471
for _, test := range []struct {
8572
name string
@@ -89,9 +76,9 @@ func TestHandshakeProse(t *testing.T) {
8976
{
9077
name: "1. valid AWS",
9178
env: map[string]string{
92-
envVarAWSExecutionEnv: "AWS_Lambda_java8",
93-
envVarAWSRegion: "us-east-2",
94-
envVarAWSLambdaFunctionMemorySize: "1024",
79+
"AWS_EXECUTION_ENV": "AWS_Lambda_java8",
80+
"AWS_REGION": "us-east-2",
81+
"AWS_LAMBDA_FUNCTION_MEMORY_SIZE": "1024",
9582
},
9683
want: clientMetadata(bson.D{
9784
{Key: "name", Value: "aws.lambda"},
@@ -102,7 +89,7 @@ func TestHandshakeProse(t *testing.T) {
10289
{
10390
name: "2. valid Azure",
10491
env: map[string]string{
105-
envVarFunctionsWorkerRuntime: "node",
92+
"FUNCTIONS_WORKER_RUNTIME": "node",
10693
},
10794
want: clientMetadata(bson.D{
10895
{Key: "name", Value: "azure.func"},
@@ -111,10 +98,10 @@ func TestHandshakeProse(t *testing.T) {
11198
{
11299
name: "3. valid GCP",
113100
env: map[string]string{
114-
envVarKService: "servicename",
115-
envVarFunctionMemoryMB: "1024",
116-
envVarFunctionTimeoutSec: "60",
117-
envVarFunctionRegion: "us-central1",
101+
"K_SERVICE": "servicename",
102+
"FUNCTION_MEMORY_MB": "1024",
103+
"FUNCTION_TIMEOUT_SEC": "60",
104+
"FUNCTION_REGION": "us-central1",
118105
},
119106
want: clientMetadata(bson.D{
120107
{Key: "name", Value: "gcp.func"},
@@ -126,8 +113,8 @@ func TestHandshakeProse(t *testing.T) {
126113
{
127114
name: "4. valid Vercel",
128115
env: map[string]string{
129-
envVarVercel: "1",
130-
envVarVercelRegion: "cdg1",
116+
"VERCEL": "1",
117+
"VERCEL_REGION": "cdg1",
131118
},
132119
want: clientMetadata(bson.D{
133120
{Key: "name", Value: "vercel"},
@@ -137,16 +124,16 @@ func TestHandshakeProse(t *testing.T) {
137124
{
138125
name: "5. invalid multiple providers",
139126
env: map[string]string{
140-
envVarAWSExecutionEnv: "AWS_Lambda_java8",
141-
envVarFunctionsWorkerRuntime: "node",
127+
"AWS_EXECUTION_ENV": "AWS_Lambda_java8",
128+
"FUNCTIONS_WORKER_RUNTIME": "node",
142129
},
143130
want: clientMetadata(nil),
144131
},
145132
{
146133
name: "6. invalid long string",
147134
env: map[string]string{
148-
envVarAWSExecutionEnv: "AWS_Lambda_java8",
149-
envVarAWSRegion: func() string {
135+
"AWS_EXECUTION_ENV": "AWS_Lambda_java8",
136+
"AWS_REGION": func() string {
150137
var s string
151138
for i := 0; i < 512; i++ {
152139
s += "a"
@@ -161,8 +148,8 @@ func TestHandshakeProse(t *testing.T) {
161148
{
162149
name: "7. invalid wrong types",
163150
env: map[string]string{
164-
envVarAWSExecutionEnv: "AWS_Lambda_java8",
165-
envVarAWSLambdaFunctionMemorySize: "big",
151+
"AWS_EXECUTION_ENV": "AWS_Lambda_java8",
152+
"AWS_LAMBDA_FUNCTION_MEMORY_SIZE": "big",
166153
},
167154
want: clientMetadata(bson.D{
168155
{Key: "name", Value: "aws.lambda"},
@@ -171,7 +158,7 @@ func TestHandshakeProse(t *testing.T) {
171158
{
172159
name: "8. Invalid - AWS_EXECUTION_ENV does not start with \"AWS_Lambda_\"",
173160
env: map[string]string{
174-
envVarAWSExecutionEnv: "EC2",
161+
"AWS_EXECUTION_ENV": "EC2",
175162
},
176163
want: clientMetadata(nil),
177164
},
@@ -188,32 +175,27 @@ func TestHandshakeProse(t *testing.T) {
188175
require.NoError(mt, err, "Ping error: %v", err)
189176

190177
messages := mt.GetProxiedMessages()
178+
handshakeMessage := messages[:1][0]
191179

192-
// First two messages are handshake messages
193-
for idx, pair := range messages[:2] {
194-
hello := handshake.LegacyHello
195-
// Expect "hello" command name with API version.
196-
if os.Getenv("REQUIRE_API_VERSION") == "true" {
197-
hello = "hello"
198-
}
199-
200-
assert.Equal(mt, pair.CommandName, hello, "expected and actual command name at index %d are different", idx)
180+
hello := handshake.LegacyHello
181+
if os.Getenv("REQUIRE_API_VERSION") == "true" {
182+
hello = "hello"
183+
}
201184

202-
sent := pair.Sent
185+
assert.Equal(mt, hello, handshakeMessage.CommandName)
203186

204-
// Lookup the "client" field in the command document.
205-
clientVal, err := sent.Command.LookupErr("client")
206-
require.NoError(mt, err, "expected command %s at index %d to contain client field", sent.Command, idx)
187+
// Lookup the "client" field in the command document.
188+
clientVal, err := handshakeMessage.Sent.Command.LookupErr("client")
189+
require.NoError(mt, err, "expected command %s to contain client field", handshakeMessage.Sent.Command)
207190

208-
got, ok := clientVal.DocumentOK()
209-
require.True(mt, ok, "expected client field to be a document, got %s", clientVal.Type)
191+
got, ok := clientVal.DocumentOK()
192+
require.True(mt, ok, "expected client field to be a document, got %s", clientVal.Type)
210193

211-
wantBytes, err := bson.Marshal(test.want)
212-
require.NoError(mt, err, "error marshaling want document: %v", err)
194+
wantBytes, err := bson.Marshal(test.want)
195+
require.NoError(mt, err, "error marshaling want document: %v", err)
213196

214-
want := bsoncore.Document(wantBytes)
215-
assert.Equal(mt, want, got, "want: %v, got: %v", want, got)
216-
}
197+
want := bsoncore.Document(wantBytes)
198+
assert.Equal(mt, want, got, "want: %v, got: %v", want, got)
217199
})
218200
}
219201
}

mongo/integration/sdam_prose_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ func TestSDAMProse(t *testing.T) {
3232
heartbeatIntervalMtOpts := mtest.NewOptions().
3333
ClientOptions(heartbeatIntervalClientOpts).
3434
CreateCollection(false).
35-
ClientType(mtest.Proxy)
35+
ClientType(mtest.Proxy).
36+
MinServerVersion("4.4") // RTT Monitor / Streaming protocol is not supported for versions < 4.4.
3637
mt.RunOpts("heartbeats processed more frequently", heartbeatIntervalMtOpts, func(mt *mtest.T) {
3738
// Test that setting heartbeat interval to 500ms causes the client to process heartbeats
3839
// approximately every 500ms instead of the default 10s. Note that a Client doesn't

mongo/integration/unified/client_entity.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,8 @@ func setClientOptionsFromURIOptions(clientOpts *options.ClientOptions, uriOpts b
599599
clientOpts.SetTimeout(time.Duration(value.(int32)) * time.Millisecond)
600600
case "serverselectiontimeoutms":
601601
clientOpts.SetServerSelectionTimeout(time.Duration(value.(int32)) * time.Millisecond)
602+
case "servermonitoringmode":
603+
clientOpts.SetServerMonitoringMode(value.(string))
602604
default:
603605
return fmt.Errorf("unrecognized URI option %s", key)
604606
}

0 commit comments

Comments
 (0)