Skip to main content
Agent state can be ephemeral (in-memory) or persistent (saved to disk). Understanding how to manage state effectively lets you build agents that remember context, maintain preferences, and survive restarts.

Types of State

Ephemeral State (In-Memory)

State that only exists while the agent is running:
class MyAgent(OpperatorAgent):
    def initialize(self):
        # Ephemeral state - lost on restart
        self.request_count = 0
        self.active_connections = []
        self.cache = {}
Use for:
  • Counters and metrics
  • Active connections
  • Temporary caches
  • Current processing state

Persistent State (Disk)

State that survives agent restarts:
import json
import os

class MyAgent(OpperatorAgent):
    def initialize(self):
        # Load persistent state
        self.state_file = os.path.join(
            self.get_working_directory(),
            ".agent_state.json"
        )
        self.load_persistent_state()

    def load_persistent_state(self):
        if os.path.exists(self.state_file):
            with open(self.state_file) as f:
                state = json.load(f)
                self.preferences = state.get("preferences", {})
                self.history = state.get("history", [])
        else:
            self.preferences = {}
            self.history = []

    def save_persistent_state(self):
        state = {
            "preferences": self.preferences,
            "history": self.history
        }
        with open(self.state_file, 'w') as f:
            json.dump(state, f, indent=2)

    def cleanup(self):
        # Save state before exit
        self.save_persistent_state()
        super().cleanup()
Use for:
  • User preferences
  • Historical data
  • Learned patterns
  • Configuration overrides

Per-Conversation State

Store state separately for each conversation:
class ConversationAgent(OpperatorAgent):
    def initialize(self):
        # Per-conversation state
        self.conversation_states = {}
        self.current_conversation = None

    def on_new_conversation(self, conversation_id: str, is_clear: bool):
        self.current_conversation = conversation_id

        # Initialize state for this conversation
        self.conversation_states[conversation_id] = {
            "messages": [],
            "context": {},
            "preferences": {}
        }

    def on_conversation_switched(
        self,
        conversation_id: str,
        previous_id: str,
        message_count: int
    ):
        self.current_conversation = conversation_id

        # Initialize if we haven't seen this conversation
        if conversation_id not in self.conversation_states:
            self.conversation_states[conversation_id] = {
                "messages": [],
                "context": {},
                "preferences": {}
            }

    def on_conversation_deleted(self, conversation_id: str):
        # Clean up state
        if conversation_id in self.conversation_states:
            del self.conversation_states[conversation_id]

    def get_current_state(self):
        """Get state for current conversation"""
        if self.current_conversation in self.conversation_states:
            return self.conversation_states[self.current_conversation]
        return None

Persistent Per-Conversation State

Combine persistent storage with per-conversation state:
import json
import os
from typing import Dict, Any

class PersistentAgent(OpperatorAgent):
    def initialize(self):
        self.state_dir = os.path.join(
            self.get_working_directory(),
            ".agent_state"
        )
        os.makedirs(self.state_dir, exist_ok=True)

        self.conversation_states = {}
        self.current_conversation = None

    def get_state_file(self, conversation_id: str) -> str:
        """Get path to state file for conversation"""
        return os.path.join(self.state_dir, f"{conversation_id}.json")

    def load_conversation_state(self, conversation_id: str) -> Dict[str, Any]:
        """Load state from disk"""
        state_file = self.get_state_file(conversation_id)

        if os.path.exists(state_file):
            try:
                with open(state_file) as f:
                    return json.load(f)
            except Exception as e:
                self.log(LogLevel.WARNING, "Failed to load state",
                         conversation_id=conversation_id, error=str(e))

        # Return default state
        return {
            "notes": [],
            "preferences": {},
            "history": []
        }

    def save_conversation_state(self, conversation_id: str):
        """Save state to disk"""
        if conversation_id not in self.conversation_states:
            return

        state_file = self.get_state_file(conversation_id)
        state = self.conversation_states[conversation_id]

        try:
            with open(state_file, 'w') as f:
                json.dump(state, f, indent=2)
        except Exception as e:
            self.log(LogLevel.ERROR, "Failed to save state",
                     conversation_id=conversation_id, error=str(e))

    def on_new_conversation(self, conversation_id: str, is_clear: bool):
        self.current_conversation = conversation_id

        if is_clear:
            # Clear creates fresh state
            self.conversation_states[conversation_id] = {
                "notes": [],
                "preferences": {},
                "history": []
            }
            self.save_conversation_state(conversation_id)
        else:
            # New conversation - try to load existing state
            state = self.load_conversation_state(conversation_id)
            self.conversation_states[conversation_id] = state

    def on_conversation_switched(
        self,
        conversation_id: str,
        previous_id: str,
        message_count: int
    ):
        # Save previous conversation state
        if previous_id and previous_id in self.conversation_states:
            self.save_conversation_state(previous_id)

        self.current_conversation = conversation_id

        # Load or initialize state
        if conversation_id not in self.conversation_states:
            state = self.load_conversation_state(conversation_id)
            self.conversation_states[conversation_id] = state

    def on_conversation_deleted(self, conversation_id: str):
        # Delete state file
        state_file = self.get_state_file(conversation_id)
        if os.path.exists(state_file):
            os.remove(state_file)

        # Remove from memory
        if conversation_id in self.conversation_states:
            del self.conversation_states[conversation_id]

    def cleanup(self):
        # Save all conversation states before exit
        for conversation_id in self.conversation_states:
            self.save_conversation_state(conversation_id)

        super().cleanup()

