Types of State
Ephemeral State (In-Memory)
State that only exists while the agent is running:
class MyAgent(OpperatorAgent):
    """Example agent whose state is kept purely in memory."""

    def initialize(self):
        """Create the in-memory attributes; none of them is written to disk."""
        # Ephemeral state - lost on restart
        self.cache = {}
        self.active_connections = []
        self.request_count = 0
Typical uses:
- Counters and metrics
- Active connections
- Temporary caches
- Current processing state
Persistent State (Disk)
State that survives agent restarts:
import json
import os
class MyAgent(OpperatorAgent):
    """Example agent that keeps preferences and history in a JSON file."""

    def initialize(self):
        """Resolve the state file path and load any previously saved state."""
        # Load persistent state
        self.state_file = os.path.join(
            self.get_working_directory(),
            ".agent_state.json"
        )
        self.load_persistent_state()

    def load_persistent_state(self):
        """Populate ``self.preferences`` and ``self.history`` from disk.

        A missing file yields empty defaults. An unreadable or malformed
        file is logged and also yields empty defaults, so a damaged state
        file cannot prevent the agent from initializing.
        """
        try:
            with open(self.state_file) as f:
                loaded = json.load(f)
        except FileNotFoundError:
            # Nothing has been saved yet.
            loaded = {}
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and decoding errors.
            self.log(LogLevel.WARNING, "Failed to load state", error=str(e))
            loaded = {}

        # Valid JSON that is not an object cannot hold our keys.
        state = loaded if isinstance(loaded, dict) else {}
        self.preferences = state.get("preferences", {})
        self.history = state.get("history", [])

    def save_persistent_state(self):
        """Write preferences and history to the state file.

        The data is written to a sibling temporary file and then moved
        over the real file with ``os.replace``, so an interrupted write
        cannot leave a truncated state file behind.

        Raises:
            OSError: If the file cannot be written or replaced.
            TypeError: If the state contains values JSON cannot encode.
        """
        state = {
            "preferences": self.preferences,
            "history": self.history
        }
        tmp_file = f"{self.state_file}.tmp"
        with open(tmp_file, 'w') as f:
            json.dump(state, f, indent=2)
        os.replace(tmp_file, self.state_file)

    def cleanup(self):
        """Save state before exit, then run the base-class cleanup."""
        try:
            self.save_persistent_state()
        finally:
            # The base cleanup must run even when saving fails.
            super().cleanup()
Typical uses:
- User preferences
- Historical data
- Learned patterns
- Configuration overrides
Per-Conversation State
Store state separately for each conversation:
class ConversationAgent(OpperatorAgent):
    """Example agent that keeps a separate in-memory state per conversation."""

    def initialize(self):
        """Start with no known conversations and none selected."""
        # Per-conversation state
        self.current_conversation = None
        self.conversation_states = {}

    @staticmethod
    def _blank_state():
        """Return a new, empty state dict for one conversation."""
        return {
            "messages": [],
            "context": {},
            "preferences": {}
        }

    def on_new_conversation(self, conversation_id: str, is_clear: bool):
        """Select ``conversation_id`` and give it a fresh state.

        ``is_clear`` is accepted but not used here; any state already
        stored under the same id is replaced.
        """
        self.current_conversation = conversation_id
        self.conversation_states[conversation_id] = self._blank_state()

    def on_conversation_switched(
        self,
        conversation_id: str,
        previous_id: str,
        message_count: int
    ):
        """Select ``conversation_id``, creating its state on first sight."""
        self.current_conversation = conversation_id
        known = self.conversation_states
        if conversation_id in known:
            return
        # First time we see this conversation
        known[conversation_id] = self._blank_state()

    def on_conversation_deleted(self, conversation_id: str):
        """Forget the state of a deleted conversation, if there is one."""
        self.conversation_states.pop(conversation_id, None)

    def get_current_state(self):
        """Get state for current conversation, or None if it has none."""
        return self.conversation_states.get(self.current_conversation)
