-
Notifications
You must be signed in to change notification settings - Fork 51
feat: gRPC stream connection deadline #999
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
964fb9f
5d443b3
2adf69a
2e6ec90
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,6 +36,7 @@ public class GrpcConnector { | |
|
||
private final int startEventStreamRetryBackoff; | ||
private final long deadline; | ||
private final long streamDeadlineMs; | ||
|
||
private final Cache cache; | ||
private final Consumer<ConnectionEvent> onConnectionEvent; | ||
|
@@ -64,6 +65,7 @@ public GrpcConnector(final FlagdOptions options, final Cache cache, final Suppli | |
this.startEventStreamRetryBackoff = options.getRetryBackoffMs(); | ||
this.eventStreamRetryBackoff = options.getRetryBackoffMs(); | ||
this.deadline = options.getDeadline(); | ||
this.streamDeadlineMs = options.getStreamDeadlineMs(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [suggestion] using the options object instead of separate fields would reduce this over head of adding a new field all the time There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can go either way on this one. In fact it might be a good thing to do in a pure refactor/cleanup PR. There's also some naming that we can probably improve with the provider. |
||
this.cache = cache; | ||
this.onConnectionEvent = onConnectionEvent; | ||
this.connectedSupplier = connectedSupplier; | ||
|
@@ -126,7 +128,14 @@ private void observeEventStream() { | |
while (this.eventStreamAttempt <= this.maxEventStreamRetries) { | ||
final StreamObserver<EventStreamResponse> responseObserver = new EventStreamObserver(sync, this.cache, | ||
this::onConnectionEvent); | ||
this.serviceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver); | ||
|
||
ServiceGrpc.ServiceStub localServiceStub = this.serviceStub; | ||
|
||
if (this.streamDeadlineMs > 0) { | ||
localServiceStub = localServiceStub.withDeadlineAfter(this.streamDeadlineMs, TimeUnit.MILLISECONDS); | ||
} | ||
|
||
localServiceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver); | ||
|
||
try { | ||
synchronized (sync) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,8 @@ | |
import io.grpc.Context; | ||
import io.grpc.Context.CancellableContext; | ||
import io.grpc.ManagedChannel; | ||
import io.grpc.Status.Code; | ||
import io.grpc.StatusRuntimeException; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
/** | ||
|
@@ -43,6 +45,7 @@ public class GrpcStreamConnector implements Connector { | |
private final FlagSyncServiceStub serviceStub; | ||
private final FlagSyncServiceBlockingStub serviceBlockingStub; | ||
private final int deadline; | ||
private final int streamDeadlineMs; | ||
private final String selector; | ||
|
||
/** | ||
|
@@ -55,6 +58,7 @@ public GrpcStreamConnector(final FlagdOptions options) { | |
serviceStub = FlagSyncServiceGrpc.newStub(channel); | ||
serviceBlockingStub = FlagSyncServiceGrpc.newBlockingStub(channel); | ||
deadline = options.getDeadline(); | ||
streamDeadlineMs = options.getStreamDeadlineMs(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [suggestion] Cant we just use the options within this class, and pass it further down the call chain? - adding a new field and parameters might be tedious over time |
||
selector = options.getSelector(); | ||
} | ||
|
||
|
@@ -64,7 +68,8 @@ public GrpcStreamConnector(final FlagdOptions options) { | |
public void init() { | ||
Thread listener = new Thread(() -> { | ||
try { | ||
observeEventStream(blockingQueue, shutdown, serviceStub, serviceBlockingStub, selector, deadline); | ||
observeEventStream(blockingQueue, shutdown, serviceStub, serviceBlockingStub, selector, deadline, | ||
streamDeadlineMs); | ||
} catch (InterruptedException e) { | ||
log.warn("gRPC event stream interrupted, flag configurations are stale", e); | ||
Thread.currentThread().interrupt(); | ||
|
@@ -114,7 +119,8 @@ static void observeEventStream(final BlockingQueue<QueuePayload> writeTo, | |
final FlagSyncServiceStub serviceStub, | ||
final FlagSyncServiceBlockingStub serviceBlockingStub, | ||
final String selector, | ||
final int deadline) | ||
final int deadline, | ||
final int streamDeadlineMs) | ||
throws InterruptedException { | ||
|
||
final BlockingQueue<GrpcResponseModel> streamReceiver = new LinkedBlockingQueue<>(QUEUE_SIZE); | ||
|
@@ -128,14 +134,20 @@ static void observeEventStream(final BlockingQueue<QueuePayload> writeTo, | |
log.debug("Initializing sync stream request"); | ||
final SyncFlagsRequest.Builder syncRequest = SyncFlagsRequest.newBuilder(); | ||
final GetMetadataRequest.Builder metadataRequest = GetMetadataRequest.newBuilder(); | ||
GetMetadataResponse metadataResponse = GetMetadataResponse.getDefaultInstance(); | ||
GetMetadataResponse metadataResponse = GetMetadataResponse.getDefaultInstance(); | ||
|
||
if (selector != null) { | ||
syncRequest.setSelector(selector); | ||
} | ||
|
||
try (CancellableContext context = Context.current().withCancellation()) { | ||
serviceStub.syncFlags(syncRequest.build(), new GrpcStreamHandler(streamReceiver)); | ||
FlagSyncServiceStub localServiceStub = serviceStub; | ||
if (streamDeadlineMs > 0) { | ||
localServiceStub = localServiceStub.withDeadlineAfter(streamDeadlineMs, TimeUnit.MILLISECONDS); | ||
} | ||
|
||
localServiceStub.syncFlags(syncRequest.build(), new GrpcStreamHandler(streamReceiver)); | ||
|
||
try { | ||
metadataResponse = serviceBlockingStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS) | ||
.getMetadata(metadataRequest.build()); | ||
|
@@ -158,14 +170,21 @@ static void observeEventStream(final BlockingQueue<QueuePayload> writeTo, | |
} | ||
|
||
if (response.getError() != null || metadataException != null) { | ||
log.error(String.format("Error from initializing stream or metadata, retrying in %dms", | ||
retryDelay), response.getError()); | ||
|
||
if (!writeTo.offer( | ||
new QueuePayload(QueuePayloadType.ERROR, "Error from stream or metadata", | ||
metadataResponse))) { | ||
log.error("Failed to convey ERROR status, queue is full"); | ||
if (response.getError() instanceof StatusRuntimeException | ||
&& ((StatusRuntimeException) response.getError()).getStatus().getCode() | ||
.equals(Code.DEADLINE_EXCEEDED)) { | ||
log.debug(String.format("Stream deadline reached, re-establishing in %dms", | ||
retryDelay)); | ||
} else { | ||
log.error(String.format("Error initializing stream or metadata, retrying in %dms", | ||
retryDelay), response.getError()); | ||
if (!writeTo.offer( | ||
new QueuePayload(QueuePayloadType.ERROR, "Error from stream or metadata", | ||
metadataResponse))) { | ||
log.error("Failed to convey ERROR status, queue is full"); | ||
} | ||
} | ||
|
||
// close the context to cancel the stream in case just the metadata call failed | ||
context.cancel(metadataException); | ||
break; | ||
|
Uh oh!
There was an error while loading. Please reload this page.