Streaming delivers LLM output token-by-token as it’s generated, enabling responsive UIs and real-time feedback.
## Enabling Streaming
Enable streaming when creating an agent:
```python
from opper_agents import Agent

agent = Agent(
    name="StreamingAgent",
    enable_streaming=True,
)
```
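The agent is then invoked like any other; `process()` is a coroutine, so it runs inside an event loop. A minimal sketch using the `agent` defined above (the query string is illustrative):

```python
import asyncio

async def main():
    # Streamed tokens are delivered while process() is running;
    # the return value is still the complete final result.
    result = await agent.process("Write a haiku about the ocean")
    print(result)

asyncio.run(main())
```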
## Handling Stream Events

### Using Hooks
```python
from opper_agents import Agent, hook
from opper_agents.base.context import AgentContext

@hook("stream_start")
async def on_start(context: AgentContext, call_type: str, **kwargs):
    print(f"Starting {call_type} stream...")

@hook("stream_chunk")
async def on_chunk(context: AgentContext, chunk_data: dict, **kwargs):
    # chunk_data contains: delta, json_path, etc.
    delta = chunk_data.get("delta", "")
    print(delta, end="", flush=True)

@hook("stream_end")
async def on_end(context: AgentContext, call_type: str, **kwargs):
    print(f"\n{call_type} stream complete")

@hook("stream_error")
async def on_error(context: AgentContext, error: Exception, **kwargs):
    print(f"Stream error: {error}")

agent = Agent(
    name="StreamingAgent",
    enable_streaming=True,
    hooks=[on_start, on_chunk, on_end, on_error],
)
```
### Using Callbacks
The Python SDK uses hooks for streaming events. See the hooks example above.
```python
# Python uses the @hook decorator pattern for streaming.
# See the "Using Hooks" example above.
```
## Chunk Data
Each chunk contains:
```python
@hook("stream_chunk")
async def on_chunk(context: AgentContext, chunk_data: dict, **kwargs):
    chunk_data.get("delta")      # New text in this chunk
    chunk_data.get("json_path")  # Path for structured output (e.g., "content")
    kwargs.get("call_type")      # "think" or "final_result"
```
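A common pattern is to accumulate deltas keyed by `json_path`, so partial output can be rendered per field rather than as one undifferentiated stream. A minimal sketch (the `buffers` dict is illustrative, not part of the SDK):

```python
from collections import defaultdict

from opper_agents import hook
from opper_agents.base.context import AgentContext

buffers: dict[str, str] = defaultdict(str)

@hook("stream_chunk")
async def accumulate(context: AgentContext, chunk_data: dict, **kwargs):
    path = chunk_data.get("json_path", "")
    delta = chunk_data.get("delta", "")
    if delta:
        # buffers maps each json_path to the partial text received so far
        buffers[path] += delta
```

Register the hook via `hooks=[accumulate]` as in the examples above.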
## Streaming with Structured Output
When using output schemas, streaming still works:
```python
from pydantic import BaseModel
from opper_agents import Agent, hook
from opper_agents.base.context import AgentContext

class StoryOutput(BaseModel):
    title: str
    content: str
    moral: str

@hook("stream_chunk")
async def on_chunk(context: AgentContext, chunk_data: dict, **kwargs):
    # json_path shows which field is being streamed
    json_path = chunk_data.get("json_path", "")
    delta = chunk_data.get("delta", "")
    # Stream the content field (may appear as "content" or "final_result.content")
    if "content" in json_path and delta:
        print(delta, end="", flush=True)

agent = Agent(
    name="StoryAgent",
    output_schema=StoryOutput,
    enable_streaming=True,
    hooks=[on_chunk],
)

# Returns a typed StoryOutput, but streams during generation
result = await agent.process("Write a short fable")
print(f"\nTitle: {result.title}")
print(f"Moral: {result.moral}")
```
## Call Types

The `call_type` value indicates what's being streamed:
| Call Type | Description |
|---|---|
| `think` | Agent reasoning/thinking |
| `final_result` | Final output generation |
```python
@hook("stream_start")
async def on_start(context: AgentContext, call_type: str, **kwargs):
    if call_type == "think":
        print("Agent is thinking...")
    elif call_type == "final_result":
        print("Generating response...")
```
## Web Server Example
Stream to a web client:
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from opper_agents import Agent, hook
from opper_agents.base.context import AgentContext
import asyncio

app = FastAPI()

@app.get("/stream")
async def stream_response(query: str):
    queue = asyncio.Queue()

    @hook("stream_chunk")
    async def on_chunk(context: AgentContext, chunk_data: dict, **kwargs):
        delta = chunk_data.get("delta", "")
        if delta:
            await queue.put(delta)

    @hook("stream_end")
    async def on_end(context: AgentContext, call_type: str, **kwargs):
        # Only signal completion when the final result has finished streaming,
        # so the end of a "think" stream doesn't close the response early
        if call_type == "final_result":
            await queue.put(None)  # Signal end

    agent = Agent(
        name="StreamAgent",
        enable_streaming=True,
        hooks=[on_chunk, on_end],
    )

    async def generate():
        # Start the agent in the background
        task = asyncio.create_task(agent.process(query))
        while True:
            chunk = await queue.get()
            if chunk is None:
                break
            yield f"data: {chunk}\n\n"
        await task

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
    )
```
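To consume this endpoint from Python, any SSE-capable HTTP client works. A minimal sketch using `httpx`; the localhost URL, port, and query string are assumptions about how the app above is served:

```python
import asyncio

import httpx

async def consume():
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "GET",
            "http://localhost:8000/stream",
            params={"query": "Tell me a story"},
        ) as response:
            async for line in response.aiter_lines():
                # Each SSE event from the server is a "data: <chunk>" line
                if line.startswith("data: "):
                    print(line[len("data: "):], end="", flush=True)

asyncio.run(consume())
```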
## Best Practices
- Flush output: Use `flush=True` in Python for immediate display
- Handle errors: Always implement a `stream_error` handler for robust UX
- Show progress: Indicate streaming state to users
- Buffer appropriately: For very fast streams, consider batching updates (a sketch follows this list)
- Clean up: Close connections and clean up resources on completion
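For the buffering point above, one straightforward approach is to accumulate deltas and flush them on a fixed interval instead of printing each chunk. A minimal sketch; the 50 ms interval and `flush_loop` helper are illustrative, and the hook would be registered via `hooks=[...]` as in the earlier examples:

```python
import asyncio

from opper_agents import hook
from opper_agents.base.context import AgentContext

buffer: list[str] = []

@hook("stream_chunk")
async def buffered_chunk(context: AgentContext, chunk_data: dict, **kwargs):
    delta = chunk_data.get("delta", "")
    if delta:
        buffer.append(delta)

async def flush_loop(interval: float = 0.05):
    # Periodically emit whatever has accumulated since the last flush
    while True:
        await asyncio.sleep(interval)
        if buffer:
            text = "".join(buffer)
            buffer.clear()
            print(text, end="", flush=True)
```

Run `flush_loop()` as a background task (for example with `asyncio.create_task`) alongside `agent.process(...)`.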
## Next Steps