-
Notifications
You must be signed in to change notification settings - Fork 1.2k
StreamableHttp - Server transport with state management #553
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Concerns about related_request_id
- seems like it's accidentally added. Otherwise looks good
# If no session ID provided but required, return error | ||
if not request_session_id: | ||
response = self._create_error_response( | ||
"Bad Request: Missing session ID", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I get this error when I run the example server:
uv run mcp-simple-streamablehttp --port 3000 --log-level DEBUG
Then
curl http://localhost:3000/mcp/ -d '
{
"jsonrpc": "2.0",
"method": "initialize",
"params": {
"protocolVersion": "2025-03-26",
"capabilities": {},
"clientInfo": {
"name": "curl-client",
"version": "1.0.0"
}
}
}' \
-H "Content-Type: application/json" \
-H "Accept: application/json, text/event-stream" \
-X POST | \
jq
output:
{
"jsonrpc": "2.0",
"id": "server-error",
"error": {
"code": -32600,
"message": "Bad Request: Missing session ID"
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But this is before I'm able to get the session from the server to start with
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah - I was missing the id, thats why. it mistook the message as a notification, and then didn't trigger the if is_initialization_request
path. That could have a better error, but idt it should block here
|
||
app = Server("mcp-streamable-http-demo") | ||
|
||
@app.call_tool() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nit, also could be a follow up] Would be nice to use fastmcp here - i usually end up going:
server = FastMCP(...)
...
server._mcp_server # <- to access the lower level mcp interface
But perhaps we should just make it easier to get to
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I was quite confused here, we have two folders for servers: fastmcp and servers. I was planning to add fastmcp exaple as a follow up.
fastmcp might also help with related_request_id
clarity - it's hidden under log. Low level server does not have a place where we can nicely inject related_request_id
@jerome3o-anthropic, very good point on notification params, somehow I completely ignored that fact it's part of the spec 🙈 What do you think about using meta field to propagate |
I think I'd like to change the approach to use clean separation: #590 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approving with the assumption we're gonna tidy up the related_request_id
/_meta
stuff.
from mcp.server.streamableHttp import ( | ||
MCP_SESSION_ID_HEADER, | ||
StreamableHTTPServerTransport, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's call this mcp.server.http.streamable
, or something similar. streamableHttp
is not PEP 8 compliant.
async def call_tool( | ||
name: str, arguments: dict | ||
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: | ||
ctx = app.request_context | ||
interval = arguments.get("interval", 1.0) | ||
count = arguments.get("count", 5) | ||
caller = arguments.get("caller", "unknown") | ||
|
||
# Send the specified number of notifications with the given interval | ||
for i in range(count): | ||
await ctx.session.send_log_message( | ||
level="info", | ||
data=f"Notification {i+1}/{count} from caller: {caller}", | ||
logger="notification_stream", | ||
# Associates this notification with the original request | ||
# Ensures notifications are sent to the correct response stream | ||
# Without this, notifications will either go to: | ||
# - a standalone SSE stream (if GET request is supported) | ||
# - nowhere (if GET request isn't supported) | ||
related_request_id=ctx.request_id, | ||
) | ||
if i < count - 1: # Don't wait after the last notification | ||
await anyio.sleep(interval) | ||
|
||
return [ | ||
types.TextContent( | ||
type="text", | ||
text=( | ||
f"Sent {count} notifications with {interval}s interval" | ||
f" for caller: {caller}" | ||
), | ||
) | ||
] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We never check the name of the tool that is called. We should.
async def handle_streamable_http(scope, receive, send): | ||
request = Request(scope, receive) | ||
request_mcp_session_id = request.headers.get(MCP_SESSION_ID_HEADER) | ||
if ( | ||
request_mcp_session_id is not None | ||
and request_mcp_session_id in server_instances | ||
): | ||
transport = server_instances[request_mcp_session_id] | ||
logger.debug("Session already exists, handling request directly") | ||
await transport.handle_request(scope, receive, send) | ||
elif request_mcp_session_id is None: | ||
# try to establish new session | ||
logger.debug("Creating new transport") | ||
# Use lock to prevent race conditions when creating new sessions | ||
async with session_creation_lock: | ||
new_session_id = uuid4().hex | ||
http_transport = StreamableHTTPServerTransport( | ||
mcp_session_id=new_session_id, | ||
is_json_response_enabled=json_response, | ||
) | ||
server_instances[http_transport.mcp_session_id] = http_transport | ||
async with http_transport.connect() as streams: | ||
read_stream, write_stream = streams | ||
|
||
async def run_server(): | ||
await app.run( | ||
read_stream, | ||
write_stream, | ||
app.create_initialization_options(), | ||
) | ||
|
||
if not task_group: | ||
raise RuntimeError("Task group is not initialized") | ||
|
||
task_group.start_soon(run_server) | ||
|
||
# Handle the HTTP request and return the response | ||
await http_transport.handle_request(scope, receive, send) | ||
else: | ||
response = Response( | ||
"Bad Request: No valid session ID provided", | ||
status_code=HTTPStatus.BAD_REQUEST, | ||
) | ||
await response(scope, receive, send) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feels awfully complex as a lowlevel API, particularly if every MCP Server using the lowlevel API would have to go and use something like this. I wonder if we can provide a more high-level abstraction for this, that makes it more managable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, we can make it happen, will add as a separate PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned above, let's rename this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dsp-ant I renamed the files later in the stack, it's quite a few of them by now (like 9 or 10), so was very difficult to rename in the very bottom. Happy to rename in the bottom if that's needed, just let me know.
accept_types = [media_type.strip() for media_type in accept_header.split(",")] | ||
|
||
has_json = any( | ||
media_type.startswith(CONTENT_TYPE_JSON) for media_type in accept_types | ||
) | ||
has_sse = any( | ||
media_type.startswith(CONTENT_TYPE_SSE) for media_type in accept_types | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given you have stripped accept_types as a list, you might even put them into a set and just do a check if CONTENT_TYPE_SSE is in media_type or CONTENT_TYPE_JSON is in media_type. It doesn't really matter here, but we don't need to traverse the list 3 times.
|
||
def _check_content_type(self, request: Request) -> bool: | ||
"""Check if the request has the correct Content-Type.""" | ||
content_type = request.headers.get("content-type", "") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are request.headers always lowercase?
content_type_parts = [ | ||
part.strip() for part in content_type.split(";")[0].split(",") | ||
] | ||
|
||
return any(part == CONTENT_TYPE_JSON for part in content_type_parts) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
content_type_parts = [ | |
part.strip() for part in content_type.split(";")[0].split(",") | |
] | |
return any(part == CONTENT_TYPE_JSON for part in content_type_parts) | |
content_type_parts = ( | |
part.strip() for part in content_type.split(";")[0].split(",") | |
) | |
return any(part == CONTENT_TYPE_JSON for part in content_type_parts) |
We never use content_type_parts
other than checking it, we either can traverse it in a for-loop or should just use a generator here to be lazy about it.
async def _handle_post_request( | ||
self, scope: Scope, request: Request, receive: Receive, send: Send | ||
) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we break this up? I think some of the branches, particularly the one where we handle an existing session and a non-existing session should probably be separate functions.
if request_session_id and request_session_id != self.mcp_session_id: | ||
response = self._create_error_response( | ||
"Not Found: Invalid or expired session ID", | ||
HTTPStatus.NOT_FOUND, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we sure this is 404 or maybe rather a 400 BAD REQUEST? What does the TS SDK do here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll accept this for now under the assumption that the fixes are done on top.
Adding support for StreamableHTTP transport transport for MCP servers, providing bidirectional communication over
HTTP with streaming support.
Features
Follow ups
#443