Configuration Updates

Handle dynamic configuration changes without restart:
class ConfigurableAgent(OpperatorAgent):
    def initialize(self):
        # Load initial configuration
        self.api_endpoint = self.config.get("api_endpoint", "https://api.example.com")
        self.poll_interval = self.config.get("poll_interval", 60)
        self.features = self.config.get("features", {})

    def on_config_update(self, config: Dict[str, Any]):
        """Called when SIGHUP signal received"""
        self.log(LogLevel.INFO, "Configuration reloaded", config=config)

        # Update API endpoint
        new_endpoint = config.get("api_endpoint")
        if new_endpoint and new_endpoint != self.api_endpoint:
            old_endpoint = self.api_endpoint
            self.api_endpoint = new_endpoint
            self.reconnect_to_api()
            self.log(
                LogLevel.INFO,
                "API endpoint updated",
                old=old_endpoint,
                new=new_endpoint
            )

        # Update poll interval
        new_interval = config.get("poll_interval", 60)
        if new_interval != self.poll_interval:
            self.poll_interval = new_interval
            self.log(
                LogLevel.INFO,
                "Poll interval updated",
                interval=new_interval
            )

        # Update feature flags
        self.features = config.get("features", {})
To trigger config reload:
  1. Edit agents.yaml
  2. Find agent PID: ps aux | grep python
  3. Send SIGHUP: kill -HUP <pid>
Config reloading via SIGHUP only works on Unix/Linux systems.

Status Reporting

Report current state for monitoring and debugging:
class MonitorableAgent(OpperatorAgent):
    def initialize(self):
        self.start_time = None
        self.request_count = 0
        self.error_count = 0

    def start(self):
        self.start_time = time.time()

    def on_status(self):
        """Called when SIGUSR1 signal received"""
        # Calculate metrics
        uptime = time.time() - self.start_time if self.start_time else 0
        error_rate = (self.error_count / self.request_count * 100) if self.request_count > 0 else 0

        # Log comprehensive status
        self.log(
            LogLevel.INFO,
            "Health check",
            uptime_seconds=int(uptime),
            requests=self.request_count,
            errors=self.error_count,
            error_rate_percent=f"{error_rate:.2f}",
            conversations=len(self.conversation_states),
            active=self.active
        )

        # Check for issues
        if error_rate > 10:
            self.log(LogLevel.WARNING, "High error rate",
                     error_rate=f"{error_rate:.2f}%")
To trigger status report:
  1. Find agent PID: ps aux | grep python
  2. Send SIGUSR1: kill -USR1 <pid>
  3. View logs in TUI or log file

Complete Example

Full agent with persistent per-conversation state:
from opperator_agent import OpperatorAgent, LogLevel, CommandArgument
import json
import os
import time
from typing import Dict, Any, Optional

