1
1
package dev .openfeature .contrib .providers .flagd .resolver .process .storage .connector .grpc ;
2
2
3
3
import dev .openfeature .contrib .providers .flagd .FlagdOptions ;
4
- import dev .openfeature .contrib .providers .flagd .resolver .common .ChannelBuilder ;
5
- import dev .openfeature .contrib .providers .flagd .resolver .common . backoff . GrpcStreamConnectorBackoffService ;
4
+ import dev .openfeature .contrib .providers .flagd .resolver .common .ConnectionEvent ;
5
+ import dev .openfeature .contrib .providers .flagd .resolver .grpc . GrpcConnector ;
6
6
import dev .openfeature .contrib .providers .flagd .resolver .process .storage .connector .Connector ;
7
7
import dev .openfeature .contrib .providers .flagd .resolver .process .storage .connector .QueuePayload ;
8
8
import dev .openfeature .contrib .providers .flagd .resolver .process .storage .connector .QueuePayloadType ;
9
9
import dev .openfeature .flagd .grpc .sync .FlagSyncServiceGrpc ;
10
- import dev .openfeature .flagd .grpc .sync .FlagSyncServiceGrpc .FlagSyncServiceBlockingStub ;
11
- import dev .openfeature .flagd .grpc .sync .FlagSyncServiceGrpc .FlagSyncServiceStub ;
12
10
import dev .openfeature .flagd .grpc .sync .Sync .GetMetadataRequest ;
13
11
import dev .openfeature .flagd .grpc .sync .Sync .GetMetadataResponse ;
14
12
import dev .openfeature .flagd .grpc .sync .Sync .SyncFlagsRequest ;
15
13
import dev .openfeature .flagd .grpc .sync .Sync .SyncFlagsResponse ;
16
14
import edu .umd .cs .findbugs .annotations .SuppressFBWarnings ;
17
15
import io .grpc .Context ;
18
16
import io .grpc .Context .CancellableContext ;
19
- import io .grpc .ManagedChannel ;
20
17
import java .util .concurrent .BlockingQueue ;
21
18
import java .util .concurrent .LinkedBlockingQueue ;
22
19
import java .util .concurrent .TimeUnit ;
23
20
import java .util .concurrent .atomic .AtomicBoolean ;
21
+ import java .util .function .Consumer ;
24
22
import lombok .extern .slf4j .Slf4j ;
25
- import org .slf4j .event .Level ;
26
23
27
24
/**
28
25
* Implements the {@link Connector} contract and emit flags obtained from flagd sync gRPC contract.
@@ -36,42 +33,34 @@ public class GrpcStreamConnector implements Connector {
36
33
37
34
private final AtomicBoolean shutdown = new AtomicBoolean (false );
38
35
private final BlockingQueue <QueuePayload > blockingQueue = new LinkedBlockingQueue <>(QUEUE_SIZE );
39
- private final ManagedChannel channel ;
40
- private final FlagSyncServiceStub serviceStub ;
41
- private final FlagSyncServiceBlockingStub serviceBlockingStub ;
42
36
private final int deadline ;
43
- private final int streamDeadlineMs ;
44
37
private final String selector ;
45
- private final int retryBackoffMillis ;
38
+ private final GrpcConnector <
39
+ FlagSyncServiceGrpc .FlagSyncServiceStub , FlagSyncServiceGrpc .FlagSyncServiceBlockingStub >
40
+ grpcConnector ;
41
+ private final LinkedBlockingQueue <GrpcResponseModel > streamReceiver ;
46
42
47
43
/**
48
- * Construct a new GrpcStreamConnector.
49
- *
50
- * @param options flagd options
44
+ * Creates a new GrpcStreamConnector responsible for observing the event stream.
51
45
*/
52
- public GrpcStreamConnector (final FlagdOptions options ) {
53
- channel = ChannelBuilder .nettyChannel (options );
54
- serviceStub = FlagSyncServiceGrpc .newStub (channel );
55
- serviceBlockingStub = FlagSyncServiceGrpc .newBlockingStub (channel );
46
+ public GrpcStreamConnector (final FlagdOptions options , Consumer <ConnectionEvent > onConnectionEvent ) {
56
47
deadline = options .getDeadline ();
57
- streamDeadlineMs = options .getStreamDeadlineMs ();
58
48
selector = options .getSelector ();
59
- retryBackoffMillis = options .getRetryBackoffMs ();
49
+ streamReceiver = new LinkedBlockingQueue <>(QUEUE_SIZE );
50
+ grpcConnector = new GrpcConnector <>(
51
+ options ,
52
+ FlagSyncServiceGrpc ::newStub ,
53
+ FlagSyncServiceGrpc ::newBlockingStub ,
54
+ onConnectionEvent ,
55
+ stub -> stub .syncFlags (SyncFlagsRequest .getDefaultInstance (), new GrpcStreamHandler (streamReceiver )));
60
56
}
61
57
62
58
/** Initialize gRPC stream connector. */
63
- public void init () {
59
+ public void init () throws Exception {
60
+ grpcConnector .initialize ();
64
61
Thread listener = new Thread (() -> {
65
62
try {
66
- observeEventStream (
67
- blockingQueue ,
68
- shutdown ,
69
- serviceStub ,
70
- serviceBlockingStub ,
71
- selector ,
72
- deadline ,
73
- streamDeadlineMs ,
74
- retryBackoffMillis );
63
+ observeEventStream (blockingQueue , shutdown , selector , deadline );
75
64
} catch (InterruptedException e ) {
76
65
log .warn ("gRPC event stream interrupted, flag configurations are stale" , e );
77
66
Thread .currentThread ().interrupt ();
@@ -96,37 +85,17 @@ public void shutdown() throws InterruptedException {
96
85
if (shutdown .getAndSet (true )) {
97
86
return ;
98
87
}
99
-
100
- try {
101
- if (this .channel != null && !this .channel .isShutdown ()) {
102
- this .channel .shutdown ();
103
- this .channel .awaitTermination (this .deadline , TimeUnit .MILLISECONDS );
104
- }
105
- } finally {
106
- if (this .channel != null && !this .channel .isShutdown ()) {
107
- this .channel .shutdownNow ();
108
- this .channel .awaitTermination (this .deadline , TimeUnit .MILLISECONDS );
109
- log .warn (String .format ("Unable to shut down channel by %d deadline" , this .deadline ));
110
- }
111
- }
88
+ this .grpcConnector .shutdown ();
112
89
}
113
90
114
91
/** Contains blocking calls, to be used concurrently. */
115
- static void observeEventStream (
92
+ void observeEventStream (
116
93
final BlockingQueue <QueuePayload > writeTo ,
117
94
final AtomicBoolean shutdown ,
118
- final FlagSyncServiceStub serviceStub ,
119
- final FlagSyncServiceBlockingStub serviceBlockingStub ,
120
95
final String selector ,
121
- final int deadline ,
122
- final int streamDeadlineMs ,
123
- int retryBackoffMillis )
96
+ final int deadline )
124
97
throws InterruptedException {
125
98
126
- final BlockingQueue <GrpcResponseModel > streamReceiver = new LinkedBlockingQueue <>(QUEUE_SIZE );
127
- final GrpcStreamConnectorBackoffService backoffService =
128
- new GrpcStreamConnectorBackoffService (retryBackoffMillis );
129
-
130
99
log .info ("Initializing sync stream observer" );
131
100
132
101
while (!shutdown .get ()) {
@@ -143,15 +112,10 @@ static void observeEventStream(
143
112
}
144
113
145
114
try (CancellableContext context = Context .current ().withCancellation ()) {
146
- FlagSyncServiceStub localServiceStub = serviceStub ;
147
- if (streamDeadlineMs > 0 ) {
148
- localServiceStub = localServiceStub .withDeadlineAfter (streamDeadlineMs , TimeUnit .MILLISECONDS );
149
- }
150
-
151
- localServiceStub .syncFlags (syncRequest .build (), new GrpcStreamHandler (streamReceiver ));
152
115
153
116
try {
154
- metadataResponse = serviceBlockingStub
117
+ metadataResponse = grpcConnector
118
+ .getResolver ()
155
119
.withDeadlineAfter (deadline , TimeUnit .MILLISECONDS )
156
120
.getMetadata (metadataRequest .build ());
157
121
} catch (Exception e ) {
@@ -164,27 +128,18 @@ static void observeEventStream(
164
128
165
129
while (!shutdown .get ()) {
166
130
final GrpcResponseModel response = streamReceiver .take ();
167
-
168
131
if (response .isComplete ()) {
169
132
log .info ("Sync stream completed" );
170
- // The stream is complete, this isn't really an error but we should try to
133
+ // The stream is complete, this isn't really an error, but we should try to
171
134
// reconnect
172
135
break ;
173
136
}
174
137
175
138
Throwable streamException = response .getError ();
176
139
if (streamException != null || metadataException != null ) {
177
- long retryDelay = backoffService .getCurrentBackoffMillis ();
178
-
179
- // if we are in silent recover mode, we should not expose the error to the client
180
- if (backoffService .shouldRetrySilently ()) {
181
- logExceptions (Level .INFO , streamException , metadataException , retryDelay );
182
- } else {
183
- logExceptions (Level .ERROR , streamException , metadataException , retryDelay );
184
- if (!writeTo .offer (new QueuePayload (
185
- QueuePayloadType .ERROR , "Error from stream or metadata" , metadataResponse ))) {
186
- log .error ("Failed to convey ERROR status, queue is full" );
187
- }
140
+ if (!writeTo .offer (new QueuePayload (
141
+ QueuePayloadType .ERROR , "Error from stream or metadata" , metadataResponse ))) {
142
+ log .error ("Failed to convey ERROR status, queue is full" );
188
143
}
189
144
190
145
// close the context to cancel the stream in case just the metadata call failed
@@ -199,34 +154,10 @@ static void observeEventStream(
199
154
if (!writeTo .offer (new QueuePayload (QueuePayloadType .DATA , data , metadataResponse ))) {
200
155
log .error ("Stream writing failed" );
201
156
}
202
-
203
- // reset backoff if we succeeded in a retry attempt
204
- backoffService .reset ();
205
157
}
206
158
}
207
-
208
- // check for shutdown and avoid sleep
209
- if (!shutdown .get ()) {
210
- log .debug ("Stream failed, retrying in {}ms" , backoffService .getCurrentBackoffMillis ());
211
- backoffService .waitUntilNextAttempt ();
212
- }
213
159
}
214
160
215
161
log .info ("Shutdown invoked, exiting event stream listener" );
216
162
}
217
-
218
- private static void logExceptions (
219
- Level logLevel , Throwable streamException , Exception metadataException , long retryDelay ) {
220
- if (streamException != null ) {
221
- log .atLevel (logLevel )
222
- .setCause (streamException )
223
- .log ("Error initializing stream, retrying in {}ms" , retryDelay );
224
- }
225
-
226
- if (metadataException != null ) {
227
- log .atLevel (logLevel )
228
- .setCause (metadataException )
229
- .log ("Error initializing metadata, retrying in {}ms" , retryDelay );
230
- }
231
- }
232
163
}
0 commit comments