Skip to content

Commit 48e0026

Browse files
utkarshtoddbaert
utkarsh
authored andcommitted
fix:passing changed flags in configuration change event
Signed-off-by: utkarsh <[email protected]>
1 parent f53b898 commit 48e0026

File tree

13 files changed

+195
-82
lines changed

13 files changed

+195
-82
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
55
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
66
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
7+
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
78
import dev.openfeature.sdk.EvaluationContext;
89
import dev.openfeature.sdk.EventProvider;
910
import dev.openfeature.sdk.FeatureProvider;
@@ -14,6 +15,7 @@
1415
import dev.openfeature.sdk.Value;
1516
import lombok.extern.slf4j.Slf4j;
1617

18+
import java.util.List;
1719
import java.util.concurrent.locks.Lock;
1820
import java.util.concurrent.locks.ReadWriteLock;
1921
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -142,7 +144,7 @@ private EvaluationContext mergeContext(final EvaluationContext clientCallCtx) {
142144
return clientCallCtx;
143145
}
144146

145-
private void setState(ProviderState newState) {
147+
private void setState(ProviderState newState, List<String> changedFlagsKeys) {
146148
ProviderState oldState;
147149
Lock l = this.lock.writeLock();
148150
try {
@@ -152,17 +154,17 @@ private void setState(ProviderState newState) {
152154
} finally {
153155
l.unlock();
154156
}
155-
this.handleStateTransition(oldState, newState);
157+
this.handleStateTransition(oldState, newState, changedFlagsKeys);
156158
}
157159

158-
private void handleStateTransition(ProviderState oldState, ProviderState newState) {
160+
private void handleStateTransition(ProviderState oldState, ProviderState newState, List<String> changedFlagKeys) {
159161
// we got initialized
160162
if (ProviderState.NOT_READY.equals(oldState) && ProviderState.READY.equals(newState)) {
161163
// nothing to do, the SDK emits the events
162164
log.debug("Init completed");
163165
return;
164166
}
165-
// we got shutdown, not checking oldState as behavior remains the same for shutdown
167+
// we got shutdown, not checking oldState as behavior remains the same for shutdown
166168
if (ProviderState.NOT_READY.equals(newState)) {
167169
// nothing to do
168170
log.debug("shutdown completed");
@@ -172,6 +174,7 @@ private void handleStateTransition(ProviderState oldState, ProviderState newStat
172174
if (ProviderState.READY.equals(oldState) && ProviderState.READY.equals(newState)) {
173175
log.debug("Configuration changed");
174176
ProviderEventDetails details = ProviderEventDetails.builder().message("configuration changed").build();
177+
details.setFlagsChanged(changedFlagKeys);
175178
this.emitProviderConfigurationChanged(details);
176179
return;
177180
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
package dev.openfeature.contrib.providers.flagd.resolver.grpc;
22

3+
import java.util.Collections;
4+
import java.util.List;
35
import java.util.Random;
46
import java.util.concurrent.TimeUnit;
57
import java.util.concurrent.atomic.AtomicBoolean;
6-
import java.util.function.Consumer;
7-
8+
import java.util.function.BiConsumer;
89
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
910
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
1011
import dev.openfeature.contrib.providers.flagd.resolver.common.Util;
@@ -37,7 +38,7 @@ public class GrpcConnector {
3738
private final long deadline;
3839

3940
private final Cache cache;
40-
private final Consumer<ProviderState> stateConsumer;
41+
private final BiConsumer<ProviderState, List<String>> stateConsumer;
4142

4243
private int eventStreamAttempt = 1;
4344
private int eventStreamRetryBackoff;
@@ -52,7 +53,7 @@ public class GrpcConnector {
5253
* @param cache cache to use.
5354
* @param stateConsumer lambda to call for setting the state.
5455
*/
55-
public GrpcConnector(final FlagdOptions options, final Cache cache, Consumer<ProviderState> stateConsumer) {
56+
public GrpcConnector(final FlagdOptions options, final Cache cache, BiConsumer<ProviderState, List<String>> stateConsumer) {
5657
this.channel = ChannelBuilder.nettyChannel(options);
5758
this.serviceStub = ServiceGrpc.newStub(channel);
5859
this.serviceBlockingStub = ServiceGrpc.newBlockingStub(channel);
@@ -100,7 +101,7 @@ public void shutdown() throws Exception {
100101
this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS);
101102
log.warn(String.format("Unable to shut down channel by %d deadline", this.deadline));
102103
}
103-
this.stateConsumer.accept(ProviderState.NOT_READY);
104+
this.stateConsumer.accept(ProviderState.NOT_READY, Collections.emptyList());
104105
}
105106
}
106107

@@ -162,6 +163,6 @@ private void grpcStateConsumer(final ProviderState state) {
162163
}
163164

164165
// chain to initiator
165-
this.stateConsumer.accept(state);
166+
this.stateConsumer.accept(state, Collections.emptyList());
166167
}
167168
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcResolver.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.util.HashMap;
66
import java.util.List;
77
import java.util.Map;
8+
import java.util.function.BiConsumer;
89
import java.util.function.Consumer;
910
import java.util.function.Function;
1011
import java.util.function.Supplier;
@@ -62,7 +63,7 @@ public final class GrpcResolver implements Resolver {
6263
* @param stateConsumer lambda to communicate back the state.
6364
*/
6465
public GrpcResolver(final FlagdOptions options, final Cache cache, final Supplier<ProviderState> stateSupplier,
65-
final Consumer<ProviderState> stateConsumer) {
66+
final BiConsumer<ProviderState,List<String>> stateConsumer) {
6667
this.cache = cache;
6768
this.stateSupplier = stateSupplier;
6869

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
77
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.FlagStore;
88
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.Storage;
9-
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageState;
9+
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageStateDTO;
1010
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
1111
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.file.FileConnector;
1212
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc.GrpcStreamConnector;
@@ -23,8 +23,11 @@
2323
import dev.openfeature.sdk.exceptions.TypeMismatchError;
2424
import lombok.extern.slf4j.Slf4j;
2525

26+
import java.util.List;
2627
import java.util.concurrent.atomic.AtomicBoolean;
27-
import java.util.function.Consumer;
28+
import java.util.function.BiConsumer;
29+
import java.util.stream.Collectors;
30+
2831

2932
import static dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag.EMPTY_TARGETING_STRING;
3033

@@ -36,7 +39,7 @@
3639
@Slf4j
3740
public class InProcessResolver implements Resolver {
3841
private final Storage flagStore;
39-
private final Consumer<ProviderState> stateConsumer;
42+
private final BiConsumer<ProviderState, List<String>> stateConsumer;
4043
private final Operator operator;
4144
private final long deadline;
4245
private final ImmutableMetadata metadata;
@@ -45,7 +48,7 @@ public class InProcessResolver implements Resolver {
4548
/**
4649
* Initialize an in-process resolver.
4750
*/
48-
public InProcessResolver(FlagdOptions options, Consumer<ProviderState> stateConsumer) {
51+
public InProcessResolver(FlagdOptions options, BiConsumer<ProviderState, List<String>> stateConsumer) {
4952
this.flagStore = new FlagStore(getConnector(options));
5053
this.deadline = options.getDeadline();
5154
this.stateConsumer = stateConsumer;
@@ -64,20 +67,20 @@ public void init() throws Exception {
6467
final Thread stateWatcher = new Thread(() -> {
6568
try {
6669
while (true) {
67-
final StorageState storageState = flagStore.getStateQueue().take();
68-
switch (storageState) {
70+
final StorageStateDTO storageStateDTO = flagStore.getStateQueue().take();
71+
switch (storageStateDTO.getStorageState()) {
6972
case OK:
70-
stateConsumer.accept(ProviderState.READY);
73+
stateConsumer.accept(ProviderState.READY, storageStateDTO.getChangedFlagsKeys());
7174
this.connected.set(true);
7275
break;
7376
case ERROR:
74-
stateConsumer.accept(ProviderState.ERROR);
77+
stateConsumer.accept(ProviderState.ERROR,null);
7578
this.connected.set(false);
7679
break;
7780
case STALE:
7881
// todo set stale state
7982
default:
80-
log.info(String.format("Storage emitted unhandled status: %s", storageState));
83+
log.info(String.format("Storage emitted unhandled status: %s", storageStateDTO.getStorageState()));
8184
}
8285
}
8386
} catch (InterruptedException e) {
@@ -100,38 +103,35 @@ public void init() throws Exception {
100103
public void shutdown() throws InterruptedException {
101104
flagStore.shutdown();
102105
this.connected.set(false);
103-
stateConsumer.accept(ProviderState.NOT_READY);
106+
stateConsumer.accept(ProviderState.NOT_READY,null);
104107
}
105108

106109
/**
107110
* Resolve a boolean flag.
108111
*/
109112
public ProviderEvaluation<Boolean> booleanEvaluation(String key, Boolean defaultValue,
110-
EvaluationContext ctx) {
113+
EvaluationContext ctx) {
111114
return resolve(Boolean.class, key, ctx);
112115
}
113116

114117
/**
115118
* Resolve a string flag.
116119
*/
117-
public ProviderEvaluation<String> stringEvaluation(String key, String defaultValue,
118-
EvaluationContext ctx) {
120+
public ProviderEvaluation<String> stringEvaluation(String key, String defaultValue, EvaluationContext ctx) {
119121
return resolve(String.class, key, ctx);
120122
}
121123

122124
/**
123125
* Resolve a double flag.
124126
*/
125-
public ProviderEvaluation<Double> doubleEvaluation(String key, Double defaultValue,
126-
EvaluationContext ctx) {
127+
public ProviderEvaluation<Double> doubleEvaluation(String key, Double defaultValue, EvaluationContext ctx) {
127128
return resolve(Double.class, key, ctx);
128129
}
129130

130131
/**
131132
* Resolve an integer flag.
132133
*/
133-
public ProviderEvaluation<Integer> integerEvaluation(String key, Integer defaultValue,
134-
EvaluationContext ctx) {
134+
public ProviderEvaluation<Integer> integerEvaluation(String key, Integer defaultValue, EvaluationContext ctx) {
135135
return resolve(Integer.class, key, ctx);
136136
}
137137

@@ -160,8 +160,7 @@ static Connector getConnector(final FlagdOptions options) {
160160
: new GrpcStreamConnector(options);
161161
}
162162

163-
private <T> ProviderEvaluation<T> resolve(Class<T> type, String key,
164-
EvaluationContext ctx) {
163+
private <T> ProviderEvaluation<T> resolve(Class<T> type, String key, EvaluationContext ctx) {
165164
final FeatureFlag flag = flagStore.getFlag(key);
166165

167166
// missing flag

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/model/FeatureFlag.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.fasterxml.jackson.annotation.JsonProperty;
66
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
77
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
8+
import lombok.EqualsAndHashCode;
89
import lombok.Getter;
910

1011
import java.util.Map;
@@ -16,6 +17,7 @@
1617
@SuppressFBWarnings(value = {"EI_EXPOSE_REP"},
1718
justification = "Feature flag comes as a Json configuration, hence they must be parsed and exposed")
1819
@JsonIgnoreProperties(ignoreUnknown = true)
20+
@EqualsAndHashCode
1921
public class FeatureFlag {
2022
public static final String EMPTY_TARGETING_STRING = "{}";
2123

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,16 @@
66
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload;
77
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
88
import lombok.extern.slf4j.Slf4j;
9-
109
import java.util.HashMap;
10+
import java.util.List;
1111
import java.util.Map;
1212
import java.util.concurrent.BlockingQueue;
1313
import java.util.concurrent.LinkedBlockingQueue;
1414
import java.util.concurrent.atomic.AtomicBoolean;
1515
import java.util.concurrent.locks.ReentrantReadWriteLock;
1616
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
1717
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
18+
import java.util.stream.Collectors;
1819

1920
/**
2021
* Feature flag storage.
@@ -28,7 +29,7 @@ public class FlagStore implements Storage {
2829
private final WriteLock writeLock = sync.writeLock();
2930

3031
private final AtomicBoolean shutdown = new AtomicBoolean(false);
31-
private final BlockingQueue<StorageState> stateBlockingQueue = new LinkedBlockingQueue<>(1);
32+
private final BlockingQueue<StorageStateDTO> stateBlockingQueue = new LinkedBlockingQueue<>(1);
3233
private final Map<String, FeatureFlag> flags = new HashMap<>();
3334

3435
private final Connector connector;
@@ -88,7 +89,7 @@ public FeatureFlag getFlag(final String key) {
8889
/**
8990
* Retrieve blocking queue to check storage status.
9091
*/
91-
public BlockingQueue<StorageState> getStateQueue() {
92+
public BlockingQueue<StorageStateDTO> getStateQueue() {
9293
return stateBlockingQueue;
9394
}
9495

@@ -100,27 +101,29 @@ private void streamerListener(final Connector connector) throws InterruptedExcep
100101
switch (take.getType()) {
101102
case DATA:
102103
try {
104+
List<String> changedFlagsKeys;
103105
Map<String, FeatureFlag> flagMap = FlagParser.parseString(take.getData(), throwIfInvalid);
104106
writeLock.lock();
105107
try {
108+
changedFlagsKeys = getChangedFlagsKeys(flagMap);
106109
flags.clear();
107110
flags.putAll(flagMap);
108111
} finally {
109112
writeLock.unlock();
110113
}
111-
if (!stateBlockingQueue.offer(StorageState.OK)) {
114+
if (!stateBlockingQueue.offer(new StorageStateDTO(StorageState.OK, changedFlagsKeys))) {
112115
log.warn("Failed to convey OK satus, queue is full");
113116
}
114117
} catch (Throwable e) {
115118
// catch all exceptions and avoid stream listener interruptions
116119
log.warn("Invalid flag sync payload from connector", e);
117-
if (!stateBlockingQueue.offer(StorageState.STALE)) {
120+
if (!stateBlockingQueue.offer(new StorageStateDTO(StorageState.STALE))) {
118121
log.warn("Failed to convey STALE satus, queue is full");
119122
}
120123
}
121124
break;
122125
case ERROR:
123-
if (!stateBlockingQueue.offer(StorageState.ERROR)) {
126+
if (!stateBlockingQueue.offer(new StorageStateDTO(StorageState.ERROR))) {
124127
log.warn("Failed to convey ERROR satus, queue is full");
125128
}
126129
break;
@@ -132,4 +135,27 @@ private void streamerListener(final Connector connector) throws InterruptedExcep
132135
log.info("Shutting down store stream listener");
133136
}
134137

138+
private List<String> getChangedFlagsKeys(Map<String, FeatureFlag> newFlags) {
139+
Map<String, FeatureFlag> changedFlags = new HashMap<>();
140+
Map<String, FeatureFlag> addedFeatureFlags = new HashMap<>();
141+
Map<String, FeatureFlag> removedFeatureFlags = new HashMap<>();
142+
Map<String, FeatureFlag> updatedFeatureFlags = new HashMap<>();
143+
newFlags.forEach((key, value) -> {
144+
if (!flags.containsKey(key)) {
145+
addedFeatureFlags.put(key, value);
146+
} else if (flags.containsKey(key) && !value.equals(flags.get(key))) {
147+
updatedFeatureFlags.put(key, value);
148+
}
149+
});
150+
flags.forEach((key,value) -> {
151+
if(!newFlags.containsKey(key)) {
152+
removedFeatureFlags.put(key, value);
153+
}
154+
});
155+
changedFlags.putAll(addedFeatureFlags);
156+
changedFlags.putAll(removedFeatureFlags);
157+
changedFlags.putAll(updatedFeatureFlags);
158+
return changedFlags.keySet().stream().collect(Collectors.toList());
159+
}
160+
135161
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/Storage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ public interface Storage {
1414

1515
FeatureFlag getFlag(final String key);
1616

17-
BlockingQueue<StorageState> getStateQueue();
17+
BlockingQueue<StorageStateDTO> getStateQueue();
1818
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package dev.openfeature.contrib.providers.flagd.resolver.process.storage;
2+
3+
4+
import lombok.EqualsAndHashCode;
5+
import lombok.Getter;
6+
import lombok.ToString;
7+
8+
import java.util.Collections;
9+
import java.util.List;
10+
11+
@Getter
12+
@ToString
13+
@EqualsAndHashCode
14+
public class StorageStateDTO {
15+
private final StorageState storageState;
16+
private final List<String> changedFlagsKeys;
17+
18+
public StorageStateDTO(StorageState storageState, List<String> changedFlagsKeys) {
19+
this.storageState = storageState;
20+
this.changedFlagsKeys = changedFlagsKeys;
21+
}
22+
23+
public StorageStateDTO(StorageState storageState) {
24+
this.storageState = storageState;
25+
this.changedFlagsKeys = Collections.emptyList();
26+
}
27+
}

0 commit comments

Comments
 (0)