Skip to content

fix: transient error log-spam, add retry policy #1273

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ private void onError() {
errorTask = errorExecutor.schedule(
() -> {
if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_ERROR) {
log.debug(
log.error(
"Provider did not reconnect successfully within {}s. Emitting ERROR event...",
gracePeriod);
flagResolver.onError();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.ManagedChannel;
import io.grpc.NameResolverRegistry;
import io.grpc.Status.Code;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.channel.epoll.Epoll;
Expand All @@ -15,12 +16,75 @@
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLException;

/** gRPC channel builder helper. */
public class ChannelBuilder {

/**
* Controls retry (not-reconnection) policy for failed RPCs.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
static final Map<String, ?> SERVICE_CONFIG_WITH_RETRY = new HashMap() {
{
put("methodConfig", Arrays.asList(new HashMap() {
{
put(
"name",
Arrays.asList(
new HashMap() {
{
put("service", "flagd.sync.v1.FlagSyncService");
}
},
new HashMap() {
{
put("service", "flagd.evaluation.v1.Service");
}
}));
put("retryPolicy", new HashMap() {
{
// 1 + 2 + 4
put("maxAttempts", 3.0); // types used here are important, need to be doubles
put("initialBackoff", "1s");
put("maxBackoff", "5s");
put("backoffMultiplier", 2.0);
// status codes to retry on:
put(
"retryableStatusCodes",
Arrays.asList(
/*
* All codes are retryable except OK, CANCELLED and DEADLINE_EXCEEDED since
* any others not listed here cause a very tight loop of retries.
* CANCELLED is not retryable because it is a client-side termination.
* DEADLINE_EXCEEDED is typically a result of a client specified deadline,
* and definitionally should not result in a tight loop (it's a timeout).
*/
Code.UNKNOWN.toString(),
Code.INVALID_ARGUMENT.toString(),
Code.NOT_FOUND.toString(),
Code.ALREADY_EXISTS.toString(),
Code.PERMISSION_DENIED.toString(),
Code.RESOURCE_EXHAUSTED.toString(),
Code.FAILED_PRECONDITION.toString(),
Code.ABORTED.toString(),
Code.OUT_OF_RANGE.toString(),
Code.UNIMPLEMENTED.toString(),
Code.INTERNAL.toString(),
Code.UNAVAILABLE.toString(),
Code.DATA_LOSS.toString(),
Code.UNAUTHENTICATED.toString()));
}
});
}
}));
}
};

private ChannelBuilder() {}

/**
Expand All @@ -45,6 +109,8 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
.eventLoopGroup(new EpollEventLoopGroup())
.channelType(EpollDomainSocketChannel.class)
.usePlaintext()
.defaultServiceConfig(SERVICE_CONFIG_WITH_RETRY)
.enableRetry()
.build();
}

Expand Down Expand Up @@ -89,7 +155,9 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
builder.intercept(new FlagdGrpcInterceptor(options.getOpenTelemetry()));
}

return builder.build();
return builder.defaultServiceConfig(SERVICE_CONFIG_WITH_RETRY)
.enableRetry()
.build();
} catch (SSLException ssle) {
SslConfigException sslConfigException = new SslConfigException("Error with SSL configuration.");
sslConfigException.initCause(ssle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -24,11 +23,6 @@
@Slf4j
public class ChannelConnector<T extends AbstractStub<T>, K extends AbstractBlockingStub<K>> {

/**
* The blocking service stub for making blocking GRPC calls.
*/
private final Function<ManagedChannel, K> blockingStubFunction;

