Skip to content

feat: emit changed flags in configuration change event #925

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions providers/flagd/lombok.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# This file is needed to avoid errors throw by findbugs when working with lombok.
lombok.addSuppressWarnings = true
lombok.addLombokGeneratedAnnotation = true
config.stopBubbling = true
lombok.extern.findbugs.addSuppressFBWarnings = true
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.EventProvider;
import dev.openfeature.sdk.FeatureProvider;
import dev.openfeature.sdk.Metadata;
import dev.openfeature.sdk.ProviderEvaluation;
import dev.openfeature.sdk.ProviderEventDetails;
import dev.openfeature.sdk.ProviderState;
import dev.openfeature.sdk.Value;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand All @@ -23,7 +23,7 @@
*/
@Slf4j
@SuppressWarnings({"PMD.TooManyStaticImports", "checkstyle:NoFinalizer"})
public class FlagdProvider extends EventProvider implements FeatureProvider {
public class FlagdProvider extends EventProvider {
private static final String FLAGD_PROVIDER = "flagD Provider";

private final ReadWriteLock lock = new ReentrantReadWriteLock();
Expand Down Expand Up @@ -142,7 +142,7 @@ private EvaluationContext mergeContext(final EvaluationContext clientCallCtx) {
return clientCallCtx;
}

private void setState(ProviderState newState) {
private void setState(ProviderState newState, List<String> changedFlagsKeys) {
ProviderState oldState;
Lock l = this.lock.writeLock();
try {
Expand All @@ -152,17 +152,17 @@ private void setState(ProviderState newState) {
} finally {
l.unlock();
}
this.handleStateTransition(oldState, newState);
this.handleStateTransition(oldState, newState, changedFlagsKeys);
}

private void handleStateTransition(ProviderState oldState, ProviderState newState) {
private void handleStateTransition(ProviderState oldState, ProviderState newState, List<String> changedFlagKeys) {
// we got initialized
if (ProviderState.NOT_READY.equals(oldState) && ProviderState.READY.equals(newState)) {
// nothing to do, the SDK emits the events
log.debug("Init completed");
return;
}
// we got shutdown, not checking oldState as behavior remains the same for shutdown
// we got shutdown, not checking oldState as behavior remains the same for shutdown
if (ProviderState.NOT_READY.equals(newState)) {
// nothing to do
log.debug("shutdown completed");
Expand All @@ -171,7 +171,8 @@ private void handleStateTransition(ProviderState oldState, ProviderState newStat
// configuration changed
if (ProviderState.READY.equals(oldState) && ProviderState.READY.equals(newState)) {
log.debug("Configuration changed");
ProviderEventDetails details = ProviderEventDetails.builder().message("configuration changed").build();
ProviderEventDetails details = ProviderEventDetails.builder().flagsChanged(changedFlagKeys)
.message("configuration changed").build();
this.emitProviderConfigurationChanged(details);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,19 @@
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.BiConsumer;

/**
* EventStreamObserver handles events emitted by flagd.
*/
@Slf4j
@SuppressFBWarnings(justification = "cache needs to be read and write by multiple objects")
class EventStreamObserver implements StreamObserver<EventStreamResponse> {
private final Consumer<ProviderState> stateConsumer;
private final BiConsumer<ProviderState, List<String>> stateConsumer;
private final Object sync;
private final Cache cache;

Expand All @@ -28,11 +31,11 @@ class EventStreamObserver implements StreamObserver<EventStreamResponse> {
/**
* Create a gRPC stream that get notified about flag changes.
*
* @param sync synchronization object from caller
* @param cache cache to update
* @param stateConsumer lambda to call for setting the state
* @param sync synchronization object from caller
* @param cache cache to update
* @param stateConsumer lambda to call for setting the state
*/
EventStreamObserver(Object sync, Cache cache, Consumer<ProviderState> stateConsumer) {
EventStreamObserver(Object sync, Cache cache, BiConsumer<ProviderState, List<String>> stateConsumer) {
this.sync = sync;
this.cache = cache;
this.stateConsumer = stateConsumer;
Expand All @@ -58,7 +61,7 @@ public void onError(Throwable t) {
if (this.cache.getEnabled()) {
this.cache.clear();
}
this.stateConsumer.accept(ProviderState.ERROR);
this.stateConsumer.accept(ProviderState.ERROR, Collections.emptyList());

// handle last call of this stream
handleEndOfStream();
Expand All @@ -69,32 +72,38 @@ public void onCompleted() {
if (this.cache.getEnabled()) {
this.cache.clear();
}
this.stateConsumer.accept(ProviderState.ERROR);
this.stateConsumer.accept(ProviderState.ERROR, Collections.emptyList());

// handle last call of this stream
handleEndOfStream();
}

private void handleConfigurationChangeEvent(EventStreamResponse value) {
this.stateConsumer.accept(ProviderState.READY);
if (!this.cache.getEnabled()) {
return;
}
List<String> changedFlags = new ArrayList<>();
boolean cachingEnabled = this.cache.getEnabled();

Map<String, Value> data = value.getData().getFieldsMap();
Value flagsValue = data.get(FLAGS_KEY);
if (flagsValue == null) {
this.cache.clear();
return;
if (cachingEnabled) {
this.cache.clear();
}
} else {
Map<String, Value> flags = flagsValue.getStructValue().getFieldsMap();
this.cache.getEnabled();
for (String flagKey : flags.keySet()) {
changedFlags.add(flagKey);
if (cachingEnabled) {
this.cache.remove(flagKey);
}
}
}

Map<String, Value> flags = flagsValue.getStructValue().getFieldsMap();
for (String flagKey : flags.keySet()) {
this.cache.remove(flagKey);
}
this.stateConsumer.accept(ProviderState.READY, changedFlags);
}

private void handleProviderReadyEvent() {
this.stateConsumer.accept(ProviderState.READY);
this.stateConsumer.accept(ProviderState.READY, Collections.emptyList());
if (this.cache.getEnabled()) {
this.cache.clear();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package dev.openfeature.contrib.providers.flagd.resolver.grpc;

import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import java.util.function.BiConsumer;
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
import dev.openfeature.contrib.providers.flagd.resolver.common.Util;
Expand Down Expand Up @@ -37,7 +38,7 @@ public class GrpcConnector {
private final long deadline;

private final Cache cache;
private final Consumer<ProviderState> stateConsumer;
private final BiConsumer<ProviderState, List<String>> stateConsumer;

private int eventStreamAttempt = 1;
private int eventStreamRetryBackoff;
Expand All @@ -52,7 +53,8 @@ public class GrpcConnector {
* @param cache cache to use.
* @param stateConsumer lambda to call for setting the state.
*/
public GrpcConnector(final FlagdOptions options, final Cache cache, Consumer<ProviderState> stateConsumer) {
public GrpcConnector(final FlagdOptions options, final Cache cache,
BiConsumer<ProviderState, List<String>> stateConsumer) {
this.channel = ChannelBuilder.nettyChannel(options);
this.serviceStub = ServiceGrpc.newStub(channel);
this.serviceBlockingStub = ServiceGrpc.newBlockingStub(channel);
Expand Down Expand Up @@ -80,7 +82,8 @@ public void initialize() throws Exception {
/**
* Shuts down all gRPC resources.
*
* @throws Exception is something goes wrong while terminating the communication.
* @throws Exception is something goes wrong while terminating the
* communication.
*/
public void shutdown() throws Exception {
// first shutdown the event listener
Expand All @@ -100,7 +103,7 @@ public void shutdown() throws Exception {
this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS);
log.warn(String.format("Unable to shut down channel by %d deadline", this.deadline));
}
this.stateConsumer.accept(ProviderState.NOT_READY);
this.stateConsumer.accept(ProviderState.NOT_READY, Collections.emptyList());
}
}

Expand All @@ -114,21 +117,24 @@ public ServiceGrpc.ServiceBlockingStub getResolver() {
}

/**
* Event stream observer logic. This contains blocking mechanisms, hence must be run in a dedicated thread.
* Event stream observer logic. This contains blocking mechanisms, hence must be
* run in a dedicated thread.
*/
private void observeEventStream() {
while (this.eventStreamAttempt <= this.maxEventStreamRetries) {
final StreamObserver<EventStreamResponse> responseObserver =
new EventStreamObserver(sync, this.cache, this::grpcStateConsumer);
final StreamObserver<EventStreamResponse> responseObserver = new EventStreamObserver(sync, this.cache,
this::grpcStateConsumer);
this.serviceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver);

try {
synchronized (sync) {
sync.wait();
}
} catch (InterruptedException e) {
// Interruptions are considered end calls for this observer, hence log and return
// Note - this is the most common interruption when shutdown, hence the log level debug
// Interruptions are considered end calls for this observer, hence log and
// return
// Note - this is the most common interruption when shutdown, hence the log
// level debug
log.debug("interruption while waiting for condition", e);
Thread.currentThread().interrupt();
}
Expand All @@ -140,17 +146,18 @@ private void observeEventStream() {
try {
Thread.sleep(this.eventStreamRetryBackoff);
} catch (InterruptedException e) {
// Interruptions are considered end calls for this observer, hence log and return
// Interruptions are considered end calls for this observer, hence log and
// return
log.warn("interrupted while restarting gRPC Event Stream");
Thread.currentThread().interrupt();
}
}

log.error("failed to connect to event stream, exhausted retries");
this.grpcStateConsumer(ProviderState.ERROR);
this.grpcStateConsumer(ProviderState.ERROR, null);
}

private void grpcStateConsumer(final ProviderState state) {
private void grpcStateConsumer(final ProviderState state, final List<String> changedFlags) {
// check for readiness
if (ProviderState.READY.equals(state)) {
this.eventStreamAttempt = 1;
Expand All @@ -162,6 +169,6 @@ private void grpcStateConsumer(final ProviderState state) {
}

// chain to initiator
this.stateConsumer.accept(state);
this.stateConsumer.accept(state, changedFlags);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -62,7 +62,7 @@ public final class GrpcResolver implements Resolver {
* @param stateConsumer lambda to communicate back the state.
*/
public GrpcResolver(final FlagdOptions options, final Cache cache, final Supplier<ProviderState> stateSupplier,
final Consumer<ProviderState> stateConsumer) {
final BiConsumer<ProviderState,List<String>> stateConsumer) {
this.cache = cache;
this.stateSupplier = stateSupplier;

Expand Down
Loading