4
4
Corresponds to TypeScript file: src/server/auth/handlers/authorize.ts
5
5
"""
6
6
7
- import re
8
- from urllib .parse import urlparse , urlunparse , urlencode
9
- from typing import Any , Callable , Dict , List , Literal , Optional
10
- from urllib .parse import urlencode , parse_qs
7
+ from typing import Literal
8
+ from urllib .parse import urlencode , urlparse , urlunparse
11
9
12
- from starlette .requests import Request
13
- from starlette .responses import JSONResponse , RedirectResponse , Response
14
10
from pydantic import AnyHttpUrl , AnyUrl , BaseModel , Field , ValidationError
15
- from pydantic_core import Url
11
+ from starlette .requests import Request
12
+ from starlette .responses import RedirectResponse , Response
16
13
17
14
from mcp .server .auth .errors import (
18
- InvalidClientError ,
15
+ InvalidClientError ,
19
16
InvalidRequestError ,
20
- UnsupportedResponseTypeError ,
21
- ServerError ,
22
17
OAuthError ,
23
18
)
19
+ from mcp .server .auth .handlers .types import HandlerFn
24
20
from mcp .server .auth .provider import AuthorizationParams , OAuthServerProvider
25
- from mcp .shared .auth import OAuthClientInformationFull
26
21
27
22
28
23
class AuthorizationRequest (BaseModel ):
29
24
"""
30
25
Model for the authorization request parameters.
31
-
32
- Corresponds to request schema in authorizationHandler in src/server/auth/handlers/authorize.ts
26
+
27
+ Corresponds to request schema in authorizationHandler in
28
+ src/server/auth/handlers/authorize.ts
33
29
"""
30
+
34
31
client_id : str = Field (..., description = "The client ID" )
35
- redirect_uri : AnyHttpUrl | None = Field (..., description = "URL to redirect to after authorization" )
32
+ redirect_uri : AnyHttpUrl | None = Field (
33
+ ..., description = "URL to redirect to after authorization"
34
+ )
36
35
37
- response_type : Literal ["code" ] = Field (..., description = "Must be 'code' for authorization code flow" )
36
+ response_type : Literal ["code" ] = Field (
37
+ ..., description = "Must be 'code' for authorization code flow"
38
+ )
38
39
code_challenge : str = Field (..., description = "PKCE code challenge" )
39
- code_challenge_method : Literal ["S256" ] = Field ("S256" , description = "PKCE code challenge method" )
40
- state : Optional [str ] = Field (None , description = "Optional state parameter" )
41
- scope : Optional [str ] = Field (None , description = "Optional scope parameter" )
42
-
40
+ code_challenge_method : Literal ["S256" ] = Field (
41
+ "S256" , description = "PKCE code challenge method"
42
+ )
43
+ state : str | None = Field (None , description = "Optional state parameter" )
44
+ scope : str | None = Field (None , description = "Optional scope parameter" )
45
+
43
46
class Config :
44
47
extra = "ignore"
45
48
49
+
46
50
def validate_scope (requested_scope : str | None , scope : str | None ) -> list [str ] | None :
47
51
if requested_scope is None :
48
52
return None
@@ -53,7 +57,10 @@ def validate_scope(requested_scope: str | None, scope: str | None) -> list[str]
53
57
raise InvalidRequestError (f"Client was not registered with scope { scope } " )
54
58
return requested_scopes
55
59
56
- def validate_redirect_uri (redirect_uri : AnyHttpUrl | None , redirect_uris : list [AnyHttpUrl ]) -> AnyHttpUrl :
60
+
61
+ def validate_redirect_uri (
62
+ redirect_uri : AnyHttpUrl | None , redirect_uris : list [AnyHttpUrl ]
63
+ ) -> AnyHttpUrl :
57
64
if not redirect_uris :
58
65
raise InvalidClientError ("Client has no registered redirect URIs" )
59
66
@@ -67,16 +74,19 @@ def validate_redirect_uri(redirect_uri: AnyHttpUrl | None, redirect_uris: list[A
67
74
elif len (redirect_uris ) == 1 :
68
75
return redirect_uris [0 ]
69
76
else :
70
- raise InvalidRequestError ("redirect_uri must be specified when client has multiple registered URIs" )
77
+ raise InvalidRequestError (
78
+ "redirect_uri must be specified when client has multiple registered URIs"
79
+ )
80
+
71
81
72
- def create_authorization_handler (provider : OAuthServerProvider ) -> Callable :
82
+ def create_authorization_handler (provider : OAuthServerProvider ) -> HandlerFn :
73
83
"""
74
84
Create a handler for the OAuth 2.0 Authorization endpoint.
75
-
85
+
76
86
Corresponds to authorizationHandler in src/server/auth/handlers/authorize.ts
77
87
78
88
"""
79
-
89
+
80
90
async def authorization_handler (request : Request ) -> Response :
81
91
"""
82
92
Handler for the OAuth 2.0 Authorization endpoint.
@@ -94,65 +104,64 @@ async def authorization_handler(request: Request) -> Response:
94
104
auth_request = AuthorizationRequest .model_validate (params )
95
105
except ValidationError as e :
96
106
raise InvalidRequestError (str (e ))
97
-
107
+
98
108
# Get client information
99
- try :
100
- client = await provider .clients_store .get_client (auth_request .client_id )
101
- except OAuthError as e :
102
- # TODO: proper error rendering
103
- raise InvalidClientError (str (e ))
104
-
109
+ client = await provider .clients_store .get_client (auth_request .client_id )
110
+
105
111
if not client :
106
112
raise InvalidClientError (f"Client ID '{ auth_request .client_id } ' not found" )
107
-
108
-
113
+
109
114
# do validation which is dependent on the client configuration
110
- redirect_uri = validate_redirect_uri (auth_request .redirect_uri , client .redirect_uris )
115
+ redirect_uri = validate_redirect_uri (
116
+ auth_request .redirect_uri , client .redirect_uris
117
+ )
111
118
scopes = validate_scope (auth_request .scope , client .scope )
112
-
119
+
113
120
auth_params = AuthorizationParams (
114
121
state = auth_request .state ,
115
122
scopes = scopes ,
116
123
code_challenge = auth_request .code_challenge ,
117
124
redirect_uri = redirect_uri ,
118
125
)
119
-
120
- response = RedirectResponse (url = "" , status_code = 302 , headers = {"Cache-Control" : "no-store" })
121
-
126
+
127
+ response = RedirectResponse (
128
+ url = "" , status_code = 302 , headers = {"Cache-Control" : "no-store" }
129
+ )
130
+
122
131
try :
123
132
# Let the provider handle the authorization flow
124
133
await provider .authorize (client , auth_params , response )
125
-
134
+
126
135
return response
127
136
except Exception as e :
128
137
return RedirectResponse (
129
138
url = create_error_redirect (redirect_uri , e , auth_request .state ),
130
139
status_code = 302 ,
131
140
headers = {"Cache-Control" : "no-store" },
132
- )
133
-
141
+ )
142
+
134
143
return authorization_handler
135
144
136
- def create_error_redirect (redirect_uri : AnyUrl , error : Exception , state : Optional [str ]) -> str :
145
+
146
+ def create_error_redirect (
147
+ redirect_uri : AnyUrl , error : Exception , state : str | None
148
+ ) -> str :
137
149
parsed_uri = urlparse (str (redirect_uri ))
138
150
if isinstance (error , OAuthError ):
139
- query_params = {
140
- "error" : error .error_code ,
141
- "error_description" : str (error )
142
- }
151
+ query_params = {"error" : error .error_code , "error_description" : str (error )}
143
152
else :
144
153
query_params = {
145
154
"error" : "internal_error" ,
146
- "error_description" : "An unknown error occurred"
155
+ "error_description" : "An unknown error occurred" ,
147
156
}
148
157
# TODO: should we add error_uri?
149
158
# if error.error_uri:
150
159
# query_params["error_uri"] = str(error.error_uri)
151
160
if state :
152
161
query_params ["state" ] = state
153
-
162
+
154
163
new_query = urlencode (query_params )
155
164
if parsed_uri .query :
156
165
new_query = f"{ parsed_uri .query } &{ new_query } "
157
-
158
- return urlunparse (parsed_uri ._replace (query = new_query ))
166
+
167
+ return urlunparse (parsed_uri ._replace (query = new_query ))
0 commit comments