Persistent Per-Conversation State
Combine persistent storage with per-conversation state:
import json
import os
from typing import Dict, Any
class PersistentAgent(OpperatorAgent):
    """Example agent that keeps one JSON state file per conversation.

    States are cached in ``self.conversation_states`` and written to
    ``<working directory>/.agent_state/<conversation_id>.json``.
    """

    def initialize(self):
        """Create the state directory and the empty in-memory cache."""
        self.state_dir = os.path.join(
            self.get_working_directory(),
            ".agent_state"
        )
        os.makedirs(self.state_dir, exist_ok=True)

        self.conversation_states = {}
        self.current_conversation = None

    @staticmethod
    def _default_state() -> Dict[str, Any]:
        """Return a new, empty state dict for one conversation."""
        return {
            "notes": [],
            "preferences": {},
            "history": []
        }

    def get_state_file(self, conversation_id: str) -> str:
        """Get path to state file for conversation"""
        return os.path.join(self.state_dir, f"{conversation_id}.json")

    def load_conversation_state(self, conversation_id: str) -> Dict[str, Any]:
        """Load state from disk, falling back to the default state.

        A missing file is the normal case for a new conversation and is
        not logged. Any other failure is logged as a warning.
        """
        state_file = self.get_state_file(conversation_id)

        # Open directly instead of checking os.path.exists first: the file
        # may disappear between the check and the open.
        try:
            with open(state_file) as f:
                return json.load(f)
        except FileNotFoundError:
            pass
        except Exception as e:
            # A bad state file must never break the calling callback.
            self.log(LogLevel.WARNING, "Failed to load state",
                     conversation_id=conversation_id, error=str(e))

        # Return default state
        return self._default_state()

    def save_conversation_state(self, conversation_id: str):
        """Save state to disk; a no-op for conversations not in memory.

        The state is written to a temporary file and then moved into
        place with ``os.replace`` so a crash mid-write cannot leave a
        truncated state file. Failures are logged, never raised.
        """
        if conversation_id not in self.conversation_states:
            return

        state_file = self.get_state_file(conversation_id)
        tmp_file = f"{state_file}.tmp"
        state = self.conversation_states[conversation_id]

        try:
            with open(tmp_file, 'w') as f:
                json.dump(state, f, indent=2)
            os.replace(tmp_file, state_file)
        except Exception as e:
            # A leftover .tmp file is overwritten by the next save.
            self.log(LogLevel.ERROR, "Failed to save state",
                     conversation_id=conversation_id, error=str(e))

    def on_new_conversation(self, conversation_id: str, is_clear: bool):
        """Select the conversation and set up its state.

        With ``is_clear`` true a fresh state is created and saved at
        once; otherwise the state is loaded from disk (or defaulted).
        """
        self.current_conversation = conversation_id

        if is_clear:
            # Clear creates fresh state
            self.conversation_states[conversation_id] = self._default_state()
            self.save_conversation_state(conversation_id)
        else:
            # New conversation - try to load existing state
            state = self.load_conversation_state(conversation_id)
            self.conversation_states[conversation_id] = state

    def on_conversation_switched(
        self,
        conversation_id: str,
        previous_id: str,
        message_count: int
    ):
        """Persist the previous conversation and select the new one."""
        # Save previous conversation state
        if previous_id and previous_id in self.conversation_states:
            self.save_conversation_state(previous_id)

        self.current_conversation = conversation_id

        # Load or initialize state
        if conversation_id not in self.conversation_states:
            state = self.load_conversation_state(conversation_id)
            self.conversation_states[conversation_id] = state

    def on_conversation_deleted(self, conversation_id: str):
        """Remove a deleted conversation's state file and cached state.

        The cached state is dropped even when the file cannot be
        removed; that failure is logged instead of raised.
        """
        # Delete state file
        try:
            os.remove(self.get_state_file(conversation_id))
        except FileNotFoundError:
            # Never saved, or already removed.
            pass
        except OSError as e:
            self.log(LogLevel.ERROR, "Failed to delete state file",
                     conversation_id=conversation_id, error=str(e))

        # Remove from memory
        self.conversation_states.pop(conversation_id, None)

    def cleanup(self):
        """Save all conversation states before exit, then clean up."""
        for conversation_id in self.conversation_states:
            self.save_conversation_state(conversation_id)
        super().cleanup()
