-
Notifications
You must be signed in to change notification settings - Fork 1.2k
StreamableHttp - GET request standalone SSE #561
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
37 commits
Select commit
Hold shift + click to select a range
2b95598
initial streamable http server
ihrpr 3d790f8
add request validation and tests
ihrpr 27bc01e
session management
ihrpr 3c4cf10
terminations of a session
ihrpr bce74b3
fix cleaning up
ihrpr 2011579
add happy path test
ihrpr 2cebf08
tests
ihrpr 6c9c320
json mode
ihrpr ede8cde
clean up
ihrpr 2a3bed8
fix example server
ihrpr 0456b1b
return 405 for get stream
ihrpr 97ca48d
speed up tests
ihrpr f738cbf
stateless implemetation
ihrpr 92d4287
format
ihrpr aa9f6e5
uv lock
ihrpr 2fba7f3
Merge branch 'ihrpr/streamablehttp-server' into ihrpr/streamablehttp-…
ihrpr 45723ea
simplify readme
ihrpr 6b7a616
clean up
ihrpr b1be691
get sse
ihrpr 201ec99
uv lock
ihrpr 46ec72d
clean up
ihrpr 1902abb
Merge branch 'ihrpr/streamablehttp-server' into ihrpr/streamablehttp-…
ihrpr da1df74
Merge branch 'ihrpr/streamablehttp-stateless' into ihrpr/get-sse
ihrpr 9b096dc
add comments to server example where we use related_request_id
ihrpr bbe79c2
Merge branch 'main' into ihrpr/streamablehttp-server
ihrpr a0a9c5b
small fixes
ihrpr a5ac2e0
use mta field for related_request_id
ihrpr 2e615f3
unrelated test and format
ihrpr cae32e2
Merge branch 'ihrpr/streamablehttp-server' into ihrpr/streamablehttp-…
ihrpr 58745c7
remove useless sleep
ihrpr 1387929
rename require_initialization to standalone_mode
ihrpr bccff75
Merge branch 'ihrpr/streamablehttp-stateless' into ihrpr/get-sse
ihrpr 9a6da2e
ruff check
ihrpr ff70bd6
Merge branch 'main' into ihrpr/streamablehttp-server
ihrpr 179fbc8
Merge branch 'ihrpr/streamablehttp-server' into ihrpr/streamablehttp-…
ihrpr a979864
Merge branch 'ihrpr/streamablehttp-stateless' into ihrpr/get-sse
ihrpr 181bea6
Merge branch 'main' into ihrpr/get-sse
ihrpr File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,6 +50,9 @@ | |
CONTENT_TYPE_JSON = "application/json" | ||
CONTENT_TYPE_SSE = "text/event-stream" | ||
|
||
# Special key for the standalone GET stream | ||
GET_STREAM_KEY = "_GET_stream" | ||
|
||
# Session ID validation pattern (visible ASCII characters ranging from 0x21 to 0x7E) | ||
# Pattern ensures entire string contains only valid characters by using ^ and $ anchors | ||
SESSION_ID_PATTERN = re.compile(r"^[\x21-\x7E]+$") | ||
|
@@ -443,10 +446,19 @@ async def sse_writer(): | |
return | ||
|
||
async def _handle_get_request(self, request: Request, send: Send) -> None: | ||
"""Handle GET requests for SSE stream establishment.""" | ||
# Validate session ID if server has one | ||
if not await self._validate_session(request, send): | ||
return | ||
""" | ||
Handle GET request to establish SSE. | ||
|
||
This allows the server to communicate to the client without the client | ||
first sending data via HTTP POST. The server can send JSON-RPC requests | ||
and notifications on this stream. | ||
""" | ||
writer = self._read_stream_writer | ||
if writer is None: | ||
raise ValueError( | ||
"No read stream writer available. Ensure connect() is called first." | ||
) | ||
|
||
# Validate Accept header - must include text/event-stream | ||
_, has_sse = self._check_accept_headers(request) | ||
|
||
|
@@ -458,13 +470,80 @@ async def _handle_get_request(self, request: Request, send: Send) -> None: | |
await response(request.scope, request.receive, send) | ||
return | ||
|
||
# TODO: Implement SSE stream for GET requests | ||
# For now, return 405 Method Not Allowed | ||
response = self._create_error_response( | ||
"SSE stream from GET request not implemented yet", | ||
HTTPStatus.METHOD_NOT_ALLOWED, | ||
if not await self._validate_session(request, send): | ||
return | ||
|
||
headers = { | ||
"Cache-Control": "no-cache, no-transform", | ||
"Connection": "keep-alive", | ||
"Content-Type": CONTENT_TYPE_SSE, | ||
} | ||
|
||
if self.mcp_session_id: | ||
headers[MCP_SESSION_ID_HEADER] = self.mcp_session_id | ||
|
||
# Check if we already have an active GET stream | ||
if GET_STREAM_KEY in self._request_streams: | ||
response = self._create_error_response( | ||
"Conflict: Only one SSE stream is allowed per session", | ||
HTTPStatus.CONFLICT, | ||
) | ||
await response(request.scope, request.receive, send) | ||
return | ||
Comment on lines
+486
to
+492
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am okay with this, but I don't think this is necessarily true: https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#multiple-connections. A client could have multiple SSE connections open but we should just reply to one. |
||
|
||
# Create SSE stream | ||
sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[ | ||
dict[str, Any] | ||
](0) | ||
|
||
async def standalone_sse_writer(): | ||
try: | ||
# Create a standalone message stream for server-initiated messages | ||
standalone_stream_writer, standalone_stream_reader = ( | ||
anyio.create_memory_object_stream[JSONRPCMessage](0) | ||
) | ||
|
||
# Register this stream using the special key | ||
self._request_streams[GET_STREAM_KEY] = standalone_stream_writer | ||
|
||
async with sse_stream_writer, standalone_stream_reader: | ||
# Process messages from the standalone stream | ||
async for received_message in standalone_stream_reader: | ||
# For the standalone stream, we handle: | ||
# - JSONRPCNotification (server sends notifications to client) | ||
# - JSONRPCRequest (server sends requests to client) | ||
# We should NOT receive JSONRPCResponse | ||
|
||
# Send the message via SSE | ||
event_data = { | ||
"event": "message", | ||
"data": received_message.model_dump_json( | ||
by_alias=True, exclude_none=True | ||
), | ||
} | ||
|
||
await sse_stream_writer.send(event_data) | ||
except Exception as e: | ||
logger.exception(f"Error in standalone SSE writer: {e}") | ||
finally: | ||
logger.debug("Closing standalone SSE writer") | ||
# Remove the stream from request_streams | ||
self._request_streams.pop(GET_STREAM_KEY, None) | ||
|
||
# Create and start EventSourceResponse | ||
response = EventSourceResponse( | ||
content=sse_stream_reader, | ||
data_sender_callable=standalone_sse_writer, | ||
headers=headers, | ||
) | ||
await response(request.scope, request.receive, send) | ||
|
||
try: | ||
# This will send headers immediately and establish the SSE connection | ||
await response(request.scope, request.receive, send) | ||
except Exception as e: | ||
logger.exception(f"Error in standalone SSE response: {e}") | ||
# Clean up the request stream | ||
self._request_streams.pop(GET_STREAM_KEY, None) | ||
|
||
async def _handle_delete_request(self, request: Request, send: Send) -> None: | ||
"""Handle DELETE requests for explicit session termination.""" | ||
|
@@ -611,21 +690,18 @@ async def message_router(): | |
else: | ||
target_request_id = str(message.root.id) | ||
|
||
# Send to the specific request stream if available | ||
if ( | ||
target_request_id | ||
and target_request_id in self._request_streams | ||
): | ||
request_stream_id = target_request_id or GET_STREAM_KEY | ||
if request_stream_id in self._request_streams: | ||
try: | ||
await self._request_streams[target_request_id].send( | ||
await self._request_streams[request_stream_id].send( | ||
message | ||
) | ||
except ( | ||
anyio.BrokenResourceError, | ||
anyio.ClosedResourceError, | ||
): | ||
# Stream might be closed, remove from registry | ||
self._request_streams.pop(target_request_id, None) | ||
self._request_streams.pop(request_stream_id, None) | ||
except Exception as e: | ||
logger.exception(f"Error in message router: {e}") | ||
|
||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually - a test for this would be awesome
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
have a test for this in the next PR, where I have a client!