4
4
Corresponds to TypeScript file: src/server/auth/handlers/authorize.ts
5
5
"""
6
6
7
+ import logging
7
8
from typing import Callable , Literal , Optional , Union
8
- from urllib .parse import parse_qs , urlencode , urlparse , urlunparse
9
+ from urllib .parse import urlencode , urlparse , urlunparse
9
10
10
11
from pydantic import AnyHttpUrl , AnyUrl , BaseModel , Field , RootModel , ValidationError
11
12
from starlette .datastructures import FormData , QueryParams
12
13
from starlette .requests import Request
13
14
from starlette .responses import RedirectResponse , Response
14
15
15
16
from mcp .server .auth .errors import (
16
- InvalidClientError ,
17
17
InvalidRequestError ,
18
18
OAuthError ,
19
19
stringify_pydantic_error ,
20
20
)
21
- from mcp .server .auth .provider import AuthorizationParams , OAuthServerProvider , construct_redirect_uri
22
- from mcp .shared .auth import OAuthClientInformationFull
23
21
from mcp .server .auth .json_response import PydanticJSONResponse
24
-
25
- import logging
22
+ from mcp .server .auth .provider import (
23
+ AuthorizationParams ,
24
+ OAuthServerProvider ,
25
+ construct_redirect_uri ,
26
+ )
27
+ from mcp .shared .auth import OAuthClientInformationFull
26
28
27
29
logger = logging .getLogger (__name__ )
28
30
@@ -48,7 +50,6 @@ class AuthorizationRequest(BaseModel):
48
50
description = "Optional scope; if specified, should be "
49
51
"a space-separated list of scope strings" ,
50
52
)
51
-
52
53
53
54
54
55
def validate_scope (
@@ -80,30 +81,38 @@ def validate_redirect_uri(
80
81
raise InvalidRequestError (
81
82
"redirect_uri must be specified when client has multiple registered URIs"
82
83
)
84
+
85
+
83
86
ErrorCode = Literal [
84
- "invalid_request" ,
85
- "unauthorized_client" ,
86
- "access_denied" ,
87
- "unsupported_response_type" ,
88
- "invalid_scope" ,
89
- "server_error" ,
90
- "temporarily_unavailable"
91
- ]
87
+ "invalid_request" ,
88
+ "unauthorized_client" ,
89
+ "access_denied" ,
90
+ "unsupported_response_type" ,
91
+ "invalid_scope" ,
92
+ "server_error" ,
93
+ "temporarily_unavailable" ,
94
+ ]
95
+
96
+
92
97
class ErrorResponse (BaseModel ):
93
98
error : ErrorCode
94
99
error_description : str
95
100
error_uri : Optional [AnyUrl ] = None
96
101
# must be set if provided in the request
97
102
state : Optional [str ]
98
103
99
- def best_effort_extract_string (key : str , params : None | FormData | QueryParams ) -> Optional [str ]:
104
+
105
+ def best_effort_extract_string (
106
+ key : str , params : None | FormData | QueryParams
107
+ ) -> Optional [str ]:
100
108
if params is None :
101
109
return None
102
110
value = params .get (key )
103
111
if isinstance (value , str ):
104
112
return value
105
113
return None
106
114
115
+
107
116
class AnyHttpUrlModel (RootModel ):
108
117
root : AnyHttpUrl
109
118
@@ -118,18 +127,24 @@ async def authorization_handler(request: Request) -> Response:
118
127
client = None
119
128
params = None
120
129
121
- async def error_response (error : ErrorCode , error_description : str , attempt_load_client : bool = True ):
130
+ async def error_response (
131
+ error : ErrorCode , error_description : str , attempt_load_client : bool = True
132
+ ):
122
133
nonlocal client , redirect_uri , state
123
134
if client is None and attempt_load_client :
124
135
# make last-ditch attempt to load the client
125
136
client_id = best_effort_extract_string ("client_id" , params )
126
- client = client_id and await provider .clients_store .get_client (client_id )
137
+ client = client_id and await provider .clients_store .get_client (
138
+ client_id
139
+ )
127
140
if redirect_uri is None and client :
128
141
# make last-ditch effort to load the redirect uri
129
142
if params is not None and "redirect_uri" not in params :
130
143
raw_redirect_uri = None
131
144
else :
132
- raw_redirect_uri = AnyHttpUrlModel .model_validate (best_effort_extract_string ("redirect_uri" , params )).root
145
+ raw_redirect_uri = AnyHttpUrlModel .model_validate (
146
+ best_effort_extract_string ("redirect_uri" , params )
147
+ ).root
133
148
try :
134
149
redirect_uri = validate_redirect_uri (raw_redirect_uri , client )
135
150
except (ValidationError , InvalidRequestError ):
@@ -146,7 +161,9 @@ async def error_response(error: ErrorCode, error_description: str, attempt_load_
146
161
147
162
if redirect_uri and client :
148
163
return RedirectResponse (
149
- url = construct_redirect_uri (str (redirect_uri ), ** error_resp .model_dump (exclude_none = True )),
164
+ url = construct_redirect_uri (
165
+ str (redirect_uri ), ** error_resp .model_dump (exclude_none = True )
166
+ ),
150
167
status_code = 302 ,
151
168
headers = {"Cache-Control" : "no-store" },
152
169
)
@@ -156,7 +173,7 @@ async def error_response(error: ErrorCode, error_description: str, attempt_load_
156
173
content = error_resp ,
157
174
headers = {"Cache-Control" : "no-store" },
158
175
)
159
-
176
+
160
177
try :
161
178
# Parse request parameters
162
179
if request .method == "GET" :
@@ -165,20 +182,22 @@ async def error_response(error: ErrorCode, error_description: str, attempt_load_
165
182
else :
166
183
# Parse form data for POST requests
167
184
params = await request .form ()
168
-
185
+
169
186
# Save state if it exists, even before validation
170
187
state = best_effort_extract_string ("state" , params )
171
-
188
+
172
189
try :
173
190
auth_request = AuthorizationRequest .model_validate (params )
174
191
state = auth_request .state # Update with validated state
175
192
except ValidationError as validation_error :
176
193
error : ErrorCode = "invalid_request"
177
194
for e in validation_error .errors ():
178
- if e [' loc' ] == (' response_type' ,) and e [' type' ] == ' literal_error' :
195
+ if e [" loc" ] == (" response_type" ,) and e [" type" ] == " literal_error" :
179
196
error = "unsupported_response_type"
180
197
break
181
- return await error_response (error , stringify_pydantic_error (validation_error ))
198
+ return await error_response (
199
+ error , stringify_pydantic_error (validation_error )
200
+ )
182
201
183
202
# Get client information
184
203
client = await provider .clients_store .get_client (auth_request .client_id )
@@ -190,7 +209,6 @@ async def error_response(error: ErrorCode, error_description: str, attempt_load_
190
209
attempt_load_client = False ,
191
210
)
192
211
193
-
194
212
# Validate redirect_uri against client's registered URIs
195
213
try :
196
214
redirect_uri = validate_redirect_uri (auth_request .redirect_uri , client )
@@ -200,7 +218,7 @@ async def error_response(error: ErrorCode, error_description: str, attempt_load_
200
218
error = "invalid_request" ,
201
219
error_description = validation_error .message ,
202
220
)
203
-
221
+
204
222
# Validate scope - for scope errors, we can redirect
205
223
try :
206
224
scopes = validate_scope (auth_request .scope , client )
@@ -210,28 +228,30 @@ async def error_response(error: ErrorCode, error_description: str, attempt_load_
210
228
error = "invalid_scope" ,
211
229
error_description = validation_error .message ,
212
230
)
213
-
231
+
214
232
# Setup authorization parameters
215
233
auth_params = AuthorizationParams (
216
234
state = state ,
217
235
scopes = scopes ,
218
236
code_challenge = auth_request .code_challenge ,
219
237
redirect_uri = redirect_uri ,
220
238
)
221
-
239
+
222
240
# Let the provider pick the next URI to redirect to
223
241
response = RedirectResponse (
224
242
url = "" , status_code = 302 , headers = {"Cache-Control" : "no-store" }
225
243
)
226
- response .headers ["location" ] = await provider .authorize (
227
- client , auth_params
228
- )
244
+ response .headers ["location" ] = await provider .authorize (client , auth_params )
229
245
return response
230
-
246
+
231
247
except Exception as validation_error :
232
248
# Catch-all for unexpected errors
233
- logger .exception ("Unexpected error in authorization_handler" , exc_info = validation_error )
234
- return await error_response (error = "server_error" , error_description = "An unexpected error occurred" )
249
+ logger .exception (
250
+ "Unexpected error in authorization_handler" , exc_info = validation_error
251
+ )
252
+ return await error_response (
253
+ error = "server_error" , error_description = "An unexpected error occurred"
254
+ )
235
255
236
256
return authorization_handler
237
257
@@ -240,7 +260,7 @@ def create_error_redirect(
240
260
redirect_uri : AnyUrl , error : Union [Exception , ErrorResponse ]
241
261
) -> str :
242
262
parsed_uri = urlparse (str (redirect_uri ))
243
-
263
+
244
264
if isinstance (error , ErrorResponse ):
245
265
# Convert ErrorResponse to dict
246
266
error_dict = error .model_dump (exclude_none = True )
@@ -251,7 +271,7 @@ def create_error_redirect(
251
271
query_params [key ] = str (value )
252
272
else :
253
273
query_params [key ] = value
254
-
274
+
255
275
elif isinstance (error , OAuthError ):
256
276
query_params = {"error" : error .error_code , "error_description" : str (error )}
257
277
else :
0 commit comments