class NoteAgent(OpperatorAgent):
    def __init__(self):
        super().__init__(name="note_agent")
        self.conversation_states = {}
        self.current_conversation = None
        self.start_time = None

    def initialize(self):
        self.state_dir = os.path.join(
            self.get_working_directory(),
            ".note_agent_state"
        )
        os.makedirs(self.state_dir, exist_ok=True)

        self.register_command(
            "add_note",
            self.cmd_add_note,
            description="Add a note to current conversation",
            arguments=[
                CommandArgument("note", "string", "Note text", required=True)
            ]
        )

        self.register_command(
            "list_notes",
            self.cmd_list_notes,
            description="List all notes for current conversation"
        )

        self.register_section("status", "Note Agent", "Initializing")

    def start(self):
        self.start_time = time.time()
        self.update_section("status", "<c fg='green'>● Ready</c>")

    # State management
    def get_state_file(self, conversation_id: str) -> str:
        return os.path.join(self.state_dir, f"{conversation_id}.json")

    def load_state(self, conversation_id: str) -> Dict[str, Any]:
        state_file = self.get_state_file(conversation_id)
        if os.path.exists(state_file):
            try:
                with open(state_file) as f:
                    return json.load(f)
            except Exception as e:
                self.log(LogLevel.WARNING, "Failed to load state", error=str(e))
        return {"notes": [], "created": time.time()}

    def save_state(self, conversation_id: str):
        if conversation_id not in self.conversation_states:
            return

        state_file = self.get_state_file(conversation_id)
        try:
            with open(state_file, 'w') as f:
                json.dump(self.conversation_states[conversation_id], f, indent=2)
        except Exception as e:
            self.log(LogLevel.ERROR, "Failed to save state", error=str(e))

    # Conversation events
    def on_new_conversation(self, conversation_id: str, is_clear: bool):
        self.current_conversation = conversation_id

        if is_clear:
            self.conversation_states[conversation_id] = {
                "notes": [],
                "created": time.time()
            }
            self.save_state(conversation_id)
        else:
            self.conversation_states[conversation_id] = self.load_state(conversation_id)

        self.update_ui()

    def on_conversation_switched(
        self,
        conversation_id: str,
        previous_id: str,
        message_count: int
    ):
        # Save previous
        if previous_id:
            self.save_state(previous_id)

        # Load current
        self.current_conversation = conversation_id
        if conversation_id not in self.conversation_states:
            self.conversation_states[conversation_id] = self.load_state(conversation_id)

        self.update_ui()

    def on_conversation_deleted(self, conversation_id: str):
        state_file = self.get_state_file(conversation_id)
        if os.path.exists(state_file):
            os.remove(state_file)

        if conversation_id in self.conversation_states:
            del self.conversation_states[conversation_id]

    # Commands
    def cmd_add_note(self, args):
        note = args["note"]

        if not self.current_conversation:
            return {"error": "No active conversation"}

        state = self.conversation_states[self.current_conversation]
        state["notes"].append({
            "text": note,
            "timestamp": time.time()
        })

        self.save_state(self.current_conversation)
        self.update_ui()

        return {"added": note, "total": len(state["notes"])}

    def cmd_list_notes(self, args):
        if not self.current_conversation:
            return {"notes": []}

        state = self.conversation_states[self.current_conversation]
        return {"notes": [n["text"] for n in state["notes"]]}

    # UI updates
    def update_ui(self):
        if not self.current_conversation:
            return

        state = self.conversation_states.get(self.current_conversation, {})
        note_count = len(state.get("notes", []))

        self.update_section(
            "status",
            f"""
<c fg='green'>● Ready</c>
<b>Conversation:</b> {self.current_conversation[:8]}...
<b>Notes:</b> {note_count}
            """.strip()
        )

    # Status reporting
    def on_status(self):
        uptime = time.time() - self.start_time if self.start_time else 0
        total_notes = sum(
            len(state.get("notes", []))
            for state in self.conversation_states.values()
        )

        self.log(
            LogLevel.INFO,
            "Status",
            uptime_seconds=int(uptime),
            conversations=len(self.conversation_states),
            total_notes=total_notes,
            current_conversation=self.current_conversation
        )

    def cleanup(self):
        # Save all states before exit
        for conv_id in self.conversation_states:
            self.save_state(conv_id)
        super().cleanup()

if __name__ == "__main__":
    agent = NoteAgent()
    agent.run()

Best Practices

Ephemeral: Counters, metrics, temporary cachesPersistent: User preferences, history, learned data
# Ephemeral - OK to lose on restart
self.request_count = 0

# Persistent - must survive restart
self.save_preferences(self.user_preferences)
Always handle file I/O errors gracefully:
def load_state(self, conversation_id):
    try:
        with open(state_file) as f:
            return json.load(f)
    except FileNotFoundError:
        return self.get_default_state()
    except json.JSONDecodeError:
        self.log(LogLevel.WARNING, "Corrupted state file")
        return self.get_default_state()
    except Exception as e:
        self.log(LogLevel.ERROR, "Failed to load state", error=str(e))
        return self.get_default_state()
Balance between data safety and performance:Save on:
  • Agent cleanup (always)
  • Conversation switch
  • After important changes
  • Periodically (for long-running agents)
Don’t save:
  • After every tiny change
  • On every command execution
  • In tight loops
Remove state files when conversations are deleted:
def on_conversation_deleted(self, conversation_id):
    # Remove from memory
    if conversation_id in self.conversation_states:
        del self.conversation_states[conversation_id]

    # Delete file
    state_file = self.get_state_file(conversation_id)
    if os.path.exists(state_file):
        os.remove(state_file)

Next Steps