1
1
package dev .openfeature .contrib .providers .flagd ;
2
2
3
3
import dev .openfeature .contrib .providers .flagd .resolver .Resolver ;
4
- import dev .openfeature .contrib .providers .flagd .resolver .common .ConnectionEvent ;
4
+ import dev .openfeature .contrib .providers .flagd .resolver .common .FlagdProviderEvent ;
5
+ import dev .openfeature .contrib .providers .flagd .resolver .common .Util ;
5
6
import dev .openfeature .contrib .providers .flagd .resolver .grpc .GrpcResolver ;
6
7
import dev .openfeature .contrib .providers .flagd .resolver .grpc .cache .Cache ;
7
8
import dev .openfeature .contrib .providers .flagd .resolver .process .InProcessResolver ;
12
13
import dev .openfeature .sdk .ImmutableStructure ;
13
14
import dev .openfeature .sdk .Metadata ;
14
15
import dev .openfeature .sdk .ProviderEvaluation ;
16
+ import dev .openfeature .sdk .ProviderEvent ;
15
17
import dev .openfeature .sdk .ProviderEventDetails ;
16
18
import dev .openfeature .sdk .Structure ;
17
19
import dev .openfeature .sdk .Value ;
18
20
import java .util .ArrayList ;
19
21
import java .util .Collections ;
20
22
import java .util .List ;
23
+ import java .util .concurrent .Executors ;
24
+ import java .util .concurrent .ScheduledExecutorService ;
25
+ import java .util .concurrent .ScheduledFuture ;
26
+ import java .util .concurrent .TimeUnit ;
21
27
import java .util .function .Function ;
22
28
import lombok .extern .slf4j .Slf4j ;
23
29
@@ -31,10 +37,29 @@ public class FlagdProvider extends EventProvider {
31
37
private static final String FLAGD_PROVIDER = "flagd" ;
32
38
private final Resolver flagResolver ;
33
39
private volatile boolean initialized = false ;
34
- private volatile boolean connected = false ;
35
40
private volatile Structure syncMetadata = new ImmutableStructure ();
36
41
private volatile EvaluationContext enrichedContext = new ImmutableContext ();
37
42
private final List <Hook > hooks = new ArrayList <>();
43
+ private volatile ProviderEvent previousEvent = null ;
44
+
45
+ /**
46
+ * An executor service responsible for scheduling reconnection attempts.
47
+ */
48
+ private final ScheduledExecutorService reconnectExecutor ;
49
+
50
+ /**
51
+ * A scheduled task for managing reconnection attempts.
52
+ */
53
+ private ScheduledFuture <?> reconnectTask ;
54
+
55
+ /**
56
+ * The grace period in milliseconds to wait for reconnection before emitting an error event.
57
+ */
58
+ private final long gracePeriod ;
59
+ /**
60
+ * The deadline in milliseconds for GRPC operations.
61
+ */
62
+ private final long deadline ;
38
63
39
64
protected final void finalize () {
40
65
// DO NOT REMOVE, spotbugs: CT_CONSTRUCTOR_THROW
@@ -55,18 +80,21 @@ public FlagdProvider() {
55
80
public FlagdProvider (final FlagdOptions options ) {
56
81
switch (options .getResolverType ().asString ()) {
57
82
case Config .RESOLVER_IN_PROCESS :
58
- this .flagResolver = new InProcessResolver (options , this ::isConnected , this :: onConnectionEvent );
83
+ this .flagResolver = new InProcessResolver (options , this ::onProviderEvent );
59
84
break ;
60
85
case Config .RESOLVER_RPC :
61
86
this .flagResolver = new GrpcResolver (
62
- options , new Cache (options .getCacheType (), options .getMaxCacheSize ()), this ::onConnectionEvent );
87
+ options , new Cache (options .getCacheType (), options .getMaxCacheSize ()), this ::onProviderEvent );
63
88
break ;
64
89
default :
65
90
throw new IllegalStateException (
66
91
String .format ("Requested unsupported resolver type of %s" , options .getResolverType ()));
67
92
}
68
93
hooks .add (new SyncMetadataHook (this ::getEnrichedContext ));
69
94
contextEnricher = options .getContextEnricher ();
95
+ this .reconnectExecutor = Executors .newSingleThreadScheduledExecutor ();
96
+ this .gracePeriod = options .getRetryGracePeriod ();
97
+ this .deadline = options .getDeadline ();
70
98
}
71
99
72
100
@ Override
@@ -81,17 +109,22 @@ public synchronized void initialize(EvaluationContext evaluationContext) throws
81
109
}
82
110
83
111
this .flagResolver .init ();
84
- this .initialized = this .connected = true ;
112
+ // block till ready - this works with deadline fine for rpc, but with in_process we also need to take parsing
113
+ // into the equation
114
+ Util .busyWaitAndCheck (this .deadline + 1000 , () -> initialized );
85
115
}
86
116
87
117
@ Override
88
118
public synchronized void shutdown () {
89
119
if (!this .initialized ) {
90
120
return ;
91
121
}
92
-
93
122
try {
94
123
this .flagResolver .shutdown ();
124
+ if (reconnectExecutor != null ) {
125
+ reconnectExecutor .shutdownNow ();
126
+ reconnectExecutor .awaitTermination (deadline , TimeUnit .MILLISECONDS );
127
+ }
95
128
} catch (Exception e ) {
96
129
log .error ("Error during shutdown {}" , FLAGD_PROVIDER , e );
97
130
} finally {
@@ -151,47 +184,73 @@ EvaluationContext getEnrichedContext() {
151
184
return enrichedContext ;
152
185
}
153
186
154
- private boolean isConnected () {
155
- return this .connected ;
156
- }
187
+ private void onProviderEvent (FlagdProviderEvent flagdProviderEvent ) {
157
188
158
- private void onConnectionEvent (ConnectionEvent connectionEvent ) {
159
- final boolean wasConnected = connected ;
160
- final boolean isConnected = connected = connectionEvent .isConnected ();
189
+ syncMetadata = flagdProviderEvent .getSyncMetadata ();
190
+ if (flagdProviderEvent .getSyncMetadata () != null ) {
191
+ enrichedContext = contextEnricher .apply (flagdProviderEvent .getSyncMetadata ());
192
+ }
161
193
162
- syncMetadata = connectionEvent .getSyncMetadata ();
163
- enrichedContext = contextEnricher .apply (connectionEvent .getSyncMetadata ());
194
+ switch (flagdProviderEvent .getEvent ()) {
195
+ case PROVIDER_CONFIGURATION_CHANGED :
196
+ if (previousEvent == ProviderEvent .PROVIDER_READY ) {
197
+ this .emitProviderConfigurationChanged (ProviderEventDetails .builder ()
198
+ .flagsChanged (flagdProviderEvent .getFlagsChanged ())
199
+ .message ("configuration changed" )
200
+ .build ());
201
+ break ;
202
+ }
203
+ case PROVIDER_READY :
204
+ onReady ();
205
+ previousEvent = ProviderEvent .PROVIDER_READY ;
206
+ break ;
164
207
165
- if (!initialized ) {
166
- return ;
208
+ case PROVIDER_ERROR :
209
+ if (previousEvent != ProviderEvent .PROVIDER_ERROR ) {
210
+ onError ();
211
+ }
212
+ previousEvent = ProviderEvent .PROVIDER_ERROR ;
213
+ break ;
167
214
}
215
+ }
168
216
169
- if (!wasConnected && isConnected ) {
170
- ProviderEventDetails details = ProviderEventDetails .builder ()
171
- .flagsChanged (connectionEvent .getFlagsChanged ())
172
- .message ("connected to flagd" )
173
- .build ();
174
- this .emitProviderReady (details );
175
- return ;
217
+ private void onReady () {
218
+ if (!initialized ) {
219
+ initialized = true ;
220
+ log .info ("initialized FlagdProvider" );
221
+ }
222
+ if (reconnectTask != null && !reconnectTask .isCancelled ()) {
223
+ reconnectTask .cancel (false );
224
+ log .debug ("Reconnection task cancelled as connection became READY." );
176
225
}
226
+ this .emitProviderReady (
227
+ ProviderEventDetails .builder ().message ("connected to flagd" ).build ());
228
+ }
177
229
178
- if (wasConnected && isConnected ) {
179
- ProviderEventDetails details = ProviderEventDetails .builder ()
180
- .flagsChanged (connectionEvent .getFlagsChanged ())
181
- .message ("configuration changed" )
182
- .build ();
183
- this .emitProviderConfigurationChanged (details );
184
- return ;
230
+ private void onError () {
231
+ log .info ("Connection lost. Emit STALE event..." );
232
+ log .debug ("Waiting {}s for connection to become available..." , gracePeriod );
233
+ this .emitProviderStale (ProviderEventDetails .builder ()
234
+ .message ("there has been an error" )
235
+ .build ());
236
+
237
+ if (reconnectTask != null && !reconnectTask .isCancelled ()) {
238
+ reconnectTask .cancel (false );
185
239
}
186
240
187
- if (connectionEvent .isStale ()) {
188
- this .emitProviderStale (ProviderEventDetails .builder ()
189
- .message ("there has been an error" )
190
- .build ());
191
- } else {
192
- this .emitProviderError (ProviderEventDetails .builder ()
193
- .message ("there has been an error" )
194
- .build ());
241
+ if (!reconnectExecutor .isShutdown ()) {
242
+ reconnectTask = reconnectExecutor .schedule (
243
+ () -> {
244
+ log .debug (
245
+ "Provider did not reconnect successfully within {}s. Emit ERROR event..." , gracePeriod );
246
+ flagResolver .onError ();
247
+ this .emitProviderError (ProviderEventDetails .builder ()
248
+ .message ("there has been an error" )
249
+ .build ());
250
+ ;
251
+ },
252
+ gracePeriod ,
253
+ TimeUnit .SECONDS );
195
254
}
196
255
}
197
256
}
0 commit comments