Agents extend the OpperatorAgent base class and implement lifecycle methods that control behavior during startup, operation, and shutdown.
Minimal Agent
from opperator import OpperatorAgent, LogLevel

class MyAgent(OpperatorAgent):
    def __init__(self):
        super().__init__(name="my_agent", version="1.0.0")

    def initialize(self):
        self.log(LogLevel.INFO, "Initializing")
        self.counter = 0

    def start(self):
        self.log(LogLevel.INFO, "Started")

if __name__ == "__main__":
    agent = MyAgent()
    agent.run()
Lifecycle Methods
Your agent goes through several lifecycle stages. Understanding these helps you initialize resources at the right time and clean up properly.
initialize()
Setup phase called before start(). Use this for:
- Loading configuration
- Setting up initial state
- Configuring the agent description
def initialize(self):
    # Load config
    self.api_url = self.config.get("api_url")
    self.timeout = self.config.get("timeout", 30)
    # Initialize state
    self.request_count = 0
    # Set description
    self.set_description("Monitors API endpoints")
Don’t start background threads or long-running operations here. Save those for start().
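The configuration loading described above can be pulled into a small standalone function so that bad values fail early and the logic can be tested without the framework. This is a minimal sketch: `load_settings` and `Settings` are hypothetical names, not part of Opperator, and only the `api_url` and `timeout` keys from the example above are assumed.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    """Validated configuration values (hypothetical helper type)."""
    api_url: str
    timeout: float


def load_settings(config: dict) -> Settings:
    """Read the keys used in the example above and reject bad values."""
    api_url = config.get("api_url")
    if not api_url:
        raise ValueError("config is missing required key 'api_url'")
    timeout = float(config.get("timeout", 30))
    if timeout <= 0:
        raise ValueError("'timeout' must be positive")
    return Settings(api_url=api_url, timeout=timeout)
```

Inside initialize(), a call such as `self.settings = load_settings(self.config)` would then surface a misconfiguration before start() opens any connections.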
start()
Activation phase called after initialize(). Use this for:
- Registering commands
- Opening connections
- Starting background tasks
def start(self):
    # Register commands
    self.register_command("status", self.get_status)
    # Open connections
    self.api_client = APIClient(self.api_url)
    # Start background tasks
    self.start_monitoring()
main_loop()
Optional. Override this method to run a custom event loop. The default implementation waits for a shutdown signal.
import time

def main_loop(self):
    # fetch_events(), process_event(), and poll_interval are defined by your agent
    while self.running:
        events = self.fetch_events()
        for event in events:
            self.process_event(event)
        time.sleep(self.poll_interval)
Most agents don’t need a custom main_loop(). Commands and event handlers usually provide enough control.
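If you do write a custom loop, a plain sleep keeps the agent unresponsive to shutdown for up to one full poll interval. The sketch below shows one common alternative, waiting on a `threading.Event` instead. `PollLoop` is a stand-in class written for this illustration, not the Opperator base class, and the `stop_event` attribute is an assumption about how your agent tracks shutdown.

```python
import threading


class PollLoop:
    """Stand-in for an agent's polling loop (not the Opperator base class)."""

    def __init__(self, poll_interval: float):
        self.poll_interval = poll_interval
        self.stop_event = threading.Event()
        self.iterations = 0

    def main_loop(self) -> None:
        while not self.stop_event.is_set():
            self.iterations += 1  # placeholder for fetching and processing events
            # Event.wait() returns True as soon as stop_event is set, so the
            # loop exits promptly instead of sleeping out the full interval.
            if self.stop_event.wait(self.poll_interval):
                break

    def on_shutdown(self) -> None:
        self.stop_event.set()
```

With a 30-second interval, calling on_shutdown() from another thread ends main_loop() almost immediately rather than after the remaining sleep time.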
on_shutdown()
Performs graceful cleanup when SIGTERM or SIGINT (Ctrl+C) is received.
def on_shutdown(self):
    self.running = False
    self.stop_event.set()
    if self.current_task:
        self.complete_task(self.current_task)
    self.save_state()
on_shutdown() should complete quickly (in under 5 seconds). It is not guaranteed to finish before the process is terminated.
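One way to respect that time budget is to order shutdown work by priority and stop starting new steps once the budget is spent. The following is a sketch under that assumption: `run_shutdown_steps` is a hypothetical helper, and the 5-second default simply mirrors the guidance above.

```python
import time
from typing import Callable, Iterable, List


def run_shutdown_steps(
    steps: Iterable[Callable[[], None]],
    budget_seconds: float = 5.0,
) -> List[str]:
    """Run steps in order; skip any step that would start after the deadline.

    Returns the names of the skipped steps.
    """
    deadline = time.monotonic() + budget_seconds
    skipped = []
    for step in steps:
        if time.monotonic() >= deadline:
            skipped.append(step.__name__)
            continue
        step()
    return skipped
```

This cannot interrupt a step that is already running, so the most important work (for example, saving state) belongs first in the list and each step should itself be short.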
cleanup()
Final cleanup. Always runs before exit, even on crash.
def cleanup(self):
    if hasattr(self, 'api_client'):
        self.api_client.close()
    if hasattr(self, 'db_connection'):
        self.db_connection.close()
    super().cleanup()  # MUST call this
CRITICAL: Always call super().cleanup() or async executors won’t shut down.
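If one of your own close() calls raises, the call to super().cleanup() after it would never run. A defensive variant wraps the resource handling in try/finally. In this sketch, `BaseAgent` and `FlakyResource` are stand-ins defined only to make the example runnable; they are not Opperator classes.

```python
class BaseAgent:
    """Stand-in for the framework base class (illustration only)."""

    def __init__(self):
        self.base_cleaned = False

    def cleanup(self):
        self.base_cleaned = True


class FlakyResource:
    """Hypothetical resource whose close() fails."""

    def close(self):
        raise RuntimeError("close failed")


class SafeCleanupAgent(BaseAgent):
    def __init__(self):
        super().__init__()
        self.api_client = FlakyResource()
        self.errors = []

    def cleanup(self):
        try:
            for attr in ("api_client", "db_connection"):
                resource = getattr(self, attr, None)
                if resource is None:
                    continue  # never created, nothing to close
                try:
                    resource.close()
                except Exception as exc:
                    # Record the failure and keep closing the rest.
                    self.errors.append(f"{attr}: {exc}")
        finally:
            # Runs even if something above raises unexpectedly.
            super().cleanup()
```

Here the failing api_client is recorded in `errors`, the missing db_connection is skipped, and the base class cleanup still runs.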
Agent Properties
| Property | Type | Description |
| --- | --- | --- |
| self.running | bool | Agent should continue operating. True when running, False on shutdown. |
| self.config | dict | Configuration from agents.yaml. Available after load_config(). |
| self.name | str | Agent identifier |
| self.version | str | Agent version |
Logging
from opperator import LogLevel
self.log(LogLevel.INFO, "Processing started")
self.log(LogLevel.WARNING, "High memory usage")
self.log(LogLevel.ERROR, "Connection failed")
# Structured logging
self.log(LogLevel.INFO, "Request processed",
         request_id="abc123", duration_ms=245, status="success")
Levels: DEBUG, INFO, WARNING, ERROR, FATAL
Core Agent Methods
set_system_prompt(prompt)
Set or update the system prompt that guides the LLM’s behavior when interacting with your agent. For advanced system prompt patterns and dynamic updates, see System Prompts.
def initialize(self):
    self.set_system_prompt("You are a monitoring agent that helps users track API health.")
get_working_directory() / get_invocation_directory()
Access the agent’s working directory or the user’s invocation directory:
def start(self):
    # Where the agent process is running
    work_dir = self.get_working_directory()
    # Where the user ran 'op' from (or None if not set)
    invocation_dir = self.get_invocation_directory()
    self.log(LogLevel.INFO, "Directories",
             work_dir=work_dir, invocation_dir=invocation_dir)
Key differences:
- get_working_directory(): returns where the agent process is running
- get_invocation_directory(): returns where the user ran op from (may be None if not available)
When the TUI starts, the daemon stores the directory where op was invoked. Agents can fetch this via get_invocation_directory() and receive on_invocation_directory_changed() events when it changes.
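A typical use of the two directories is resolving a relative path the user typed: it usually makes sense relative to where they ran op, with the agent's working directory as a fallback when the invocation directory is None. The helper below is a hypothetical sketch of that rule and takes the two directories as plain arguments, so it makes no assumptions about the framework beyond the return values described above.

```python
from pathlib import Path
from typing import Optional


def resolve_user_path(
    user_path: str,
    invocation_dir: Optional[str],
    working_dir: str,
) -> Path:
    """Resolve a user-supplied path against the invocation directory."""
    path = Path(user_path)
    if path.is_absolute():
        return path  # absolute paths need no base directory
    # Fall back to the agent's working directory when no invocation
    # directory is available.
    base = Path(invocation_dir) if invocation_dir else Path(working_dir)
    return base / path
```

In an agent this might be called as `resolve_user_path(args["path"], self.get_invocation_directory(), self.get_working_directory())`, where the `path` argument name is an assumption.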
set_description(description)
Set a human-readable description of what your agent does.
def initialize(self):
    self.set_description("Monitors API endpoints and alerts on failures")
get_secret(name, timeout=5.0)
Securely retrieve secrets without hardcoding them in your agent.
def initialize(self):
    self.api_key = self.get_secret("api_key")
report_progress()
Report progress for async commands (those with async_enabled=True).
def process_large_file(self, args):
    call_id = args.get("call_id")
    # Assumed to arrive in args; adapt this to your own data source
    items = args.get("items", [])
    total = len(items)
    for i, item in enumerate(items):
        self.process_item(item)
        if i % 100 == 0:
            percentage = (i / total) * 100
            self.report_progress(call_id, percentage, f"Processed {i}/{total}")
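The example above reports at index 0 (0%) and never sends a final update unless the item count happens to line up. A small helper that counts completed items avoids both issues. This is a sketch: `progress_update` is a hypothetical function, and the reporting interval of 100 is taken from the example.

```python
from typing import Optional


def progress_update(done: int, total: int, every: int = 100) -> Optional[float]:
    """Return a percentage when an update is due, otherwise None.

    `done` is the number of items completed so far (1-based).
    """
    if total <= 0 or done <= 0 or every <= 0:
        return None
    if done % every == 0 or done == total:
        return done * 100 / total
    return None
```

Inside the command handler, iterating with `enumerate(items, start=1)` and calling `self.report_progress(call_id, pct, ...)` whenever `progress_update(done, total)` returns a value would yield periodic updates plus a guaranteed 100% at the end.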
register_section(section_id, title, content) / update_section(section_id, content)
Add live sidebar sections for displaying dynamic status information.
def start(self):
    self.register_section("metrics", "📊 Metrics", "Requests: 0")

def update_metrics(self):
    self.update_section("metrics", f"Requests: {self.request_count}")
Signal Handling
| Signal | Trigger | Handler | Platform | Notes |
| --- | --- | --- | --- | --- |
| SIGTERM | kill <pid> | on_shutdown() | All | Graceful shutdown |
| SIGINT | Ctrl+C | on_shutdown() | All | Same as SIGTERM |
| SIGHUP | kill -HUP <pid> | on_config_update() | Unix/Linux | Config reload |
| SIGUSR1 | kill -USR1 <pid> | on_status() | Unix/Linux | Health check |
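The Platform column reflects which signals exist at all: Python's `signal` module does not define SIGHUP or SIGUSR1 on Windows. The sketch below shows how code can check which entries of the table are available on the current platform. It illustrates the portability check only and is not a description of how Opperator installs its handlers, which the text above does not specify.

```python
import signal

# Signal name -> handler method name, copied from the table above.
SIGNAL_HANDLERS = {
    "SIGTERM": "on_shutdown",
    "SIGINT": "on_shutdown",
    "SIGHUP": "on_config_update",
    "SIGUSR1": "on_status",
}


def supported_signals() -> dict:
    """Return the subset of the table that exists on this platform."""
    return {
        getattr(signal, name): handler
        for name, handler in SIGNAL_HANDLERS.items()
        if hasattr(signal, name)  # SIGHUP/SIGUSR1 are missing on Windows
    }
```

On Unix-like systems the result has four entries; on Windows only SIGTERM and SIGINT remain, so an agent should not rely on on_config_update() or on_status() being triggered by signals there.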
Event Handlers
Beyond the core lifecycle methods, agents can respond to additional events:
Protocol Events (sent by the Opperator daemon):
- on_new_conversation(): new conversation created
- on_conversation_switched(): user switched conversations
- on_conversation_deleted(): conversation deleted
- on_agent_activated(): agent becomes active
- on_agent_deactivated(): another agent becomes active
- on_invocation_directory_changed(): user’s working directory changed
Signal Events (triggered by OS signals):
- on_config_update(): SIGHUP received, config reloaded
- on_status(): SIGUSR1 received, health check requested
Complete Example
import time
import requests
from opperator import OpperatorAgent, LogLevel

class MonitoringAgent(OpperatorAgent):
    def __init__(self):
        super().__init__(name="monitoring_agent", version="1.0.0")
        self.start_time = None
        self.request_count = 0

    def initialize(self):
        self.set_description("Monitors API endpoints")
        self.endpoints = self.config.get("endpoints", [])
        self.check_interval = self.config.get("check_interval", 60)

    def start(self):
        self.start_time = time.time()
        self.register_command("check_health", self.check_health)

    def main_loop(self):
        while self.running:
            for endpoint in self.endpoints:
                self.check_endpoint(endpoint)
            time.sleep(self.check_interval)

    def check_endpoint(self, endpoint):
        try:
            response = requests.get(endpoint, timeout=10)
            self.request_count += 1
            if response.status_code == 200:
                self.log(LogLevel.INFO, "Endpoint healthy", endpoint=endpoint)
            else:
                self.log(LogLevel.WARNING, "Endpoint unhealthy",
                         endpoint=endpoint, status=response.status_code)
        except Exception as exc:
            self.log(LogLevel.ERROR, "Check failed",
                     endpoint=endpoint, error=str(exc))

    def check_health(self, args):
        return {"checked": len(self.endpoints), "requests": self.request_count}

    def on_status(self):
        # start_time stays None until start() has run
        uptime = (time.time() - self.start_time) if self.start_time else 0
        self.log(LogLevel.INFO, "Status", uptime=int(uptime),
                 requests=self.request_count)

    def on_shutdown(self):
        self.running = False

    def cleanup(self):
        super().cleanup()

if __name__ == "__main__":
    agent = MonitoringAgent()
    agent.run()