Skip to content

Commit ac5896c

Browse files
committed
fixup: minor fixes, add e2e assertion
Signed-off-by: Todd Baert <[email protected]>
1 parent 48e0026 commit ac5896c

File tree

16 files changed

+139
-117
lines changed

16 files changed

+139
-117
lines changed

providers/flagd/lombok.config

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# This file is needed to avoid errors throw by findbugs when working with lombok.
2+
lombok.addSuppressWarnings = true
3+
lombok.addLombokGeneratedAnnotation = true
4+
config.stopBubbling = true
5+
lombok.extern.findbugs.addSuppressFBWarnings = true

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@
44
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
55
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
66
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
7-
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
87
import dev.openfeature.sdk.EvaluationContext;
98
import dev.openfeature.sdk.EventProvider;
10-
import dev.openfeature.sdk.FeatureProvider;
119
import dev.openfeature.sdk.Metadata;
1210
import dev.openfeature.sdk.ProviderEvaluation;
1311
import dev.openfeature.sdk.ProviderEventDetails;
@@ -25,7 +23,7 @@
2523
*/
2624
@Slf4j
2725
@SuppressWarnings({"PMD.TooManyStaticImports", "checkstyle:NoFinalizer"})
28-
public class FlagdProvider extends EventProvider implements FeatureProvider {
26+
public class FlagdProvider extends EventProvider {
2927
private static final String FLAGD_PROVIDER = "flagD Provider";
3028

3129
private final ReadWriteLock lock = new ReentrantReadWriteLock();
@@ -173,8 +171,8 @@ private void handleStateTransition(ProviderState oldState, ProviderState newStat
173171
// configuration changed
174172
if (ProviderState.READY.equals(oldState) && ProviderState.READY.equals(newState)) {
175173
log.debug("Configuration changed");
176-
ProviderEventDetails details = ProviderEventDetails.builder().message("configuration changed").build();
177-
details.setFlagsChanged(changedFlagKeys);
174+
ProviderEventDetails details = ProviderEventDetails.builder().flagsChanged(changedFlagKeys)
175+
.message("configuration changed").build();
178176
this.emitProviderConfigurationChanged(details);
179177
return;
180178
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,19 @@
88
import io.grpc.stub.StreamObserver;
99
import lombok.extern.slf4j.Slf4j;
1010

11+
import java.util.ArrayList;
12+
import java.util.Collections;
13+
import java.util.List;
1114
import java.util.Map;
12-
import java.util.function.Consumer;
15+
import java.util.function.BiConsumer;
1316

1417
/**
1518
* EventStreamObserver handles events emitted by flagd.
1619
*/
1720
@Slf4j
1821
@SuppressFBWarnings(justification = "cache needs to be read and write by multiple objects")
1922
class EventStreamObserver implements StreamObserver<EventStreamResponse> {
20-
private final Consumer<ProviderState> stateConsumer;
23+
private final BiConsumer<ProviderState, List<String>> stateConsumer;
2124
private final Object sync;
2225
private final Cache cache;
2326

@@ -28,11 +31,11 @@ class EventStreamObserver implements StreamObserver<EventStreamResponse> {
2831
/**
2932
* Create a gRPC stream that get notified about flag changes.
3033
*
31-
* @param sync synchronization object from caller
32-
* @param cache cache to update
33-
* @param stateConsumer lambda to call for setting the state
34+
* @param sync synchronization object from caller
35+
* @param cache cache to update
36+
* @param stateConsumer lambda to call for setting the state
3437
*/
35-
EventStreamObserver(Object sync, Cache cache, Consumer<ProviderState> stateConsumer) {
38+
EventStreamObserver(Object sync, Cache cache, BiConsumer<ProviderState, List<String>> stateConsumer) {
3639
this.sync = sync;
3740
this.cache = cache;
3841
this.stateConsumer = stateConsumer;
@@ -58,7 +61,7 @@ public void onError(Throwable t) {
5861
if (this.cache.getEnabled()) {
5962
this.cache.clear();
6063
}
61-
this.stateConsumer.accept(ProviderState.ERROR);
64+
this.stateConsumer.accept(ProviderState.ERROR, Collections.emptyList());
6265

6366
// handle last call of this stream
6467
handleEndOfStream();
@@ -69,32 +72,38 @@ public void onCompleted() {
6972
if (this.cache.getEnabled()) {
7073
this.cache.clear();
7174
}
72-
this.stateConsumer.accept(ProviderState.ERROR);
75+
this.stateConsumer.accept(ProviderState.ERROR, Collections.emptyList());
7376

7477
// handle last call of this stream
7578
handleEndOfStream();
7679
}
7780

7881
private void handleConfigurationChangeEvent(EventStreamResponse value) {
79-
this.stateConsumer.accept(ProviderState.READY);
80-
if (!this.cache.getEnabled()) {
81-
return;
82-
}
82+
List<String> changedFlags = new ArrayList<>();
83+
boolean cachingEnabled = this.cache.getEnabled();
84+
8385
Map<String, Value> data = value.getData().getFieldsMap();
8486
Value flagsValue = data.get(FLAGS_KEY);
8587
if (flagsValue == null) {
86-
this.cache.clear();
87-
return;
88+
if (cachingEnabled) {
89+
this.cache.clear();
90+
}
91+
} else {
92+
Map<String, Value> flags = flagsValue.getStructValue().getFieldsMap();
93+
this.cache.getEnabled();
94+
for (String flagKey : flags.keySet()) {
95+
changedFlags.add(flagKey);
96+
if (cachingEnabled) {
97+
this.cache.remove(flagKey);
98+
}
99+
}
88100
}
89101

90-
Map<String, Value> flags = flagsValue.getStructValue().getFieldsMap();
91-
for (String flagKey : flags.keySet()) {
92-
this.cache.remove(flagKey);
93-
}
102+
this.stateConsumer.accept(ProviderState.READY, changedFlags);
94103
}
95104

96105
private void handleProviderReadyEvent() {
97-
this.stateConsumer.accept(ProviderState.READY);
106+
this.stateConsumer.accept(ProviderState.READY, Collections.emptyList());
98107
if (this.cache.getEnabled()) {
99108
this.cache.clear();
100109
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ public class GrpcConnector {
5353
* @param cache cache to use.
5454
* @param stateConsumer lambda to call for setting the state.
5555
*/
56-
public GrpcConnector(final FlagdOptions options, final Cache cache, BiConsumer<ProviderState, List<String>> stateConsumer) {
56+
public GrpcConnector(final FlagdOptions options, final Cache cache,
57+
BiConsumer<ProviderState, List<String>> stateConsumer) {
5758
this.channel = ChannelBuilder.nettyChannel(options);
5859
this.serviceStub = ServiceGrpc.newStub(channel);
5960
this.serviceBlockingStub = ServiceGrpc.newBlockingStub(channel);
@@ -81,7 +82,8 @@ public void initialize() throws Exception {
8182
/**
8283
* Shuts down all gRPC resources.
8384
*
84-
* @throws Exception is something goes wrong while terminating the communication.
85+
* @throws Exception is something goes wrong while terminating the
86+
* communication.
8587
*/
8688
public void shutdown() throws Exception {
8789
// first shutdown the event listener
@@ -115,21 +117,24 @@ public ServiceGrpc.ServiceBlockingStub getResolver() {
115117
}
116118

117119
/**
118-
* Event stream observer logic. This contains blocking mechanisms, hence must be run in a dedicated thread.
120+
* Event stream observer logic. This contains blocking mechanisms, hence must be
121+
* run in a dedicated thread.
119122
*/
120123
private void observeEventStream() {
121124
while (this.eventStreamAttempt <= this.maxEventStreamRetries) {
122-
final StreamObserver<EventStreamResponse> responseObserver =
123-
new EventStreamObserver(sync, this.cache, this::grpcStateConsumer);
125+
final StreamObserver<EventStreamResponse> responseObserver = new EventStreamObserver(sync, this.cache,
126+
this::grpcStateConsumer);
124127
this.serviceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver);
125128

126129
try {
127130
synchronized (sync) {
128131
sync.wait();
129132
}
130133
} catch (InterruptedException e) {
131-
// Interruptions are considered end calls for this observer, hence log and return
132-
// Note - this is the most common interruption when shutdown, hence the log level debug
134+
// Interruptions are considered end calls for this observer, hence log and
135+
// return
136+
// Note - this is the most common interruption when shutdown, hence the log
137+
// level debug
133138
log.debug("interruption while waiting for condition", e);
134139
Thread.currentThread().interrupt();
135140
}
@@ -141,17 +146,18 @@ private void observeEventStream() {
141146
try {
142147
Thread.sleep(this.eventStreamRetryBackoff);
143148
} catch (InterruptedException e) {
144-
// Interruptions are considered end calls for this observer, hence log and return
149+
// Interruptions are considered end calls for this observer, hence log and
150+
// return
145151
log.warn("interrupted while restarting gRPC Event Stream");
146152
Thread.currentThread().interrupt();
147153
}
148154
}
149155

150156
log.error("failed to connect to event stream, exhausted retries");
151-
this.grpcStateConsumer(ProviderState.ERROR);
157+
this.grpcStateConsumer(ProviderState.ERROR, null);
152158
}
153159

154-
private void grpcStateConsumer(final ProviderState state) {
160+
private void grpcStateConsumer(final ProviderState state, final List<String> changedFlags) {
155161
// check for readiness
156162
if (ProviderState.READY.equals(state)) {
157163
this.eventStreamAttempt = 1;
@@ -163,6 +169,6 @@ private void grpcStateConsumer(final ProviderState state) {
163169
}
164170

165171
// chain to initiator
166-
this.stateConsumer.accept(state, Collections.emptyList());
172+
this.stateConsumer.accept(state, changedFlags);
167173
}
168174
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcResolver.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import java.util.List;
77
import java.util.Map;
88
import java.util.function.BiConsumer;
9-
import java.util.function.Consumer;
109
import java.util.function.Function;
1110
import java.util.function.Supplier;
1211
import java.util.stream.Collectors;

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
77
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.FlagStore;
88
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.Storage;
9-
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageStateDTO;
9+
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageStateChange;
1010
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
1111
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.file.FileConnector;
1212
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc.GrpcStreamConnector;
@@ -26,8 +26,6 @@
2626
import java.util.List;
2727
import java.util.concurrent.atomic.AtomicBoolean;
2828
import java.util.function.BiConsumer;
29-
import java.util.stream.Collectors;
30-
3129

3230
import static dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag.EMPTY_TARGETING_STRING;
3331

@@ -53,8 +51,8 @@ public InProcessResolver(FlagdOptions options, BiConsumer<ProviderState, List<St
5351
this.deadline = options.getDeadline();
5452
this.stateConsumer = stateConsumer;
5553
this.operator = new Operator();
56-
this.metadata = options.getSelector() == null ? null :
57-
ImmutableMetadata.builder()
54+
this.metadata = options.getSelector() == null ? null
55+
: ImmutableMetadata.builder()
5856
.addString("scope", options.getSelector())
5957
.build();
6058
}
@@ -67,20 +65,21 @@ public void init() throws Exception {
6765
final Thread stateWatcher = new Thread(() -> {
6866
try {
6967
while (true) {
70-
final StorageStateDTO storageStateDTO = flagStore.getStateQueue().take();
71-
switch (storageStateDTO.getStorageState()) {
68+
final StorageStateChange storageStateChange = flagStore.getStateQueue().take();
69+
switch (storageStateChange.getStorageState()) {
7270
case OK:
73-
stateConsumer.accept(ProviderState.READY, storageStateDTO.getChangedFlagsKeys());
71+
stateConsumer.accept(ProviderState.READY, storageStateChange.getChangedFlagsKeys());
7472
this.connected.set(true);
7573
break;
7674
case ERROR:
77-
stateConsumer.accept(ProviderState.ERROR,null);
75+
stateConsumer.accept(ProviderState.ERROR, null);
7876
this.connected.set(false);
7977
break;
8078
case STALE:
8179
// todo set stale state
8280
default:
83-
log.info(String.format("Storage emitted unhandled status: %s", storageStateDTO.getStorageState()));
81+
log.info(String.format("Storage emitted unhandled status: %s",
82+
storageStateChange.getStorageState()));
8483
}
8584
}
8685
} catch (InterruptedException e) {
@@ -103,14 +102,14 @@ public void init() throws Exception {
103102
public void shutdown() throws InterruptedException {
104103
flagStore.shutdown();
105104
this.connected.set(false);
106-
stateConsumer.accept(ProviderState.NOT_READY,null);
105+
stateConsumer.accept(ProviderState.NOT_READY, null);
107106
}
108107

109108
/**
110109
* Resolve a boolean flag.
111110
*/
112111
public ProviderEvaluation<Boolean> booleanEvaluation(String key, Boolean defaultValue,
113-
EvaluationContext ctx) {
112+
EvaluationContext ctx) {
114113
return resolve(Boolean.class, key, ctx);
115114
}
116115

@@ -221,7 +220,7 @@ private <T> ProviderEvaluation<T> resolve(Class<T> type, String key, EvaluationC
221220
.variant(resolvedVariant)
222221
.reason(reason);
223222

224-
return this.metadata == null ? evaluationBuilder.build() :
225-
evaluationBuilder.flagMetadata(this.metadata).build();
223+
return this.metadata == null ? evaluationBuilder.build()
224+
: evaluationBuilder.flagMetadata(this.metadata).build();
226225
}
227226
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,15 @@
2121
* Feature flag storage.
2222
*/
2323
@Slf4j
24-
@SuppressFBWarnings(value = {"EI_EXPOSE_REP"},
25-
justification = "Feature flag comes as a Json configuration, hence they must be exposed")
24+
@SuppressFBWarnings(value = {
25+
"EI_EXPOSE_REP" }, justification = "Feature flag comes as a Json configuration, hence they must be exposed")
2626
public class FlagStore implements Storage {
2727
private final ReentrantReadWriteLock sync = new ReentrantReadWriteLock();
2828
private final ReadLock readLock = sync.readLock();
2929
private final WriteLock writeLock = sync.writeLock();
3030

3131
private final AtomicBoolean shutdown = new AtomicBoolean(false);
32-
private final BlockingQueue<StorageStateDTO> stateBlockingQueue = new LinkedBlockingQueue<>(1);
32+
private final BlockingQueue<StorageStateChange> stateBlockingQueue = new LinkedBlockingQueue<>(1);
3333
private final Map<String, FeatureFlag> flags = new HashMap<>();
3434

3535
private final Connector connector;
@@ -89,7 +89,7 @@ public FeatureFlag getFlag(final String key) {
8989
/**
9090
* Retrieve blocking queue to check storage status.
9191
*/
92-
public BlockingQueue<StorageStateDTO> getStateQueue() {
92+
public BlockingQueue<StorageStateChange> getStateQueue() {
9393
return stateBlockingQueue;
9494
}
9595

@@ -111,19 +111,19 @@ private void streamerListener(final Connector connector) throws InterruptedExcep
111111
} finally {
112112
writeLock.unlock();
113113
}
114-
if (!stateBlockingQueue.offer(new StorageStateDTO(StorageState.OK, changedFlagsKeys))) {
114+
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.OK, changedFlagsKeys))) {
115115
log.warn("Failed to convey OK satus, queue is full");
116116
}
117117
} catch (Throwable e) {
118118
// catch all exceptions and avoid stream listener interruptions
119119
log.warn("Invalid flag sync payload from connector", e);
120-
if (!stateBlockingQueue.offer(new StorageStateDTO(StorageState.STALE))) {
120+
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) {
121121
log.warn("Failed to convey STALE satus, queue is full");
122122
}
123123
}
124124
break;
125125
case ERROR:
126-
if (!stateBlockingQueue.offer(new StorageStateDTO(StorageState.ERROR))) {
126+
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.ERROR))) {
127127
log.warn("Failed to convey ERROR satus, queue is full");
128128
}
129129
break;
@@ -147,8 +147,8 @@ private List<String> getChangedFlagsKeys(Map<String, FeatureFlag> newFlags) {
147147
updatedFeatureFlags.put(key, value);
148148
}
149149
});
150-
flags.forEach((key,value) -> {
151-
if(!newFlags.containsKey(key)) {
150+
flags.forEach((key, value) -> {
151+
if (!newFlags.containsKey(key)) {
152152
removedFeatureFlags.put(key, value);
153153
}
154154
});

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/Storage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ public interface Storage {
1414

1515
FeatureFlag getFlag(final String key);
1616

17-
BlockingQueue<StorageStateDTO> getStateQueue();
17+
BlockingQueue<StorageStateChange> getStateQueue();
1818
}

0 commit comments

Comments
 (0)