/**
* The GRPC managed channel for managing the underlying GRPC connection.
*/
Expand All @@ -49,18 +43,13 @@ public class ChannelConnector<T extends AbstractStub<T>, K extends AbstractBlock
* Constructs a new {@code ChannelConnector} instance with the specified options and parameters.
*
* @param options the configuration options for the GRPC connection
* @param blockingStub a function to create the blocking service stub from a {@link ManagedChannel}
* @param onConnectionEvent a consumer to handle connection events
* @param channel the managed channel for the GRPC connection
*/
public ChannelConnector(
final FlagdOptions options,
final Function<ManagedChannel, K> blockingStub,
final Consumer<FlagdProviderEvent> onConnectionEvent,
ManagedChannel channel) {
final FlagdOptions options, final Consumer<FlagdProviderEvent> onConnectionEvent, ManagedChannel channel) {

this.channel = channel;
this.blockingStubFunction = blockingStub;
this.deadline = options.getDeadline();
this.onConnectionEvent = onConnectionEvent;
}
Expand All @@ -69,14 +58,10 @@ public ChannelConnector(
* Constructs a {@code ChannelConnector} instance for testing purposes.
*
* @param options the configuration options for the GRPC connection
* @param blockingStub a function to create the blocking service stub from a {@link ManagedChannel}
* @param onConnectionEvent a consumer to handle connection events
*/
public ChannelConnector(
final FlagdOptions options,
final Function<ManagedChannel, K> blockingStub,
final Consumer<FlagdProviderEvent> onConnectionEvent) {
this(options, blockingStub, onConnectionEvent, ChannelBuilder.nettyChannel(options));
public ChannelConnector(final FlagdOptions options, final Consumer<FlagdProviderEvent> onConnectionEvent) {
this(options, onConnectionEvent, ChannelBuilder.nettyChannel(options));
}

/**
Expand All @@ -89,19 +74,6 @@ public void initialize() throws Exception {
monitorChannelState(ConnectivityState.READY);
}

/**
* Returns the blocking service stub for making blocking GRPC calls.
*
* @return the blocking service stub
*/
public K getBlockingStub() {
K stub = blockingStubFunction.apply(channel).withWaitForReady();
if (this.deadline > 0) {
stub = stub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS);
}
return stub;
}

/**
* Shuts down the GRPC connection and cleans up associated resources.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,19 +127,19 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc
}
if (!stateBlockingQueue.offer(
new StorageStateChange(StorageState.OK, changedFlagsKeys, metadata))) {
log.warn("Failed to convey OK satus, queue is full");
log.warn("Failed to convey OK status, queue is full");
}
} catch (Throwable e) {
// catch all exceptions and avoid stream listener interruptions
log.warn("Invalid flag sync payload from connector", e);
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) {
log.warn("Failed to convey STALE satus, queue is full");
log.warn("Failed to convey STALE status, queue is full");
}
}
break;
case ERROR:
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.ERROR))) {
log.warn("Failed to convey ERROR satus, queue is full");
log.warn("Failed to convey ERROR status, queue is full");
}
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class SyncStreamQueueSource implements QueueSource {

private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final int streamDeadline;
private final int deadline;
private final String selector;
private final String providerId;
private final boolean syncMetadataDisabled;
Expand All @@ -45,30 +46,37 @@ public class SyncStreamQueueSource implements QueueSource {
new LinkedBlockingQueue<>(QUEUE_SIZE);
private final BlockingQueue<QueuePayload> outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
private final FlagSyncServiceStub stub;
private final FlagSyncServiceBlockingStub blockingStub;

/**
* Creates a new SyncStreamQueueSource responsible for observing the event stream.
*/
public SyncStreamQueueSource(final FlagdOptions options, Consumer<FlagdProviderEvent> onConnectionEvent) {
streamDeadline = options.getStreamDeadlineMs();
deadline = options.getDeadline();
selector = options.getSelector();
providerId = options.getProviderId();
syncMetadataDisabled = options.isSyncMetadataDisabled();
channelConnector = new ChannelConnector<>(options, FlagSyncServiceGrpc::newBlockingStub, onConnectionEvent);
channelConnector = new ChannelConnector<>(options, onConnectionEvent);
this.stub = FlagSyncServiceGrpc.newStub(channelConnector.getChannel()).withWaitForReady();
this.blockingStub = FlagSyncServiceGrpc.newBlockingStub(channelConnector.getChannel())
.withWaitForReady();
}

// internal use only
protected SyncStreamQueueSource(
final FlagdOptions options,
ChannelConnector<FlagSyncServiceStub, FlagSyncServiceBlockingStub> connectorMock,
FlagSyncServiceStub stubMock) {
FlagSyncServiceStub stubMock,
FlagSyncServiceBlockingStub blockingStubMock) {
streamDeadline = options.getStreamDeadlineMs();
deadline = options.getDeadline();
selector = options.getSelector();
providerId = options.getProviderId();
channelConnector = connectorMock;
stub = stubMock;
syncMetadataDisabled = options.isSyncMetadataDisabled();
blockingStub = blockingStubMock;
}

/** Initialize sync stream connector. */
Expand Down Expand Up @@ -110,6 +118,7 @@ private void observeSyncStream() throws InterruptedException {
log.info("Initializing sync stream observer");

// outer loop for re-issuing the stream request
// "waitForReady" on the channel, plus our retry policy slow this loop down in error conditions
while (!shutdown.get()) {

log.debug("Initializing sync stream request");
Expand All @@ -124,15 +133,21 @@ private void observeSyncStream() throws InterruptedException {
// TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584
if (!syncMetadataDisabled) {
try {
metadataResponse = channelConnector.getBlockingStub().getMetadata(metadataRequest.build());
FlagSyncServiceBlockingStub localStub = blockingStub;

if (deadline > 0) {
localStub = localStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS);
}

metadataResponse = localStub.getMetadata(metadataRequest.build());
} catch (Exception metaEx) {
log.error("Metadata exception: {}, cancelling stream", metaEx.getMessage(), metaEx);
context.cancel(metaEx);
}
}

// inner loop for handling messages
while (!shutdown.get()) {
while (!shutdown.get() && !Context.current().isCancelled()) {
final StreamResponseModel<SyncFlagsResponse> taken = incomingQueue.take();
if (taken.isComplete()) {
log.debug("Sync stream completed, will reconnect");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public final class RpcResolver implements Resolver {
private final LinkedBlockingQueue<StreamResponseModel<EventStreamResponse>> incomingQueue;
private final Consumer<FlagdProviderEvent> onProviderEvent;
private final ServiceStub stub;
private final ServiceBlockingStub blockingStub;

/**
* Resolves flag values using
Expand All @@ -82,9 +83,11 @@ public RpcResolver(
this.strategy = ResolveFactory.getStrategy(options);
this.options = options;
incomingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
this.connector = new ChannelConnector<>(options, ServiceGrpc::newBlockingStub, onProviderEvent);
this.connector = new ChannelConnector<>(options, onProviderEvent);
this.onProviderEvent = onProviderEvent;
this.stub = ServiceGrpc.newStub(this.connector.getChannel()).withWaitForReady();
this.blockingStub =
ServiceGrpc.newBlockingStub(this.connector.getChannel()).withWaitForReady();
}

// testing only
Expand All @@ -93,6 +96,7 @@ protected RpcResolver(
final Cache cache,
final Consumer<FlagdProviderEvent> onProviderEvent,
ServiceStub mockStub,
ServiceBlockingStub mockBlockingStub,
ChannelConnector<ServiceStub, ServiceBlockingStub> connector) {
this.cache = cache;
this.strategy = ResolveFactory.getStrategy(options);
Expand All @@ -101,6 +105,7 @@ protected RpcResolver(
this.connector = connector;
this.onProviderEvent = onProviderEvent;
this.stub = mockStub;
this.blockingStub = mockBlockingStub;
}

/**
Expand Down Expand Up @@ -145,15 +150,15 @@ public void onError() {
public ProviderEvaluation<Boolean> booleanEvaluation(String key, Boolean defaultValue, EvaluationContext ctx) {
ResolveBooleanRequest request = ResolveBooleanRequest.newBuilder().buildPartial();

return resolve(key, ctx, request, getResolver()::resolveBoolean, null);
return resolve(key, ctx, request, getBlockingStub()::resolveBoolean, null);
}

/**
* String evaluation from grpc resolver.
*/
public ProviderEvaluation<String> stringEvaluation(String key, String defaultValue, EvaluationContext ctx) {
ResolveStringRequest request = ResolveStringRequest.newBuilder().buildPartial();
return resolve(key, ctx, request, getResolver()::resolveString, null);
return resolve(key, ctx, request, getBlockingStub()::resolveString, null);
}

/**
Expand All @@ -162,7 +167,7 @@ public ProviderEvaluation<String> stringEvaluation(String key, String defaultVal
public ProviderEvaluation<Double> doubleEvaluation(String key, Double defaultValue, EvaluationContext ctx) {
ResolveFloatRequest request = ResolveFloatRequest.newBuilder().buildPartial();

return resolve(key, ctx, request, getResolver()::resolveFloat, null);
return resolve(key, ctx, request, getBlockingStub()::resolveFloat, null);
}

/**
Expand All @@ -172,11 +177,17 @@ public ProviderEvaluation<Integer> integerEvaluation(String key, Integer default

ResolveIntRequest request = ResolveIntRequest.newBuilder().buildPartial();

return resolve(key, ctx, request, getResolver()::resolveInt, (Object value) -> ((Long) value).intValue());
return resolve(key, ctx, request, getBlockingStub()::resolveInt, (Object value) -> ((Long) value).intValue());
}

private ServiceGrpc.ServiceBlockingStub getResolver() {
return connector.getBlockingStub().withDeadlineAfter(options.getDeadline(), TimeUnit.MILLISECONDS);
private ServiceGrpc.ServiceBlockingStub getBlockingStub() {
ServiceBlockingStub localStub = blockingStub;

if (options.getDeadline() > 0) {
localStub = localStub.withDeadlineAfter(options.getDeadline(), TimeUnit.MILLISECONDS);
}

return localStub;
}

/**
Expand All @@ -190,7 +201,7 @@ public ProviderEvaluation<Value> objectEvaluation(String key, Value defaultValue
key,
ctx,
request,
getResolver()::resolveObject,
getBlockingStub()::resolveObject,
(Object value) -> convertObjectResponse((com.google.protobuf.Struct) value));
}

Expand Down Expand Up @@ -321,11 +332,11 @@ private void observeEventStream() throws InterruptedException {
log.info("Initializing event stream observer");

// outer loop for re-issuing the stream request
// "waitForReady" on the channel, plus our retry policy slow this loop down in error conditions
while (!shutdown.get()) {

log.debug("Initializing event stream request");
restartStream();

// inner loop for handling messages
while (!shutdown.get()) {
final StreamResponseModel<EventStreamResponse> taken = incomingQueue.take();
Expand Down
Loading
Loading