Skip to content

StreamableHttp - Server transport with state management #553

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
May 2, 2025

Conversation

ihrpr
Copy link
Contributor

@ihrpr ihrpr commented Apr 21, 2025

Adding support for StreamableHTTP transport transport for MCP servers, providing bidirectional communication over
HTTP with streaming support.

Features

  • Session management
  • Implementation for server termination (DELETE)
  • Includes support for JSON and SSE response modes
  • Handles Related request IDs for notifications like logging and progress
  • Example of a server with tool notifications

Follow ups

  • Suport stateless mode
  • Support GET SSE
  • Streamable Http Client

#443

Copy link
Contributor

@jerome3o-anthropic jerome3o-anthropic left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concerns about related_request_id - seems like it's accidentally added. Otherwise looks good

# If no session ID provided but required, return error
if not request_session_id:
response = self._create_error_response(
"Bad Request: Missing session ID",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get this error when I run the example server:

uv run mcp-simple-streamablehttp --port 3000 --log-level DEBUG

Then

curl http://localhost:3000/mcp/ -d '
{
    "jsonrpc": "2.0",
    "method": "initialize",
    "params": {
        "protocolVersion": "2025-03-26",
        "capabilities": {},
        "clientInfo": {
            "name": "curl-client",
            "version": "1.0.0"
        }
    }
}' \
    -H "Content-Type: application/json" \
    -H "Accept: application/json, text/event-stream" \
    -X POST | \
    jq

output:

{
  "jsonrpc": "2.0",
  "id": "server-error",
  "error": {
    "code": -32600,
    "message": "Bad Request: Missing session ID"
  }
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this is before I'm able to get the session from the server to start with

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah - I was missing the id, thats why. it mistook the message as a notification, and then didn't trigger the if is_initialization_request path. That could have a better error, but idt it should block here


app = Server("mcp-streamable-http-demo")

@app.call_tool()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit, also could be a follow up] Would be nice to use fastmcp here - i usually end up going:

server = FastMCP(...)
...
server._mcp_server # <- to access the lower level mcp interface

But perhaps we should just make it easier to get to

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I was quite confused here, we have two folders for servers: fastmcp and servers. I was planning to add fastmcp exaple as a follow up.

fastmcp might also help with related_request_id clarity - it's hidden under log. Low level server does not have a place where we can nicely inject related_request_id

@ihrpr
Copy link
Contributor Author

ihrpr commented Apr 23, 2025

@jerome3o-anthropic, very good point on notification params, somehow I completely ignored that fact it's part of the spec 🙈

What do you think about using meta field to propagate related_request_id?

@ihrpr
Copy link
Contributor Author

ihrpr commented Apr 27, 2025

@jerome3o-anthropic, very good point on notification params, somehow I completely ignored that fact it's part of the spec 🙈

What do you think about using meta field to propagate related_request_id?

I think I'd like to change the approach to use clean separation: #590

Copy link
Contributor

@jerome3o-anthropic jerome3o-anthropic left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approving with the assumption we're gonna tidy up the related_request_id/_meta stuff.

Comment on lines +10 to +13
from mcp.server.streamableHttp import (
MCP_SESSION_ID_HEADER,
StreamableHTTPServerTransport,
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's call this mcp.server.http.streamable, or something similar. streamableHttp is not PEP 8 compliant.

Comment on lines +71 to +103
async def call_tool(
name: str, arguments: dict
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
ctx = app.request_context
interval = arguments.get("interval", 1.0)
count = arguments.get("count", 5)
caller = arguments.get("caller", "unknown")

# Send the specified number of notifications with the given interval
for i in range(count):
await ctx.session.send_log_message(
level="info",
data=f"Notification {i+1}/{count} from caller: {caller}",
logger="notification_stream",
# Associates this notification with the original request
# Ensures notifications are sent to the correct response stream
# Without this, notifications will either go to:
# - a standalone SSE stream (if GET request is supported)
# - nowhere (if GET request isn't supported)
related_request_id=ctx.request_id,
)
if i < count - 1: # Don't wait after the last notification
await anyio.sleep(interval)

return [
types.TextContent(
type="text",
text=(
f"Sent {count} notifications with {interval}s interval"
f" for caller: {caller}"
),
)
]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We never check the name of the tool that is called. We should.

Comment on lines +143 to +186
async def handle_streamable_http(scope, receive, send):
request = Request(scope, receive)
request_mcp_session_id = request.headers.get(MCP_SESSION_ID_HEADER)
if (
request_mcp_session_id is not None
and request_mcp_session_id in server_instances
):
transport = server_instances[request_mcp_session_id]
logger.debug("Session already exists, handling request directly")
await transport.handle_request(scope, receive, send)
elif request_mcp_session_id is None:
# try to establish new session
logger.debug("Creating new transport")
# Use lock to prevent race conditions when creating new sessions
async with session_creation_lock:
new_session_id = uuid4().hex
http_transport = StreamableHTTPServerTransport(
mcp_session_id=new_session_id,
is_json_response_enabled=json_response,
)
server_instances[http_transport.mcp_session_id] = http_transport
async with http_transport.connect() as streams:
read_stream, write_stream = streams

async def run_server():
await app.run(
read_stream,
write_stream,
app.create_initialization_options(),
)

if not task_group:
raise RuntimeError("Task group is not initialized")

task_group.start_soon(run_server)

# Handle the HTTP request and return the response
await http_transport.handle_request(scope, receive, send)
else:
response = Response(
"Bad Request: No valid session ID provided",
status_code=HTTPStatus.BAD_REQUEST,
)
await response(scope, receive, send)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels awfully complex as a lowlevel API, particularly if every MCP Server using the lowlevel API would have to go and use something like this. I wonder if we can provide a more high-level abstraction for this, that makes it more managable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, we can make it happen, will add as a separate PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned above, let's rename this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dsp-ant I renamed the files later in the stack, it's quite a few of them by now (like 9 or 10), so was very difficult to rename in the very bottom. Happy to rename in the bottom if that's needed, just let me know.

Comment on lines +184 to +191
accept_types = [media_type.strip() for media_type in accept_header.split(",")]

has_json = any(
media_type.startswith(CONTENT_TYPE_JSON) for media_type in accept_types
)
has_sse = any(
media_type.startswith(CONTENT_TYPE_SSE) for media_type in accept_types
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given you have stripped accept_types as a list, you might even put them into a set and just do a check if CONTENT_TYPE_SSE is in media_type or CONTENT_TYPE_JSON is in media_type. It doesn't really matter here, but we don't need to traverse the list 3 times.


def _check_content_type(self, request: Request) -> bool:
"""Check if the request has the correct Content-Type."""
content_type = request.headers.get("content-type", "")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are request.headers always lowercase?

Comment on lines +198 to +202
content_type_parts = [
part.strip() for part in content_type.split(";")[0].split(",")
]

return any(part == CONTENT_TYPE_JSON for part in content_type_parts)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
content_type_parts = [
part.strip() for part in content_type.split(";")[0].split(",")
]
return any(part == CONTENT_TYPE_JSON for part in content_type_parts)
content_type_parts = (
part.strip() for part in content_type.split(";")[0].split(",")
)
return any(part == CONTENT_TYPE_JSON for part in content_type_parts)

We never use content_type_parts other than checking it, we either can traverse it in a for-loop or should just use a generator here to be lazy about it.

Comment on lines +204 to +206
async def _handle_post_request(
self, scope: Scope, request: Request, receive: Receive, send: Send
) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we break this up? I think some of the branches, particularly the one where we handle an existing session and a non-existing session should probably be separate functions.

Comment on lines +279 to +283
if request_session_id and request_session_id != self.mcp_session_id:
response = self._create_error_response(
"Not Found: Invalid or expired session ID",
HTTPStatus.NOT_FOUND,
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure this is 404 or maybe rather a 400 BAD REQUEST? What does the TS SDK do here?

Copy link
Member

@dsp-ant dsp-ant left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll accept this for now under the assumption that the fixes are done on top.

@ihrpr ihrpr merged commit 78f0b11 into main May 2, 2025
11 checks passed
@github-project-automation github-project-automation bot moved this from In Progress to Done in 2025-03-26 Implementation May 2, 2025
@ihrpr ihrpr deleted the ihrpr/streamablehttp-server branch May 2, 2025 10:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Development

Successfully merging this pull request may close these issues.

3 participants