Skip to content

Commit c886adb

Browse files
committed
feat: quick return tool-call request, send response via SSE in goroutine
1 parent 8cdb6c6 commit c886adb

File tree

1 file changed

+34
-38
lines changed

1 file changed

+34
-38
lines changed

server/sse.go

Lines changed: 34 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,15 @@ var _ ClientSession = (*sseSession)(nil)
5353
// SSEServer implements a Server-Sent Events (SSE) based MCP server.
5454
// It provides real-time communication capabilities over HTTP using the SSE protocol.
5555
type SSEServer struct {
56-
server *MCPServer
57-
baseURL string
58-
basePath string
59-
useFullURLForMessageEndpoint bool
60-
messageEndpoint string
61-
sseEndpoint string
62-
sessions sync.Map
63-
srv *http.Server
64-
contextFunc SSEContextFunc
56+
server *MCPServer
57+
baseURL string
58+
basePath string
59+
useFullURLForMessageEndpoint bool
60+
messageEndpoint string
61+
sseEndpoint string
62+
sessions sync.Map
63+
srv *http.Server
64+
contextFunc SSEContextFunc
6565

6666
keepAlive bool
6767
keepAliveInterval time.Duration
@@ -158,12 +158,12 @@ func WithSSEContextFunc(fn SSEContextFunc) SSEOption {
158158
// NewSSEServer creates a new SSE server instance with the given MCP server and options.
159159
func NewSSEServer(server *MCPServer, opts ...SSEOption) *SSEServer {
160160
s := &SSEServer{
161-
server: server,
162-
sseEndpoint: "/sse",
163-
messageEndpoint: "/message",
164-
useFullURLForMessageEndpoint: true,
165-
keepAlive: false,
166-
keepAliveInterval: 10 * time.Second,
161+
server: server,
162+
sseEndpoint: "/sse",
163+
messageEndpoint: "/message",
164+
useFullURLForMessageEndpoint: true,
165+
keepAlive: false,
166+
keepAliveInterval: 10 * time.Second,
167167
}
168168

169169
// Apply all options
@@ -293,7 +293,6 @@ func (s *SSEServer) handleSSE(w http.ResponseWriter, r *http.Request) {
293293
}()
294294
}
295295

296-
297296
// Send the initial endpoint event
298297
fmt.Fprintf(w, "event: endpoint\ndata: %s\r\n\r\n", s.GetMessageEndpointForClient(sessionID))
299298
flusher.Flush()
@@ -356,31 +355,28 @@ func (s *SSEServer) handleMessage(w http.ResponseWriter, r *http.Request) {
356355
return
357356
}
358357

359-
// Process message through MCPServer
360-
response := s.server.HandleMessage(ctx, rawMessage)
358+
// quick return request, send 202 Accepted with no body, then deal the message and sent response via SSE
359+
w.WriteHeader(http.StatusAccepted)
361360

362-
// Only send response if there is one (not for notifications)
363-
if response != nil {
364-
eventData, _ := json.Marshal(response)
361+
go func() {
362+
// Process message through MCPServer
363+
response := s.server.HandleMessage(ctx, rawMessage)
365364

366-
// Queue the event for sending via SSE
367-
select {
368-
case session.eventQueue <- fmt.Sprintf("event: message\ndata: %s\n\n", eventData):
369-
// Event queued successfully
370-
case <-session.done:
371-
// Session is closed, don't try to queue
372-
default:
373-
// Queue is full, could log this
374-
}
365+
// Only send response if there is one (not for notifications)
366+
if response != nil {
367+
eventData, _ := json.Marshal(response)
375368

376-
// Send HTTP response
377-
w.Header().Set("Content-Type", "application/json")
378-
w.WriteHeader(http.StatusAccepted)
379-
json.NewEncoder(w).Encode(response)
380-
} else {
381-
// For notifications, just send 202 Accepted with no body
382-
w.WriteHeader(http.StatusAccepted)
383-
}
369+
// Queue the event for sending via SSE
370+
select {
371+
case session.eventQueue <- fmt.Sprintf("event: message\ndata: %s\n\n", eventData):
372+
// Event queued successfully
373+
case <-session.done:
374+
// Session is closed, don't try to queue
375+
default:
376+
// Queue is full, could log this
377+
}
378+
}
379+
}()
384380
}
385381

386382
// writeJSONRPCError writes a JSON-RPC error response with the given error details.

0 commit comments

Comments
 (0)