Configuration Updates
Handle dynamic configuration changes without restart:
class ConfigurableAgent(OpperatorAgent):
    """Example agent that applies configuration changes while running."""

    def initialize(self):
        """Read the initial settings from ``self.config`` with defaults."""
        # Load initial configuration
        cfg = self.config
        self.api_endpoint = cfg.get("api_endpoint", "https://api.example.com")
        self.poll_interval = cfg.get("poll_interval", 60)
        self.features = cfg.get("features", {})

    def on_config_update(self, config: Dict[str, Any]):
        """Called when SIGHUP signal received"""
        self.log(LogLevel.INFO, "Configuration reloaded", config=config)

        self._apply_api_endpoint(config.get("api_endpoint"))
        self._apply_poll_interval(config.get("poll_interval", 60))

        # Feature flags are replaced wholesale; a missing key clears them.
        self.features = config.get("features", {})

    def _apply_api_endpoint(self, new_endpoint):
        """Switch to ``new_endpoint`` and reconnect if it is set and differs."""
        if not new_endpoint or new_endpoint == self.api_endpoint:
            return

        previous = self.api_endpoint
        self.api_endpoint = new_endpoint
        self.reconnect_to_api()
        self.log(
            LogLevel.INFO,
            "API endpoint updated",
            old=previous,
            new=new_endpoint
        )

    def _apply_poll_interval(self, new_interval):
        """Store ``new_interval`` and log the change if it differs."""
        if new_interval == self.poll_interval:
            return

        self.poll_interval = new_interval
        self.log(
            LogLevel.INFO,
            "Poll interval updated",
            interval=new_interval
        )
To reload the configuration:
- Edit agents.yaml
- Find agent PID: ps aux | grep python
- Send SIGHUP: kill -HUP <pid>
Note: Config reloading via SIGHUP only works on Unix/Linux systems.
Status Reporting
Report current state for monitoring and debugging:
class MonitorableAgent(OpperatorAgent):
    """Example agent that logs its health metrics on request."""

    def initialize(self):
        """Reset the counters and create the attributes on_status reads."""
        self.start_time = None
        self.request_count = 0
        self.error_count = 0
        # on_status reports len(self.conversation_states); without this
        # attribute a status request would raise AttributeError.
        self.conversation_states = {}

    def start(self):
        """Record the start time used for the uptime calculation."""
        self.start_time = time.time()

    def on_status(self):
        """Called when SIGUSR1 signal received"""
        # Calculate metrics; both guard against "not started" / "no requests"
        uptime = time.time() - self.start_time if self.start_time else 0
        error_rate = (
            self.error_count / self.request_count * 100
            if self.request_count > 0
            else 0
        )

        # Log comprehensive status
        # NOTE(review): self.active is not set anywhere in this example;
        # presumably the base class provides it - confirm.
        self.log(
            LogLevel.INFO,
            "Health check",
            uptime_seconds=int(uptime),
            requests=self.request_count,
            errors=self.error_count,
            error_rate_percent=f"{error_rate:.2f}",
            conversations=len(self.conversation_states),
            active=self.active
        )

        # Check for issues
        if error_rate > 10:
            self.log(LogLevel.WARNING, "High error rate",
                     error_rate=f"{error_rate:.2f}%")
