Skip to content

feat: Improve wait logic to a more elegant solution #1160 #1169

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4402e66
feat: improve wait logic to a more elegant solution #1160
chrfwow Jan 20, 2025
99dda5c
Merge branch 'refs/heads/main' into improve-wait-logic-to-a-more-eleg…
chrfwow Jan 21, 2025
a41e980
fixup! feat: improve wait logic to a more elegant solution #1160
chrfwow Jan 21, 2025
12e889f
fixup! feat: improve wait logic to a more elegant solution #1160
chrfwow Jan 21, 2025
82b542b
fixup! feat: improve wait logic to a more elegant solution #1160
chrfwow Jan 21, 2025
4208b9e
fixup! feat: improve wait logic to a more elegant solution #1160
chrfwow Jan 21, 2025
15af42d
fixup! feat: improve wait logic to a more elegant solution #1160
chrfwow Jan 22, 2025
03de275
fixup! feat: improve wait logic to a more elegant solution #1160
chrfwow Jan 22, 2025
fa75f25
fixup! feat: improve wait logic to a more elegant solution #1160
chrfwow Jan 22, 2025
bee7edd
Merge branch 'main' into improve-wait-logic-to-a-more-elegant-solutio…
chrfwow Feb 17, 2025
7d2022d
merge with master branch
chrfwow Feb 17, 2025
97bdaba
revert cucumber changes
chrfwow Feb 18, 2025
9a32b22
increase sleep to "fix" flaky tests
chrfwow Feb 18, 2025
29a4260
increase sleep to "fix" flaky tests
chrfwow Feb 18, 2025
89f4d1e
increase sleep to "fix" flaky tests
chrfwow Feb 18, 2025
619a67f
reintroduce names for providers
chrfwow Feb 18, 2025
2c9c843
Merge branch 'main' into improve-wait-logic-to-a-more-elegant-solutio…
chrfwow Feb 19, 2025
c2024e7
merge with main
chrfwow Feb 19, 2025
c67fa32
Merge branch 'main' into improve-wait-logic-to-a-more-elegant-solutio…
toddbaert Feb 26, 2025
54de67c
improve naming of constant
chrfwow Feb 26, 2025
b52c55c
Merge remote-tracking branch 'origin/improve-wait-logic-to-a-more-ele…
chrfwow Feb 26, 2025
c6e2e6b
Merge branch 'main' into improve-wait-logic-to-a-more-elegant-solutio…
toddbaert Feb 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@
<!-- this can be overriden in child POMs to support specific SDK requirements -->
<groupId>dev.openfeature</groupId>
<artifactId>sdk</artifactId>
<!-- 1.14 <= v < 2.0 (excluding 2.0 pre-releases)-->
<version>[1.14,1.99999)</version>
<!-- 1.14.1 <= v < 2.0 (excluding 2.0 pre-releases)-->
<version>[1.14.1,1.99999)</version>
<!-- use the version provided at runtime -->
<scope>provided</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@

