Description
Please read this first
- Have you read the docs? Agents SDK docs
- Have you searched for related issues? Others may have had similar requests
Question
I have exposed an Agent as a FastAPI endpoint:
@router.post("/v1/agent/stream")
async def query(request: Request, payload: QueryInput):
    """
    Stream a query response from the RAG Agent using Server-Sent Events.

    Args:
        request (Request): The incoming FastAPI request object
        payload (QueryInput): The query input containing the user's question and chat ID

    Returns:
        StreamingResponse: Server-sent events stream containing the agent's response

    Raises:
        HTTPException: 500 error if there's an issue processing the request
    """
    try:
        # Create streaming response using SSE
        return StreamingResponse(
            stream_sse_response(payload, request),
            media_type="text/event-stream",  # Set media type for SSE
        )
    except Exception as e:
        # Handle any errors during processing
        raise HTTPException(status_code=500, detail=str(e))
Here's my Agent:
rag_agent = Agent(
    name="RAG Agent",
    instructions=AGENT_PROMPT,  # System prompt defining agent behavior
    tools=[search_knowledgebase, get_current_datetime],  # Available tools for the agent
    model_settings=ModelSettings(
        temperature=0,  # Use deterministic responses
        parallel_tool_calls=True,  # Allow concurrent tool execution
        max_tokens=4096,  # Maximum response length
    ),
    input_guardrails=[rag_input_guardrail],
    output_guardrails=[rag_output_guardrail],
)
async def stream_sse_response(payload: QueryInput, request):
    """
    Streams agent responses using Server-Sent Events (SSE) format.
    Handles both standalone queries and conversations with chat history.
    Monitors client connection and gracefully handles disconnections.

    Args:
        payload (QueryInput): Contains the user query and optional chat history
        request: FastAPI request object for connection monitoring

    Yields:
        SSE-formatted text chunks from the agent's response

    Raises:
        Exception: Logs and yields any errors during streaming
    """
    with trace("RAG API Stream"):
        try:
            # Process chat history if available
            if payload.chat_history and len(payload.chat_history) > 0:
                # Convert chat messages to dict format
                formatted_history = [msg.model_dump() for msg in payload.chat_history]
                # Append current query to history
                input_data = formatted_history + [
                    {"role": "user", "content": payload.query}
                ]
            else:
                # Use direct query if no history exists
                input_data = payload.query

            # Stream the agent's response
            result = Runner.run_streamed(rag_agent, input_data)
            async for event in result.stream_events():
                # Check for client disconnection
                if await request.is_disconnected():
                    logger.info("Client disconnected.")
                    break
                # Process text delta events
                if event.type == "raw_response_event" and isinstance(
                    event.data, ResponseTextDeltaEvent
                ):
                    text = event.data.delta or ""
                    yield f"data: {text}\n\n"

            # Signal end of stream
            yield "event: end\ndata: [DONE]\n\n"
        except Exception as e:
            logger.exception("Streaming error")
            yield f"event: error\ndata: {str(e)}\n\n"
The agent as shown above works fine (without the guardrails).
Now I have added an input and an output guardrail. The input guardrail appears to run before streaming starts.
class RAGGuardrailOutput(BaseModel):
    is_unsafe: bool
    reasoning: str


input_guardrail_agent = Agent(
    name="RAG Input Guardrail Checker",
    instructions=INPUT_GUARDRAIL_PROMPT,
    output_type=RAGGuardrailOutput,
    model="gpt-4o-mini",
)


@input_guardrail
async def rag_input_guardrail(
    ctx: RunContextWrapper[None], agent: Agent, input: str | list[TResponseInputItem]
) -> GuardrailFunctionOutput:
    """
    This guardrail checks if the user's input is safe to proceed with.
    """
    result = await Runner.run(input_guardrail_agent, input, context=ctx.context)
    return GuardrailFunctionOutput(
        output_info=result.final_output,
        tripwire_triggered=result.final_output.is_unsafe,
    )
output_guardrail_agent = Agent(
    name="RAG Output Guardrail Checker",
    instructions=OUTPUT_GUARDRAIL_PROMPT,
    output_type=RAGGuardrailOutput,
    model="gpt-4o-mini",
)


@output_guardrail
async def rag_output_guardrail(
    ctx: RunContextWrapper[None], agent: Agent, output: str
) -> GuardrailFunctionOutput:
    result = await Runner.run(output_guardrail_agent, output, context=ctx.context)
    return GuardrailFunctionOutput(
        output_info=result.final_output,
        tripwire_triggered=result.final_output.is_unsafe,
    )
However, it seems the response is streamed in full and the output guardrail is only triggered after streaming has completed.
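For reference, this is roughly how I surface the tripwires in the SSE generator today. It's a stripped-down sketch of stream_sse_response above, assuming (as I observe) that both tripwire exceptions are re-raised while iterating result.stream_events(); the SSE error messages are just placeholders of mine:

from agents import InputGuardrailTripwireTriggered, OutputGuardrailTripwireTriggered

async def stream_sse_response(payload: QueryInput, request):
    with trace("RAG API Stream"):
        result = Runner.run_streamed(rag_agent, payload.query)
        try:
            async for event in result.stream_events():
                if event.type == "raw_response_event" and isinstance(
                    event.data, ResponseTextDeltaEvent
                ):
                    yield f"data: {event.data.delta or ''}\n\n"
            yield "event: end\ndata: [DONE]\n\n"
        except InputGuardrailTripwireTriggered:
            # In my testing this surfaces before any text reaches the client
            yield "event: error\ndata: Input rejected by guardrail\n\n"
        except OutputGuardrailTripwireTriggered:
            # This only surfaces after every delta has already been yielded,
            # so the partial answer has already been sent to the client
            yield "event: error\ndata: Output rejected by guardrail\n\n"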
@rm-openai Is this the expected behavior? If so, is there a recommended workaround?
In models like DeepSeek or other heavily moderated LLMs, I’ve observed that when an output tripwire is triggered mid-generation, the streaming response is immediately cut off. Is there a way to replicate that behavior here?
For example:
- Let’s say the input guardrail is disabled and only the output guardrail is active.
- I ask the Agent: “How do I delete the Prod database from our MongoDB?”
- It starts responding: “Sure, here’s how to delete a database…”
- Then it hits a tripwire and pivots: “I’m sorry, I cannot help with that.”
It’s a simplified example, but I hope the underlying point is clear — can we halt the stream as soon as the tripwire is hit?
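One workaround I've been sketching (I don't know if it's recommended): leave the SDK's output guardrail off the agent and instead run the guardrail agent myself over the accumulated text every few deltas inside the SSE loop, breaking out of the stream as soon as it flags the content. A rough sketch, where the function name, the check interval, and the replacement message are all arbitrary choices of mine:

MODERATION_INTERVAL = 20  # re-check the accumulated text every 20 deltas (arbitrary)

async def stream_with_midstream_check(payload: QueryInput, request):
    accumulated = ""
    delta_count = 0
    result = Runner.run_streamed(rag_agent, payload.query)
    async for event in result.stream_events():
        if event.type == "raw_response_event" and isinstance(
            event.data, ResponseTextDeltaEvent
        ):
            text = event.data.delta or ""
            accumulated += text
            delta_count += 1
            # Periodically run the output guardrail agent on what has been
            # generated so far and cut the stream if it trips
            if delta_count % MODERATION_INTERVAL == 0:
                check = await Runner.run(output_guardrail_agent, accumulated)
                if check.final_output.is_unsafe:
                    yield "data: I'm sorry, I cannot help with that.\n\n"
                    break
            yield f"data: {text}\n\n"
    yield "event: end\ndata: [DONE]\n\n"

The obvious downsides are that each check blocks the stream for a full guardrail-agent round trip, and anything flushed before the check fires can't be retracted, which is why I'd prefer a supported way to halt the stream when the tripwire is hit.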