To request a status report:
- Find agent PID: ps aux | grep python
- Send SIGUSR1: kill -USR1 <pid>
- View logs in TUI or log file
Complete Example
Full agent with persistent per-conversation state:
from opperator_agent import OpperatorAgent, LogLevel, CommandArgument
import json
import os
import time
from typing import Dict, Any, Optional
class NoteAgent(OpperatorAgent):
    """Agent that stores timestamped notes separately per conversation.

    Each conversation's state is cached in ``self.conversation_states``
    and mirrored to ``<state_dir>/<conversation_id>.json``.
    """

    def __init__(self):
        super().__init__(name="note_agent")
        self.conversation_states = {}
        self.current_conversation = None
        self.start_time = None

    def initialize(self):
        """Create the state directory and register commands and UI section."""
        self.state_dir = os.path.join(
            self.get_working_directory(),
            ".note_agent_state"
        )
        os.makedirs(self.state_dir, exist_ok=True)

        self.register_command(
            "add_note",
            self.cmd_add_note,
            description="Add a note to current conversation",
            arguments=[
                CommandArgument("note", "string", "Note text", required=True)
            ]
        )

        self.register_command(
            "list_notes",
            self.cmd_list_notes,
            description="List all notes for current conversation"
        )

        self.register_section("status", "Note Agent", "Initializing")

    def start(self):
        """Record the start time and mark the agent as ready."""
        self.start_time = time.time()
        self.update_section("status", "<c fg='green'>● Ready</c>")

    # State management

    @staticmethod
    def _new_state() -> Dict[str, Any]:
        """Return an empty state stamped with the current time."""
        return {"notes": [], "created": time.time()}

    def _current_state(self) -> Optional[Dict[str, Any]]:
        """Return the active conversation's state, or None if there is none."""
        if not self.current_conversation:
            return None
        return self.conversation_states.get(self.current_conversation)

    def get_state_file(self, conversation_id: str) -> str:
        """Return the path of the JSON state file for a conversation."""
        return os.path.join(self.state_dir, f"{conversation_id}.json")

    def load_state(self, conversation_id: str) -> Dict[str, Any]:
        """Load a conversation's state from disk, or return a new state.

        A missing file is expected for new conversations and is not
        logged. Any other failure is logged as a warning.
        """
        state_file = self.get_state_file(conversation_id)

        # Open directly rather than checking os.path.exists first: the
        # file may disappear between the check and the open.
        try:
            with open(state_file) as f:
                return json.load(f)
        except FileNotFoundError:
            pass
        except Exception as e:
            # A bad state file must never break the calling callback.
            self.log(LogLevel.WARNING, "Failed to load state", error=str(e))

        return self._new_state()

    def save_state(self, conversation_id: str):
        """Write a cached state to disk; a no-op for unknown conversations.

        The state goes to a temporary file first and is then moved into
        place with ``os.replace`` so a crash mid-write cannot leave a
        truncated state file. Failures are logged, never raised.
        """
        if conversation_id not in self.conversation_states:
            return

        state_file = self.get_state_file(conversation_id)
        tmp_file = f"{state_file}.tmp"

        try:
            with open(tmp_file, 'w') as f:
                json.dump(self.conversation_states[conversation_id], f, indent=2)
            os.replace(tmp_file, state_file)
        except Exception as e:
            # A leftover .tmp file is overwritten by the next save.
            self.log(LogLevel.ERROR, "Failed to save state", error=str(e))

    # Conversation events

    def on_new_conversation(self, conversation_id: str, is_clear: bool):
        """Select the conversation and create or load its state."""
        self.current_conversation = conversation_id

        if is_clear:
            self.conversation_states[conversation_id] = self._new_state()
            self.save_state(conversation_id)
        else:
            self.conversation_states[conversation_id] = self.load_state(conversation_id)

        self.update_ui()

    def on_conversation_switched(
        self,
        conversation_id: str,
        previous_id: str,
        message_count: int
    ):
        """Save the previous conversation and select the new one."""
        # Save previous
        if previous_id:
            self.save_state(previous_id)

        # Load current
        self.current_conversation = conversation_id
        if conversation_id not in self.conversation_states:
            self.conversation_states[conversation_id] = self.load_state(conversation_id)

        self.update_ui()

    def on_conversation_deleted(self, conversation_id: str):
        """Remove a deleted conversation's file and cached state.

        If the deleted conversation was the active one, the active
        pointer is cleared so later commands report "No active
        conversation" instead of failing on the missing state.
        """
        try:
            os.remove(self.get_state_file(conversation_id))
        except FileNotFoundError:
            # Never saved, or already removed.
            pass
        except OSError as e:
            self.log(LogLevel.ERROR, "Failed to delete state file", error=str(e))

        self.conversation_states.pop(conversation_id, None)

        if self.current_conversation == conversation_id:
            self.current_conversation = None
            self.update_section("status", "<c fg='green'>● Ready</c>")

    # Commands

    def cmd_add_note(self, args):
        """Append ``args["note"]`` to the active conversation and save it.

        Returns a dict with the added text and the new note count, or
        an ``error`` entry when there is no active conversation.
        """
        note = args["note"]

        state = self._current_state()
        if state is None:
            return {"error": "No active conversation"}

        state["notes"].append({
            "text": note,
            "timestamp": time.time()
        })

        self.save_state(self.current_conversation)
        self.update_ui()

        return {"added": note, "total": len(state["notes"])}

    def cmd_list_notes(self, args):
        """Return the note texts of the active conversation (empty if none)."""
        state = self._current_state()
        if state is None:
            return {"notes": []}

        return {"notes": [n["text"] for n in state["notes"]]}

    # UI updates

    def update_ui(self):
        """Show the active conversation's short id and note count."""
        if not self.current_conversation:
            return

        state = self.conversation_states.get(self.current_conversation, {})
        note_count = len(state.get("notes", []))

        # Joined explicitly so source indentation never ends up in the
        # rendered text.
        self.update_section(
            "status",
            "\n".join([
                "<c fg='green'>● Ready</c>",
                f"<b>Conversation:</b> {self.current_conversation[:8]}...",
                f"<b>Notes:</b> {note_count}",
            ])
        )

    # Status reporting

    def on_status(self):
        """Log uptime, conversation count and total note count."""
        uptime = time.time() - self.start_time if self.start_time else 0

        total_notes = sum(
            len(state.get("notes", []))
            for state in self.conversation_states.values()
        )

        self.log(
            LogLevel.INFO,
            "Status",
            uptime_seconds=int(uptime),
            conversations=len(self.conversation_states),
            total_notes=total_notes,
            current_conversation=self.current_conversation
        )

    def cleanup(self):
        """Save all states before exit, then run the base-class cleanup."""
        for conv_id in self.conversation_states:
            self.save_state(conv_id)
        super().cleanup()
