Skip to main content
Hooks let you observe agent execution without modifying the agent’s behavior. Use them for logging, metrics, debugging, and custom integrations.

Available Events

| Event | Description |
| --- | --- |
| `agent:start` | Agent begins processing |
| `agent:end` | Agent completes processing |
| `loop:start` | Iteration begins |
| `loop:end` | Iteration completes |
| `llm:call` | LLM request sent |
| `llm:response` | LLM response received |
| `think:end` | Agent finished thinking |
| `tool:before` | Before tool execution |
| `tool:after` | After tool execution |
| `tool:error` | Tool threw an error |
| `memory:read` | Memory entries read |
| `memory:write` | Memory entry written |
| `stream:start` | Streaming begins |
| `stream:chunk` | Stream chunk received |
| `stream:end` | Streaming completes |

Registering Hooks

Python: Decorator Pattern

from opper_agents import Agent, hook
from opper_agents.base.context import AgentContext

@hook("agent_start")
async def on_start(context: AgentContext, agent, **kwargs):
    """Print the agent's name and the session id for the "agent_start" event."""
    start_line = f"🚀 Agent {agent.name} starting"
    print(start_line)
    session_line = f"Session: {context.session_id}"
    print(session_line)

@hook("agent_end")
async def on_end(context: AgentContext, agent, **kwargs):
    """Print the agent's name and total token usage for the "agent_end" event."""
    done_line = f"✅ Agent {agent.name} completed"
    print(done_line)
    tokens_line = f"Total tokens: {context.usage.total_tokens}"
    print(tokens_line)

@hook("tool_call")
async def on_tool(context: AgentContext, agent, tool, parameters, **kwargs):
    """Print the tool's name and the parameters it is being called with."""
    call_line = f"🔧 Calling {tool.name} with {parameters}"
    print(call_line)

@hook("tool_result")
async def on_result(context: AgentContext, agent, tool, result, **kwargs):
    """Print the tool's name together with the value it returned."""
    result_line = f"📤 {tool.name} returned: {result.result}"
    print(result_line)

# Hooks are registered by passing the decorated functions to the constructor.
agent = Agent(
    name="MyAgent",
    hooks=[
        on_start,
        on_end,
        on_tool,
        on_result,
    ],
    tools=[my_tool],
)

TypeScript: Event Emitter Pattern

import { Agent, HookEvents } from "@opperai/agents";

// Create the agent first; listeners are attached to the instance afterwards.
const agent = new Agent({
  name: "MyAgent",
  tools: [myTool]
});

// AgentStart: log the agent name and the session id from the payload context.
agent.on(HookEvents.AgentStart, (payload) => {
  console.log(`🚀 Agent ${payload.agent.name} starting`);
  console.log(`Session: ${payload.context.sessionId}`);
});

// AgentEnd: log the agent name and the total token usage.
agent.on(HookEvents.AgentEnd, (payload) => {
  console.log(`✅ Agent ${payload.agent.name} completed`);
  console.log(`Total tokens: ${payload.context.usage.totalTokens}`);
});

// BeforeTool: log the tool name and its arguments, serialized as JSON.
agent.on(HookEvents.BeforeTool, (payload) => {
  console.log(`🔧 Calling ${payload.toolName} with ${JSON.stringify(payload.args)}`);
});

// AfterTool: log the tool name and its result (interpolated as-is, not JSON-serialized).
agent.on(HookEvents.AfterTool, (payload) => {
  console.log(`📤 ${payload.toolName} returned: ${payload.result}`);
});

Hook Payloads

Agent Context

All hooks receive the agent context:
from opper_agents.base.context import AgentContext

@hook("agent_end")
async def on_end(context: AgentContext, agent, **kwargs):
    """Print each field this example reads from the agent context.

    Covers session info, execution state, the execution history, and the
    usage metrics, in that order.
    """
    # Session info
    print(context.session_id)
    print(context.agent_name)

    # Execution state
    print(context.iteration)
    print(context.goal)

    # History: one entry per cycle, each with a thought and its tool calls
    for step in context.execution_history:
        print(step.thought)
        print(step.tool_calls)

    # Usage metrics
    usage = context.usage
    print(usage.requests)
    print(usage.input_tokens)
    print(usage.output_tokens)
    print(usage.total_tokens)
    print(usage.cost.total)

Tool Events

from opper_agents.base.context import AgentContext

@hook("tool_call")
async def before_tool(context: AgentContext, agent, tool, parameters, **kwargs):
    """Placeholder handler for the "tool_call" event.

    Args:
        tool: Tool object with name, description, schema.
        parameters: The arguments passed to the tool.
    """

@hook("tool_result")
async def after_tool(context: AgentContext, agent, tool, result, **kwargs):
    """Placeholder handler for the "tool_result" event.

    Args:
        result: Outcome of the tool call, exposing:
            success: bool.
            result: The return value.
            error: Error message (if failed).
            execution_time: Time in ms.
    """

@hook("tool_error")
async def on_error(context: AgentContext, agent, tool, error, **kwargs):
    """Placeholder handler for the "tool_error" event.

    Args:
        error: The exception that was raised.
    """

Common Use Cases

Logging

import logging
from opper_agents.base.context import AgentContext

