Streaming delivers LLM output token-by-token as it's generated, enabling responsive UIs and real-time feedback.
## Enabling Streaming

Enable streaming when creating an agent:
```python
from opper_agents import Agent

agent = Agent(
    name="StreamingAgent",
    enable_streaming=True,
)
```
## Handling Stream Events

### Using Hooks
```python
from opper_agents import Agent, hook
from opper_agents.base.context import AgentContext

@hook("stream_start")
async def on_start(context: AgentContext, call_type: str, **kwargs):
    print(f"Starting {call_type} stream...")

@hook("stream_chunk")
async def on_chunk(context: AgentContext, chunk_data: dict, **kwargs):
    # chunk_data contains: delta, json_path, etc.
    delta = chunk_data.get("delta", "")
    print(delta, end="", flush=True)

@hook("stream_end")
async def on_end(context: AgentContext, call_type: str, **kwargs):
    print(f"\n{call_type} stream complete")

@hook("stream_error")
async def on_error(context: AgentContext, error: Exception, **kwargs):
    print(f"Stream error: {error}")

agent = Agent(
    name="StreamingAgent",
    enable_streaming=True,
    hooks=[on_start, on_chunk, on_end, on_error],
)
```
### Using Callbacks

The Python SDK uses hooks for streaming events; see the hooks example above.
## Chunk Data

Each chunk contains:
```python
@hook("stream_chunk")
async def on_chunk(context: AgentContext, chunk_data: dict, **kwargs):
    chunk_data.get("delta")      # New text in this chunk
    chunk_data.get("json_path")  # Path for structured output (e.g., "content")
    kwargs.get("call_type")      # "think" or "final_result"
```
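A common pattern is to accumulate deltas per `json_path` to rebuild the full text of each field. A minimal sketch in plain Python (no SDK required; the chunk dicts below are hypothetical examples of what a stream might deliver):

```python
def accumulate(chunks: list[dict]) -> dict[str, str]:
    """Fold stream_chunk payloads into complete text per json_path."""
    fields: dict[str, str] = {}
    for chunk in chunks:
        path = chunk.get("json_path", "")
        delta = chunk.get("delta", "")
        if delta:
            fields[path] = fields.get(path, "") + delta
    return fields

# Hypothetical chunk payloads as they might arrive:
chunks = [
    {"json_path": "content", "delta": "Once upon"},
    {"json_path": "content", "delta": " a time"},
    {"json_path": "moral", "delta": "Be kind."},
]
print(accumulate(chunks))
# → {'content': 'Once upon a time', 'moral': 'Be kind.'}
```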
## Streaming with Structured Output

When using output schemas, streaming still works:
```python
from pydantic import BaseModel
from opper_agents import Agent, hook
from opper_agents.base.context import AgentContext

class StoryOutput(BaseModel):
    title: str
    content: str
    moral: str

@hook("stream_chunk")
async def on_chunk(context: AgentContext, chunk_data: dict, **kwargs):
    # json_path shows which field is being streamed
    json_path = chunk_data.get("json_path", "")
    delta = chunk_data.get("delta", "")
    # Stream the content field (may appear as "content" or "final_result.content")
    if "content" in json_path and delta:
        print(delta, end="", flush=True)

agent = Agent(
    name="StoryAgent",
    output_schema=StoryOutput,
    enable_streaming=True,
    hooks=[on_chunk],
)

# Returns typed StoryOutput, but streams during generation
result = await agent.process("Write a short fable")
print(f"\nTitle: {result.title}")
print(f"Moral: {result.moral}")
```
## Call Types

The call_type indicates what's being streamed:

| Call Type | Description |
| --- | --- |
| `think` | Agent reasoning/thinking |
| `final_result` | Final output generation |
```python
@hook("stream_start")
async def on_start(context: AgentContext, call_type: str, **kwargs):
    if call_type == "think":
        print("Agent is thinking...")
    elif call_type == "final_result":
        print("Generating response...")
```
## Web Server Example

Stream to a web client:
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from opper_agents import Agent, hook
from opper_agents.base.context import AgentContext
import asyncio

app = FastAPI()

@app.get("/stream")
async def stream_response(query: str):
    queue = asyncio.Queue()

    @hook("stream_chunk")
    async def on_chunk(context: AgentContext, chunk_data: dict, **kwargs):
        delta = chunk_data.get("delta", "")
        if delta:
            await queue.put(delta)

    @hook("stream_end")
    async def on_end(context: AgentContext, call_type: str, **kwargs):
        await queue.put(None)  # Signal end

    agent = Agent(
        name="StreamAgent",
        enable_streaming=True,
        hooks=[on_chunk, on_end],
    )

    async def generate():
        # Start the agent in the background while we drain the queue
        task = asyncio.create_task(agent.process(query))
        while True:
            chunk = await queue.get()
            if chunk is None:
                break
            yield f"data: {chunk}\n\n"
        await task

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
    )
```
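The endpoint above emits Server-Sent Events, where each event is a `data:` line followed by a blank line. In a browser, `EventSource` parses this framing for you; as a minimal sketch, a non-browser client could extract the payloads like this (plain Python, no SDK assumptions):

```python
def parse_sse(stream: str) -> list[str]:
    """Extract data payloads from an SSE-framed string.

    Events are separated by a blank line; each data line starts with 'data:'.
    """
    payloads = []
    for event in stream.split("\n\n"):
        for line in event.splitlines():
            if line.startswith("data:"):
                payloads.append(line[len("data:"):].lstrip())
    return payloads

# Frames as the /stream endpoint above would emit them:
print(parse_sse("data: Hello\n\ndata: world\n\n"))
# → ['Hello', 'world']
```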
## Best Practices

- **Flush output**: Use `flush=True` in Python for immediate display
- **Handle errors**: Always implement error handlers for robust UX
- **Show progress**: Indicate streaming state to users
- **Buffer appropriately**: For very fast streams, consider batching updates
- **Clean up**: Close connections and clean up resources on completion
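The buffering point above can be sketched as a small helper (plain Python, independent of the SDK; `flush_every` is an illustrative threshold): instead of pushing every tiny delta to the UI, batch deltas and emit them once enough text has accumulated.

```python
class ChunkBuffer:
    """Batch tiny stream deltas into larger UI updates."""

    def __init__(self, flush_every: int = 16):
        self.flush_every = flush_every
        self._parts: list[str] = []
        self._size = 0
        self.emitted: list[str] = []  # stands in for real UI updates

    def add(self, delta: str) -> None:
        self._parts.append(delta)
        self._size += len(delta)
        if self._size >= self.flush_every:
            self.flush()

    def flush(self) -> None:
        if self._parts:
            self.emitted.append("".join(self._parts))
            self._parts, self._size = [], 0

buf = ChunkBuffer(flush_every=8)
for delta in ["Hel", "lo, ", "wor", "ld", "!"]:
    buf.add(delta)
buf.flush()  # emit any remainder at stream end
print(buf.emitted)
# → ['Hello, wor', 'ld!']
```

In a real handler you would call `buf.add(delta)` from `stream_chunk` and `buf.flush()` from `stream_end`, replacing the `emitted` list with your actual update mechanism.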
## Next Steps

- **MCP Integration**: Connect to external tools
- **Hooks**: All lifecycle events