
Output Guardrails while Streaming #495

Closed
@adhishthite

Description


Please read this first

  • Have you read the docs? Agents SDK docs
  • Have you searched for related issues? Others may have had similar requests

Question

I have exposed an Agent as a FastAPI endpoint:

@router.post("/v1/agent/stream")
async def query(request: Request, payload: QueryInput):
    """
    Stream a query response from the RAG Agent using Server-Sent Events.

    Args:
        request (Request): The incoming FastAPI request object
        payload (QueryInput): The query input containing the user's question and chat ID

    Returns:
        StreamingResponse: Server-sent events stream containing the agent's response

    Raises:
        HTTPException: 500 error if there's an issue processing the request
    """
    try:
        # Create streaming response using SSE
        return StreamingResponse(
            stream_sse_response(payload, request),
            media_type="text/event-stream",  # Set media type for SSE
        )
    except Exception as e:
        # Handle any errors during processing
        raise HTTPException(status_code=500, detail=str(e))

Here's my Agent:

rag_agent = Agent(
    name="RAG Agent",
    instructions=AGENT_PROMPT,  # System prompt defining agent behavior
    tools=[search_knowledgebase, get_current_datetime],  # Available tools for the agent
    model_settings=ModelSettings(
        temperature=0,  # Use deterministic responses
        parallel_tool_calls=True,  # Allow concurrent tool execution
        max_tokens=4096,  # Maximum response length
    ),
    input_guardrails=[agent_input_guardrail],
    output_guardrails=[agent_output_guardrail],
)

async def stream_sse_response(payload: QueryInput, request: Request):
    """
    Streams agent responses using Server-Sent Events (SSE) format.

    Handles both standalone queries and conversations with chat history.
    Monitors client connection and gracefully handles disconnections.

    Args:
        payload (QueryInput): Contains the user query and optional chat history
        request: FastAPI request object for connection monitoring

    Yields:
        SSE-formatted text chunks from the agent's response

    Raises:
        Exception: Logs and yields any errors during streaming
    """

    with trace("RAG API Stream"):
        try:
            # Process chat history if available
            if payload.chat_history and len(payload.chat_history) > 0:
                # Convert chat messages to dict format
                formatted_history = [msg.model_dump() for msg in payload.chat_history]
                # Append current query to history
                input_data = formatted_history + [
                    {"role": "user", "content": payload.query}
                ]
            else:
                # Use direct query if no history exists
                input_data = payload.query

            # Stream the agent's response
            result = Runner.run_streamed(rag_agent, input_data)
            async for event in result.stream_events():
                # Check for client disconnection
                if await request.is_disconnected():
                    logger.info("Client disconnected.")
                    break

                # Process text delta events
                if event.type == "raw_response_event" and isinstance(
                    event.data, ResponseTextDeltaEvent
                ):
                    text = event.data.delta or ""
                    yield f"data: {text}\n\n"

            # Signal end of stream
            yield "event: end\ndata: [DONE]\n\n"

        except Exception as e:
            logger.exception("Streaming error")
            yield f"event: error\ndata: {str(e)}\n\n"

The agent as shown above works fine (without the guardrails).

Now I have added an input and an output guardrail. The input guardrail runs before streaming, as expected.

class RAGGuardrailOutput(BaseModel):
    is_unsafe: bool
    reasoning: str


input_guardrail_agent = Agent(
    name="RAG Input Guardrail Checker",
    instructions=INPUT_GUARDRAIL_PROMPT,
    output_type=RAGGuardrailOutput,
    model="gpt-4o-mini",
)


@input_guardrail
async def rag_input_guardrail(
    ctx: RunContextWrapper[None], agent: Agent, input: str | list[TResponseInputItem]
) -> GuardrailFunctionOutput:
    """
    This guardrail checks if the user's input is safe to proceed with.
    """
    result = await Runner.run(input_guardrail_agent, input, context=ctx.context)

    return GuardrailFunctionOutput(
        output_info=result.final_output,
        tripwire_triggered=result.final_output.is_unsafe,
    )


output_guardrail_agent = Agent(
    name="RAG Output Guardrail Checker",
    instructions=OUTPUT_GUARDRAIL_PROMPT,
    output_type=RAGGuardrailOutput,
    model="gpt-4o-mini",
)


@output_guardrail
async def rag_output_guardrail(
    ctx: RunContextWrapper[None], agent: Agent, output: str
) -> GuardrailFunctionOutput:
    """
    This guardrail checks whether the agent's final output is safe to return.
    """
    result = await Runner.run(output_guardrail_agent, output, context=ctx.context)
    return GuardrailFunctionOutput(
        output_info=result.final_output,
        tripwire_triggered=result.final_output.is_unsafe,
    )

However, it seems the response is already fully streamed by the time the output guardrail is triggered.
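Since the tripwire fires only after the model has finished, the SSE generator should at least convert it into a clean error event rather than letting the exception escape mid-response. Below is a minimal, self-contained sketch of that pattern; the exception class and the delta source are local stand-ins so the snippet runs without the SDK (in real code you would catch the SDK's `OutputGuardrailTripwireTriggered` around the `stream_events()` loop):

```python
import asyncio

# Stand-in for the SDK's OutputGuardrailTripwireTriggered, defined locally
# so this sketch is runnable without the agents package.
class OutputGuardrailTripwireTriggered(Exception):
    pass

async def fake_stream_events():
    # Simulates result.stream_events(): a few text deltas, then the
    # output guardrail tripwire fires once generation completes.
    for delta in ["Sure, ", "here's how..."]:
        yield delta
    raise OutputGuardrailTripwireTriggered("output flagged as unsafe")

async def stream_sse_response():
    # Mirrors the generator from the issue, but turns the tripwire into
    # an SSE error event instead of a broken stream.
    try:
        async for text in fake_stream_events():
            yield f"data: {text}\n\n"
        yield "event: end\ndata: [DONE]\n\n"
    except OutputGuardrailTripwireTriggered:
        yield "event: error\ndata: Response blocked by output guardrail.\n\n"

async def collect():
    return [chunk async for chunk in stream_sse_response()]

print(asyncio.run(collect()))
```

Note this only cleans up the failure mode: the client has still seen the unsafe deltas before the error event arrives.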


@rm-openai Is this the expected behavior? If so, is there a recommended workaround?

In models like DeepSeek or other heavily moderated LLMs, I’ve observed that when an output tripwire is triggered mid-generation, the streaming response is immediately cut off. Is there a way to replicate that behavior here?

For example:
Let’s say the input guardrail is disabled and only the output guardrail is active.
I ask the Agent: “How do I delete the Prod database from our MongoDB?”
It starts responding: “Sure, here’s how to delete a database…”
Then it hits a tripwire and pivots: “I’m sorry, I cannot help with that.”

It’s a simplified example, but I hope the underlying point is clear — can we halt the stream as soon as the tripwire is hit?
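One workaround for that DeepSeek-style cutoff is to moderate incrementally yourself: buffer the deltas, re-check the accumulated text every N characters, and stop yielding as soon as the check trips. A self-contained sketch, with a fake delta source and a keyword-based stand-in for the checker (in real code the checker would invoke the guardrail agent):

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable

async def moderated_stream(
    deltas: AsyncIterator[str],
    is_unsafe: Callable[[str], Awaitable[bool]],
    check_every: int = 40,
) -> AsyncIterator[str]:
    """Yield text deltas, re-checking the accumulated text every
    `check_every` characters and halting as soon as it is flagged."""
    buffer = ""
    last_checked = 0
    async for delta in deltas:
        buffer += delta
        if len(buffer) - last_checked >= check_every:
            last_checked = len(buffer)
            if await is_unsafe(buffer):
                yield "\n[stream halted by output guardrail]"
                return
        yield delta

# --- demo with a fake model and a fake keyword-based checker ---
async def fake_deltas():
    for chunk in ["Sure, ", "here's how to ", "delete a database: ", "DROP ..."]:
        yield chunk

async def fake_checker(text: str) -> bool:
    return "delete a database" in text

async def main() -> str:
    out = []
    async for piece in moderated_stream(fake_deltas(), fake_checker, check_every=10):
        out.append(piece)
    return "".join(out)

print(asyncio.run(main()))
```

The trade-off is latency and cost: in a real setup each check is an extra model call, so `check_every` controls how much unsafe text can slip through before the halt.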