logger = logging.getLogger("agent")

@hook("agent_start")
async def log_start(context: AgentContext, agent, **kwargs):
    """Log that the agent started, with session id and goal as extra fields.

    Args:
        context: Agent context; ``session_id`` and ``goal`` are read from it.
        agent: The agent whose ``name`` appears in the log message.
    """
    # Pass the name as a lazy %-argument instead of an f-string: the message
    # is only formatted if the record is emitted, and the template stays
    # constant across agents, which keeps log grouping stable.
    logger.info("Agent %s started", agent.name, extra={
        "session_id": context.session_id,
        "goal": context.goal,
    })

@hook("agent_end")
async def log_end(context: AgentContext, agent, **kwargs):
    """Log that the agent completed, with iteration and token counts as extra fields.

    Args:
        context: Agent context; ``iteration`` and ``usage.total_tokens`` are
            read from it.
        agent: The agent whose ``name`` appears in the log message.
    """
    # Lazy %-formatting: the message is built only when the record is
    # emitted, and the template is identical for every agent.
    logger.info("Agent %s completed", agent.name, extra={
        "iterations": context.iteration,
        "tokens": context.usage.total_tokens,
    })

Metrics Collection

from prometheus_client import Counter, Histogram
from opper_agents.base.context import AgentContext

# Both metrics are labelled by agent name and tool name.
tool_calls = Counter('agent_tool_calls', 'Tool calls', ['agent', 'tool'])
tool_duration = Histogram('agent_tool_duration', 'Tool duration', ['agent', 'tool'])

@hook("tool_result")
async def track_metrics(context: AgentContext, agent, tool, result, **kwargs):
    """Count the finished tool call and record how long it took."""
    label_values = {"agent": agent.name, "tool": tool.name}
    tool_calls.labels(**label_values).inc()

    duration_metric = tool_duration.labels(**label_values)
    # execution_time is in milliseconds; convert to seconds before observing.
    duration_metric.observe(result.execution_time / 1000)

Cost Tracking

from opper_agents.base.context import AgentContext

@hook("agent_end")
async def track_cost(context: AgentContext, agent, **kwargs):
    """Print the run's cost breakdown, then save a usage record via ``db``."""
    cost = context.usage.cost
    print(f"Generation cost: ${cost.generation:.4f}")
    print(f"Platform cost: ${cost.platform:.4f}")
    print(f"Total cost: ${cost.total:.4f}")

    # Save to database
    save_usage = db.save_usage
    usage_record = {
        "session_id": context.session_id,
        "agent": agent.name,
        "tokens": context.usage.total_tokens,
        "cost": cost.total,
    }
    await save_usage(usage_record)

Progress Reporting

from opper_agents.base.context import AgentContext

@hook("loop_start")
async def on_iteration(context: AgentContext, agent, **kwargs):
    """Print the current iteration number out of the agent's maximum."""
    progress_line = f"Iteration {context.iteration}/{agent.max_iterations}"
    print(progress_line)

@hook("tool_call")
async def on_tool(context: AgentContext, agent, tool, parameters, **kwargs):
    """Print an indented progress line naming the tool being called."""
    calling_line = f"  → Calling {tool.name}..."
    print(calling_line)

@hook("tool_result")
async def on_result(context: AgentContext, agent, tool, result, **kwargs):
    """Print a check mark or a cross for the finished tool call."""
    if result.success:
        status = "✓"
    else:
        status = "✗"
    print(f"  {status} {tool.name} complete")

Streaming Hooks

For streaming-enabled agents:
from opper_agents.base.context import AgentContext

@hook("stream_start")
async def on_stream_start(context: AgentContext, call_type: str, **kwargs):
    """Print which call type has started streaming."""
    started_line = f"Streaming {call_type} started..."
    print(started_line)

@hook("stream_chunk")
async def on_chunk(context: AgentContext, chunk_data: dict, **kwargs):
    """Write the chunk's text delta to stdout immediately, without a newline.

    ``chunk_data`` is a dict with keys such as ``delta`` and ``json_path``;
    a missing ``delta`` is treated as an empty string.
    """
    print(chunk_data.get("delta", ""), end="", flush=True)

@hook("stream_end")
async def on_stream_end(context: AgentContext, call_type: str, **kwargs):
    """Print which call type finished streaming, on a fresh line."""
    finished_line = f"\nStreaming {call_type} complete"
    print(finished_line)

Error Handling in Hooks

Hooks are wrapped in a try/catch (try/except in Python), so an error raised inside a hook does not break agent execution:
from opper_agents.base.context import AgentContext

@hook("tool_result")
async def flaky_hook(context: AgentContext, agent, tool, result, **kwargs):
    """Always raise, to demonstrate that a failing hook does not stop the agent."""
    failure = Exception("Hook failed!")
    # The agent continues even though this hook raises;
    # agent execution is NOT affected.
    raise failure

Best Practices

  1. Keep hooks fast: Long-running hooks delay agent execution
  2. Handle errors: Don’t let hook errors crash your application
  3. Use async: For I/O operations in hooks
  4. Be selective: Only subscribe to events you need
  5. Avoid side effects: Hooks should observe, not modify

Next Steps