import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.Util;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.EventProvider;
import dev.openfeature.sdk.Hook;
import dev.openfeature.sdk.ImmutableContext;
import dev.openfeature.sdk.Metadata;
import dev.openfeature.sdk.ProviderEvaluation;
import dev.openfeature.sdk.ProviderEvent;
Expand All @@ -36,7 +34,7 @@ public class FlagdProvider extends EventProvider {
private static final String FLAGD_PROVIDER = "flagd";
private final Resolver flagResolver;
private final List<Hook> hooks = new ArrayList<>();
private final EventsLock eventsLock = new EventsLock();
private final FlagdProviderSyncResources syncResources = new FlagdProviderSyncResources();

/**
* An executor service responsible for emitting
Expand Down Expand Up @@ -108,7 +106,9 @@ public FlagdProvider(final FlagdOptions options) {
gracePeriod = Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD;
hooks.add(new SyncMetadataHook(this::getEnrichedContext));
errorExecutor = Executors.newSingleThreadScheduledExecutor();
this.eventsLock.initialized = initialized;
if (initialized) {
this.syncResources.initialize();
}
}

@Override
Expand All @@ -118,28 +118,27 @@ public List<Hook> getProviderHooks() {

@Override
public void initialize(EvaluationContext evaluationContext) throws Exception {
synchronized (eventsLock) {
if (eventsLock.initialized) {
synchronized (syncResources) {
if (syncResources.isInitialized()) {
return;
}

flagResolver.init();
// block till ready - this works with deadline fine for rpc, but with in_process
// we also need to take parsing into the equation
// TODO: evaluate where we are losing time, so we can remove this magic number -
syncResources.waitForInitialization(this.deadline * 2);
}
// block till ready - this works with deadline fine for rpc, but with in_process
// we also need to take parsing into the equation
// TODO: evaluate where we are losing time, so we can remove this magic number -
// follow up
// wait outside of the synchonrization or we'll deadlock
Util.busyWaitAndCheck(this.deadline * 2, () -> eventsLock.initialized);
}

@Override
public void shutdown() {
synchronized (eventsLock) {
if (!eventsLock.initialized) {
return;
}
synchronized (syncResources) {
try {
if (!syncResources.isInitialized() || syncResources.isShutDown()) {
return;
}

this.flagResolver.shutdown();
if (errorExecutor != null) {
errorExecutor.shutdownNow();
Expand All @@ -148,7 +147,7 @@ public void shutdown() {
} catch (Exception e) {
log.error("Error during shutdown {}", FLAGD_PROVIDER, e);
} finally {
eventsLock.initialized = false;
syncResources.shutdown();
}
}
}
Expand Down Expand Up @@ -189,15 +188,13 @@ public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultVa
* @return context
*/
EvaluationContext getEnrichedContext() {
return eventsLock.enrichedContext;
return syncResources.getEnrichedContext();
}

@SuppressWarnings("checkstyle:fallthrough")
private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {

synchronized (eventsLock) {
log.info("FlagdProviderEvent: {}", flagdProviderEvent.getEvent());

log.info("FlagdProviderEvent event {} ", flagdProviderEvent.getEvent());
synchronized (syncResources) {
/*
* We only use Error and Ready as previous states.
* As error will first be emitted as Stale, and only turns after a while into an
Expand All @@ -209,29 +206,30 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
*/
switch (flagdProviderEvent.getEvent()) {
case PROVIDER_CONFIGURATION_CHANGED:
if (eventsLock.previousEvent == ProviderEvent.PROVIDER_READY) {
if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_READY) {
onConfigurationChanged(flagdProviderEvent);
break;
}
// intentional fall through, a not-ready change will trigger a ready.
// intentional fall through
case PROVIDER_READY:
/*
* Sync metadata is used to enrich the context, and is immutable in flagd,
* so we only need it to be fetched once at READY.
*/
if (flagdProviderEvent.getSyncMetadata() != null) {
eventsLock.enrichedContext = contextEnricher.apply(flagdProviderEvent.getSyncMetadata());
syncResources.setEnrichedContext(contextEnricher.apply(flagdProviderEvent.getSyncMetadata()));
}
onReady();
eventsLock.previousEvent = ProviderEvent.PROVIDER_READY;
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_READY);
break;

case PROVIDER_ERROR:
if (eventsLock.previousEvent != ProviderEvent.PROVIDER_ERROR) {
if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_ERROR) {
onError();
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_ERROR);
}
eventsLock.previousEvent = ProviderEvent.PROVIDER_ERROR;
break;

default:
log.info("Unknown event {}", flagdProviderEvent.getEvent());
}
Expand All @@ -246,8 +244,7 @@ private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) {
}

private void onReady() {
if (!eventsLock.initialized) {
eventsLock.initialized = true;
if (syncResources.initialize()) {
log.info("initialized FlagdProvider");
}
if (errorTask != null && !errorTask.isCancelled()) {
Expand All @@ -272,7 +269,7 @@ private void onError() {
if (!errorExecutor.isShutdown()) {
errorTask = errorExecutor.schedule(
() -> {
if (eventsLock.previousEvent == ProviderEvent.PROVIDER_ERROR) {
if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_ERROR) {
log.debug(
"Provider did not reconnect successfully within {}s. Emit ERROR event...",
gracePeriod);
Expand All @@ -286,14 +283,4 @@ private void onError() {
TimeUnit.SECONDS);
}
}

/**
* Contains all fields we need to worry about locking, used as intrinsic lock
* for sync blocks.
*/
static class EventsLock {
volatile ProviderEvent previousEvent = null;
volatile boolean initialized = false;
volatile EvaluationContext enrichedContext = new ImmutableContext();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package dev.openfeature.contrib.providers.flagd;

import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.ImmutableContext;
import dev.openfeature.sdk.ProviderEvent;
import dev.openfeature.sdk.exceptions.GeneralError;
import lombok.Getter;
import lombok.Setter;

/**
* Contains all fields we need to worry about locking, used as intrinsic lock
* for sync blocks in the {@link FlagdProvider}.
*/
@Getter
class FlagdProviderSyncResources {
@Setter
private volatile ProviderEvent previousEvent = null;

private volatile EvaluationContext enrichedContext = new ImmutableContext();
private volatile boolean initialized;
private volatile boolean isShutDown;

public void setEnrichedContext(EvaluationContext context) {
this.enrichedContext = new ImmutableContext(context.asMap());
}

/**
* With this method called, it is suggested that initialization has been completed. It will wake up all threads that
* wait for the initialization. Subsequent calls have no effect.
*
* @return true iff this was the first call to {@code initialize()}
*/
public synchronized boolean initialize() {
if (this.initialized) {
return false;
}
this.initialized = true;
this.notifyAll();
return true;
}

/**
* Blocks the calling thread until either {@link FlagdProviderSyncResources#initialize()} or
* {@link FlagdProviderSyncResources#shutdown()} is called or the deadline is exceeded, whatever happens first. If
* {@link FlagdProviderSyncResources#initialize()} has been executed before {@code waitForInitialization(long)} is
* called, it will return instantly. If the deadline is exceeded, a GeneralError will be thrown.
* If {@link FlagdProviderSyncResources#shutdown()} is called in the meantime, an {@link IllegalStateException} will
* be thrown. Otherwise, the method will return cleanly.
*
* @param deadline the maximum time in ms to wait
* @throws GeneralError when the deadline is exceeded before
* {@link FlagdProviderSyncResources#initialize()} is called on this object
* @throws IllegalStateException when {@link FlagdProviderSyncResources#shutdown()} is called or has been called on
* this object
*/
public void waitForInitialization(long deadline) {
long start = System.currentTimeMillis();
long end = start + deadline;
while (!initialized && !isShutDown) {
long now = System.currentTimeMillis();
// if wait(0) is called, the thread would wait forever, so we abort when this would happen
if (now >= end) {
throw new GeneralError(String.format(
"Deadline exceeded. Condition did not complete within the %d ms deadline", deadline));
}
long remaining = end - now;
synchronized (this) {
if (isShutDown) {
break;
}
if (initialized) { // might have changed in the meantime
return;
}
try {
this.wait(remaining);
} catch (InterruptedException e) {
// try again. Leave the continue to make PMD happy
continue;
}
}
}
if (isShutDown) {
throw new IllegalStateException("Already shut down");
}
}

/**
* Signals a shutdown. Threads waiting for initialization will wake up and throw an {@link IllegalStateException}.
*/
public synchronized void shutdown() {
isShutDown = true;
this.notifyAll();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class InProcessResolver implements Resolver {
private final Storage flagStore;
private final Consumer<FlagdProviderEvent> onConnectionEvent;
private final Operator operator;
private final long deadline;
private final String scope;

/**
Expand All @@ -52,7 +51,6 @@ public class InProcessResolver implements Resolver {
*/
public InProcessResolver(FlagdOptions options, Consumer<FlagdProviderEvent> onConnectionEvent) {
this.flagStore = new FlagStore(getConnector(options, onConnectionEvent));
this.deadline = options.getDeadline();
this.onConnectionEvent = onConnectionEvent;
this.operator = new Operator();
this.scope = options.getSelector();
Expand All @@ -70,10 +68,12 @@ public void init() throws Exception {
flagStore.getStateQueue().take();
switch (storageStateChange.getStorageState()) {
case OK:
log.info("onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED");
onConnectionEvent.accept(new FlagdProviderEvent(
ProviderEvent.PROVIDER_CONFIGURATION_CHANGED,
storageStateChange.getChangedFlagsKeys(),
storageStateChange.getSyncMetadata()));
log.info("post onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED");
break;
case ERROR:
onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_ERROR));
Expand Down
Loading
Loading