Skip to content

Commit fde9e39

Browse files
authored
chore: log tweaks, retry cancels, add options.toBuidler (#1276)
Signed-off-by: Todd Baert <[email protected]>
1 parent 2d87b9c commit fde9e39

File tree

7 files changed

+14
-10
lines changed

7 files changed

+14
-10
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
/**
2020
* FlagdOptions is a builder to build flagd provider options.
2121
*/
22-
@Builder
22+
@Builder(toBuilder = true)
2323
@Getter
2424
@SuppressWarnings("PMD.TooManyStaticImports")
2525
public class FlagdOptions {

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResources.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void waitForInitialization(long deadline) {
6161
// if wait(0) is called, the thread would wait forever, so we abort when this would happen
6262
if (now >= end) {
6363
throw new GeneralError(String.format(
64-
"Deadline exceeded. Condition did not complete within the %d ms deadline", deadline));
64+
"Initialization timeout exceeded; did not complete within the %d ms deadline.", deadline));
6565
}
6666
long remaining = end - now;
6767
synchronized (this) {

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,12 @@ public class ChannelBuilder {
5858
"retryableStatusCodes",
5959
Arrays.asList(
6060
/*
61-
* All codes are retryable except OK, CANCELLED and DEADLINE_EXCEEDED since
61+
* All codes are retryable except OK and DEADLINE_EXCEEDED since
6262
* any others not listed here cause a very tight loop of retries.
63-
* CANCELLED is not retryable because it is a client-side termination.
6463
* DEADLINE_EXCEEDED is typically a result of a client specified deadline,
6564
* and definitionally should not result in a tight loop (it's a timeout).
6665
*/
66+
Code.CANCELLED.toString(),
6767
Code.UNKNOWN.toString(),
6868
Code.INVALID_ARGUMENT.toString(),
6969
Code.NOT_FOUND.toString(),

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public ChannelConnector(final FlagdOptions options, final Consumer<FlagdProvider
7070
* @throws Exception if the channel does not reach the desired state within the deadline
7171
*/
7272
public void initialize() throws Exception {
73-
log.info("Initializing GRPC connection...");
73+
log.info("Initializing GRPC connection.");
7474
monitorChannelState(ConnectivityState.READY);
7575
}
7676

@@ -80,7 +80,7 @@ public void initialize() throws Exception {
8080
* @throws InterruptedException if interrupted while waiting for termination
8181
*/
8282
public void shutdown() throws InterruptedException {
83-
log.info("Shutting down GRPC connection...");
83+
log.info("Shutting down GRPC connection.");
8484

8585
if (!channel.isShutdown()) {
8686
channel.shutdownNow();

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/QueueingStreamObserver.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public class QueueingStreamObserver<T> implements StreamObserver<T> {
1717

1818
public QueueingStreamObserver(final BlockingQueue<StreamResponseModel<T>> queue) {
1919
blockingQueue = queue;
20+
queue.clear();
2021
}
2122

2223
@Override

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ private void observeSyncStream() throws InterruptedException {
128128
// create a context which exists to track and cancel the stream
129129
try (CancellableContext context = Context.current().withCancellation()) {
130130

131-
restart(); // start the stream within the context
131+
restart(); // start the stream with the context
132132

133133
// TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584
134134
if (!syncMetadataDisabled) {
@@ -150,16 +150,18 @@ private void observeSyncStream() throws InterruptedException {
150150
while (!shutdown.get() && !Context.current().isCancelled()) {
151151
final StreamResponseModel<SyncFlagsResponse> taken = incomingQueue.take();
152152
if (taken.isComplete()) {
153-
log.debug("Sync stream completed, will reconnect");
153+
log.debug("Sync stream completed, will restart");
154154
// The stream is complete, we still try to reconnect
155155
break;
156156
}
157157

158158
Throwable streamException = taken.getError();
159159
if (streamException != null) {
160-
log.debug("Exception in GRPC connection, streamException {}, will reconnect", streamException);
160+
log.debug("Exception in stream RPC, streamException {}, will restart", streamException);
161161
if (!outgoingQueue.offer(new QueuePayload(
162-
QueuePayloadType.ERROR, "Error from stream or metadata", metadataResponse))) {
162+
QueuePayloadType.ERROR,
163+
String.format("Error from stream: %s", streamException.getMessage()),
164+
metadataResponse))) {
163165
log.error("Failed to convey ERROR status, queue is full");
164166
}
165167
break;

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class SyncStreamQueueSourceTest {
4040
@BeforeEach
4141
public void init() throws Exception {
4242
blockingStub = mock(FlagSyncServiceBlockingStub.class);
43+
when(blockingStub.withDeadlineAfter(anyLong(), any())).thenReturn(blockingStub);
4344
when(blockingStub.getMetadata(any())).thenReturn(GetMetadataResponse.getDefaultInstance());
4445

4546
mockConnector = mock(ChannelConnector.class);

0 commit comments

Comments
 (0)