if __name__ == "__main__":
    # Script entry point: build the agent and call its run() method.
    NoteAgent().run()
Best Practices
Choose the right state type
Ephemeral: Counters, metrics, temporary caches
Persistent: User preferences, history, learned data
# Ephemeral - OK to lose on restart, so a plain attribute is enough
self.request_count = 0
# Persistent - must survive restart, so hand the value to a save routine
# NOTE(review): save_preferences is not defined in any example on this
# page; presumably it writes to disk - confirm against the agent.
self.save_preferences(self.user_preferences)
Handle state loading errors
Always handle file I/O errors gracefully:
def load_state(self, conversation_id):
    """Load a conversation's state from disk, falling back to defaults.

    Args:
        conversation_id: Id used to locate the state file through
            ``self.get_state_file``.

    Returns:
        The parsed JSON content of the state file, or
        ``self.get_default_state()`` when the file is missing,
        corrupted or unreadable.
    """
    state_file = self.get_state_file(conversation_id)
    try:
        with open(state_file) as f:
            return json.load(f)
    except FileNotFoundError:
        # No state saved yet for this conversation; not worth logging.
        pass
    except json.JSONDecodeError:
        self.log(LogLevel.WARNING, "Corrupted state file")
    except Exception as e:
        # Last-resort boundary: loading must never break the caller.
        self.log(LogLevel.ERROR, "Failed to load state", error=str(e))
    return self.get_default_state()
Save state at the right times
Balance between data safety and performance.
Save on:
- Agent cleanup (always)
- Conversation switch
- After important changes
- Periodically (for long-running agents)
Avoid saving:
- After every tiny change
- On every command execution
- In tight loops
Clean up deleted conversations
Remove state files when conversations are deleted:
def on_conversation_deleted(self, conversation_id):
    """Drop a deleted conversation's state from memory and from disk.

    Unknown conversations and missing state files are ignored.
    """
    # Remove from memory
    self.conversation_states.pop(conversation_id, None)

    # Delete file. Removing directly avoids the race between an
    # os.path.exists check and the os.remove call.
    try:
        os.remove(self.get_state_file(conversation_id))
    except FileNotFoundError:
        pass