Description
MCP Server tools code:
`@mcp.tool(description="查询物流信息")
async def query_logistics(order_id: str) -> str:
"""查询物流信息。当用户需要根据订单号查询物流信息时,调用此工具
Args:
order_id: 订单号
Returns:
物流信息的字符串描述
"""
统一的物流信息数据
tracking_info = [
{"time": "2024-01-20 10:00:00", "status": "包裹已揽收",
"location": "深圳转运中心"},
{"time": "2024-01-20 15:30:00", "status": "运输中", "location": "深圳市"},
{"time": "2024-01-21 09:00:00", "status": "到达目的地",
"location": "北京市"},
{"time": "2024-01-21 14:00:00", "status": "派送中",
"location": "北京市朝阳区"},
{"time": "2024-01-21 16:30:00", "status": "已签收",
"location": "北京市朝阳区三里屯"}
]
格式化物流信息
result = f"物流单号:{order_id}\n\n物流轨迹:\n"
for item in tracking_info:
result += f"[{item['time']}] {item['status']} - {item['location']}\n"
return result
MCP client execute code:
self._streams_context = sse_client(url=server_url)
streams = await self._streams_context.aenter()
self._session_context = ClientSession(*streams)
self.session: ClientSession = await self._session_context.__aenter__()
# Initialize
await self.session.initialize()
# List available tools to verify connection
print("Initialized SSE client...")
print("Listing tools...")
response = await self.session.list_tools()
tools = response.tools
print("\nConnected to server with tools:", [tool.name for tool in tools])
result = await self.session.call_tool("query_logistics", {"order_id":"123"})
print("Result of query_logistics tool:", result)`
error info:
Connected to server with tools: ['query_logistics']
Result of add tool: meta=None content=[TextContent(type='text', text='Error executing tool query_logistics: 1 validation error for query_logisticsArguments\norder_id\n Input should be a valid string [type=string_type, input_value=123, input_type=int]\n For further information visit https://errors.pydantic.dev/2.10/v/string_type', annotations=None)] isError=True