Streaming delivers LLM output token-by-token as it’s generated, enabling responsive UIs and real-time feedback.

Enabling Streaming

Enable streaming when creating an agent:
from opper_agents import Agent

agent = Agent(
    name="StreamingAgent",
    enable_streaming=True
)
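
Enabling streaming by itself only makes the agent emit stream events while it runs; you still consume those events with hooks, as shown in the next section. A minimal run, continuing the snippet above (process() is the async entry point used throughout this page):
import asyncio

# With enable_streaming=True, stream events fire during generation;
# process() still returns the complete result when the call finishes
result = asyncio.run(agent.process("Say hello"))
print(result)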

Handling Stream Events

Using Hooks

from opper_agents import Agent, hook
from opper_agents.base.context import AgentContext

@hook("stream_start")
async def on_start(context: AgentContext, call_type: str, **kwargs):
    print(f"Starting {call_type} stream...")

@hook("stream_chunk")
async def on_chunk(context: AgentContext, chunk_data: dict, **kwargs):
    # chunk_data contains: delta, json_path, etc.
    delta = chunk_data.get("delta", "")
    print(delta, end="", flush=True)

@hook("stream_end")
async def on_end(context: AgentContext, call_type: str, **kwargs):
    print(f"\n{call_type} stream complete")

@hook("stream_error")
async def on_error(context: AgentContext, error: Exception, **kwargs):
    print(f"Stream error: {error}")

agent = Agent(
    name="StreamingAgent",
    enable_streaming=True,
    hooks=[on_start, on_chunk, on_end, on_error]
)
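
A sketch of driving this agent from a script, continuing the snippet above: the hooks print tokens as they arrive, while process() still returns the complete result.
import asyncio

async def main():
    # on_chunk prints deltas while the call is in flight
    result = await agent.process("Explain streaming in one sentence")
    print(f"\nReturned: {result}")

asyncio.run(main())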

Using Callbacks

The Python SDK does not expose a separate callback API for streaming; stream events are handled with the @hook decorator pattern shown in "Using Hooks" above.

Chunk Data

Each chunk contains:
@hook("stream_chunk")
async def on_chunk(context: AgentContext, chunk_data: dict, **kwargs):
    chunk_data.get("delta")       # New text in this chunk
    chunk_data.get("json_path")   # Path for structured output (e.g., "content")
    kwargs.get("call_type")       # "think" or "final_result"

Streaming with Structured Output

When using output schemas, streaming still works:
from pydantic import BaseModel
from opper_agents import Agent, hook
from opper_agents.base.context import AgentContext

class StoryOutput(BaseModel):
    title: str
    content: str
    moral: str

@hook("stream_chunk")
async def on_chunk(context: AgentContext, chunk_data: dict, **kwargs):
    # json_path shows which field is being streamed
    json_path = chunk_data.get("json_path", "")
    delta = chunk_data.get("delta", "")

    # Stream content field (may appear as "content" or "final_result.content")
    if "content" in json_path and delta:
        print(delta, end="", flush=True)

agent = Agent(
    name="StoryAgent",
    output_schema=StoryOutput,
    enable_streaming=True,
    hooks=[on_chunk]
)

# Returns typed StoryOutput, but streams during generation
result = await agent.process("Write a short fable")
print(f"\nTitle: {result.title}")
print(f"Moral: {result.moral}")

Call Types

The call_type indicates what’s being streamed:
Call Type       Description
think           Agent reasoning/thinking
final_result    Final output generation
@hook("stream_start")
async def on_start(context: AgentContext, call_type: str, **kwargs):
    if call_type == "think":
        print("Agent is thinking...")
    elif call_type == "final_result":
        print("Generating response...")

Web Server Example

Stream to a web client:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from opper_agents import Agent, hook
from opper_agents.base.context import AgentContext
import asyncio

app = FastAPI()

@app.get("/stream")
async def stream_response(query: str):
    queue = asyncio.Queue()

    @hook("stream_chunk")
    async def on_chunk(context: AgentContext, chunk_data: dict, **kwargs):
        delta = chunk_data.get("delta", "")
        if delta:
            await queue.put(delta)

    @hook("stream_end")
    async def on_end(context: AgentContext, call_type: str, **kwargs):
        await queue.put(None)  # Signal end

    agent = Agent(
        name="StreamAgent",
        enable_streaming=True,
        hooks=[on_chunk, on_end]
    )

    async def generate():
        # Start agent in background
        task = asyncio.create_task(agent.process(query))

        while True:
            chunk = await queue.get()
            if chunk is None:
                break
            yield f"data: {chunk}\n\n"

        await task

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )
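
On the client side, any SSE consumer works. Here is a sketch using the requests library (the localhost URL and port are assumptions about where the app runs):
import requests

# stream=True keeps the connection open and yields data as the server sends it
with requests.get(
    "http://localhost:8000/stream",
    params={"query": "Tell me a story"},
    stream=True,
) as resp:
    for line in resp.iter_lines(decode_unicode=True):
        if line and line.startswith("data: "):
            print(line[len("data: "):], end="", flush=True)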

Best Practices

  1. Flush output: Use flush=True in Python for immediate display
  2. Handle errors: Always implement error handlers for robust UX
  3. Show progress: Indicate streaming state to users
  4. Buffer appropriately: For very fast streams, consider batching updates (see the sketch after this list)
  5. Clean up: Close connections and clean up resources on completion
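
For point 4, a simple approach is to flush buffered deltas once a size or time threshold is reached instead of printing every token. A minimal sketch (the 64-character and 100 ms thresholds are arbitrary):
import time

from opper_agents import hook
from opper_agents.base.context import AgentContext

buffer: list[str] = []
last_flush = time.monotonic()

@hook("stream_chunk")
async def on_chunk_batched(context: AgentContext, chunk_data: dict, **kwargs):
    global last_flush
    buffer.append(chunk_data.get("delta", ""))
    # Flush when enough text has accumulated or enough time has passed
    if sum(len(d) for d in buffer) >= 64 or time.monotonic() - last_flush > 0.1:
        print("".join(buffer), end="", flush=True)
        buffer.clear()
        last_flush = time.monotonic()

@hook("stream_end")
async def flush_remainder(context: AgentContext, call_type: str, **kwargs):
    # Emit whatever is left when the stream closes
    print("".join(buffer), end="", flush=True)
    buffer.clear()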

Next Steps