Skip to content

Commit 61eab37

Browse files
authored
server: block GracefulStop on method handlers and make blocking optional for Stop (#6922)
1 parent ddd377f commit 61eab37

File tree

3 files changed

+177
-3
lines changed

3 files changed

+177
-3
lines changed

orca/producer_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,14 @@ func (f *fakeORCAService) close() {
228228

229229
func (f *fakeORCAService) StreamCoreMetrics(req *v3orcaservicepb.OrcaLoadReportRequest, stream v3orcaservicegrpc.OpenRcaService_StreamCoreMetricsServer) error {
230230
f.reqCh <- req
231-
for resp := range f.respCh {
231+
for {
232+
var resp any
233+
select {
234+
case resp = <-f.respCh:
235+
case <-stream.Context().Done():
236+
return stream.Context().Err()
237+
}
238+
232239
if err, ok := resp.(error); ok {
233240
return err
234241
}
@@ -245,7 +252,6 @@ func (f *fakeORCAService) StreamCoreMetrics(req *v3orcaservicepb.OrcaLoadReportR
245252
return err
246253
}
247254
}
248-
return nil
249255
}
250256

251257
// TestProducerBackoff verifies that the ORCA producer applies the proper

server.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,8 @@ type Server struct {
136136
quit *grpcsync.Event
137137
done *grpcsync.Event
138138
channelzRemoveOnce sync.Once
139-
serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
139+
serveWG sync.WaitGroup // counts active Serve goroutines for Stop/GracefulStop
140+
handlersWG sync.WaitGroup // counts active method handler goroutines
140141

141142
channelzID *channelz.Identifier
142143
czData *channelzData
@@ -173,6 +174,7 @@ type serverOptions struct {
173174
headerTableSize *uint32
174175
numServerWorkers uint32
175176
recvBufferPool SharedBufferPool
177+
waitForHandlers bool
176178
}
177179

178180
var defaultServerOptions = serverOptions{
@@ -570,6 +572,21 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption {
570572
})
571573
}
572574

575+
// WaitForHandlers cause Stop to wait until all outstanding method handlers have
576+
// exited before returning. If false, Stop will return as soon as all
577+
// connections have closed, but method handlers may still be running. By
578+
// default, Stop does not wait for method handlers to return.
579+
//
580+
// # Experimental
581+
//
582+
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
583+
// later release.
584+
func WaitForHandlers(w bool) ServerOption {
585+
return newFuncServerOption(func(o *serverOptions) {
586+
o.waitForHandlers = w
587+
})
588+
}
589+
573590
// RecvBufferPool returns a ServerOption that configures the server
574591
// to use the provided shared buffer pool for parsing incoming messages. Depending
575592
// on the application's workload, this could result in reduced memory allocation.
@@ -1004,9 +1021,11 @@ func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport,
10041021

10051022
streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
10061023
st.HandleStreams(ctx, func(stream *transport.Stream) {
1024+
s.handlersWG.Add(1)
10071025
streamQuota.acquire()
10081026
f := func() {
10091027
defer streamQuota.release()
1028+
defer s.handlersWG.Done()
10101029
s.handleStream(st, stream)
10111030
}
10121031

@@ -1905,6 +1924,10 @@ func (s *Server) stop(graceful bool) {
19051924
s.serverWorkerChannelClose()
19061925
}
19071926

1927+
if graceful || s.opts.waitForHandlers {
1928+
s.handlersWG.Wait()
1929+
}
1930+
19081931
if s.events != nil {
19091932
s.events.Finish()
19101933
s.events = nil

server_ext_test.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,3 +185,148 @@ func (s) TestStreamWorkers_GracefulStopAndStop(t *testing.T) {
185185

186186
ss.S.GracefulStop()
187187
}
188+
189+
// Tests the WaitForHandlers ServerOption by leaving an RPC running while Stop
190+
// is called, and ensures Stop doesn't return until the handler returns.
191+
func (s) TestServer_WaitForHandlers(t *testing.T) {
192+
started := grpcsync.NewEvent()
193+
blockCalls := grpcsync.NewEvent()
194+
195+
// This stub server does not properly respect the stream context, so it will
196+
// not exit when the context is canceled.
197+
ss := stubserver.StubServer{
198+
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
199+
started.Fire()
200+
<-blockCalls.Done()
201+
return nil
202+
},
203+
}
204+
if err := ss.Start([]grpc.ServerOption{grpc.WaitForHandlers(true)}); err != nil {
205+
t.Fatal("Error starting server:", err)
206+
}
207+
defer ss.Stop()
208+
209+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
210+
defer cancel()
211+
212+
// Start one RPC to the server.
213+
ctx1, cancel1 := context.WithCancel(ctx)
214+
_, err := ss.Client.FullDuplexCall(ctx1)
215+
if err != nil {
216+
t.Fatal("Error staring call:", err)
217+
}
218+
219+
// Wait for the handler to be invoked.
220+
select {
221+
case <-started.Done():
222+
case <-ctx.Done():
223+
t.Fatalf("Timed out waiting for RPC to start on server.")
224+
}
225+
226+
// Cancel it on the client. The server handler will still be running.
227+
cancel1()
228+
229+
// Close the connection. This might be sufficient to allow the server to
230+
// return if it doesn't properly wait for outstanding method handlers to
231+
// return.
232+
ss.CC.Close()
233+
234+
// Try to Stop() the server, which should block indefinitely (until
235+
// blockCalls is fired).
236+
stopped := grpcsync.NewEvent()
237+
go func() {
238+
ss.S.Stop()
239+
stopped.Fire()
240+
}()
241+
242+
// Wait 100ms and ensure stopped does not fire.
243+
select {
244+
case <-stopped.Done():
245+
trace := make([]byte, 4096)
246+
trace = trace[0:runtime.Stack(trace, true)]
247+
blockCalls.Fire()
248+
t.Fatalf("Server returned from Stop() illegally. Stack trace:\n%v", string(trace))
249+
case <-time.After(100 * time.Millisecond):
250+
// Success; unblock the call and wait for stopped.
251+
blockCalls.Fire()
252+
}
253+
254+
select {
255+
case <-stopped.Done():
256+
case <-ctx.Done():
257+
t.Fatalf("Timed out waiting for second RPC to start on server.")
258+
}
259+
}
260+
261+
// Tests that GracefulStop will wait for all method handlers to return by
262+
// blocking a handler and ensuring GracefulStop doesn't return until after it is
263+
// unblocked.
264+
func (s) TestServer_GracefulStopWaits(t *testing.T) {
265+
started := grpcsync.NewEvent()
266+
blockCalls := grpcsync.NewEvent()
267+
268+
// This stub server does not properly respect the stream context, so it will
269+
// not exit when the context is canceled.
270+
ss := stubserver.StubServer{
271+
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
272+
started.Fire()
273+
<-blockCalls.Done()
274+
return nil
275+
},
276+
}
277+
if err := ss.Start(nil); err != nil {
278+
t.Fatal("Error starting server:", err)
279+
}
280+
defer ss.Stop()
281+
282+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
283+
defer cancel()
284+
285+
// Start one RPC to the server.
286+
ctx1, cancel1 := context.WithCancel(ctx)
287+
_, err := ss.Client.FullDuplexCall(ctx1)
288+
if err != nil {
289+
t.Fatal("Error staring call:", err)
290+
}
291+
292+
// Wait for the handler to be invoked.
293+
select {
294+
case <-started.Done():
295+
case <-ctx.Done():
296+
t.Fatalf("Timed out waiting for RPC to start on server.")
297+
}
298+
299+
// Cancel it on the client. The server handler will still be running.
300+
cancel1()
301+
302+
// Close the connection. This might be sufficient to allow the server to
303+
// return if it doesn't properly wait for outstanding method handlers to
304+
// return.
305+
ss.CC.Close()
306+
307+
// Try to Stop() the server, which should block indefinitely (until
308+
// blockCalls is fired).
309+
stopped := grpcsync.NewEvent()
310+
go func() {
311+
ss.S.GracefulStop()
312+
stopped.Fire()
313+
}()
314+
315+
// Wait 100ms and ensure stopped does not fire.
316+
select {
317+
case <-stopped.Done():
318+
trace := make([]byte, 4096)
319+
trace = trace[0:runtime.Stack(trace, true)]
320+
blockCalls.Fire()
321+
t.Fatalf("Server returned from Stop() illegally. Stack trace:\n%v", string(trace))
322+
case <-time.After(100 * time.Millisecond):
323+
// Success; unblock the call and wait for stopped.
324+
blockCalls.Fire()
325+
}
326+
327+
select {
328+
case <-stopped.Done():
329+
case <-ctx.Done():
330+
t.Fatalf("Timed out waiting for second RPC to start on server.")
331+
}
332+
}

0 commit comments

Comments
 (0)