Opperator provides two types of lifecycle events to notify agents about state changes:
  1. Protocol Events - Sent from the TUI/daemon via IPC when conversations or agents change
  2. Signal Events - Triggered by OS signals for configuration and diagnostics

Quick Reference

Event                             | Type     | Status   | Trigger
on_new_conversation()             | Protocol | ✅ Works | New conversation or /clear
on_conversation_switched()        | Protocol | ✅ Works | User switches conversations
on_conversation_deleted()         | Protocol | ✅ Works | Conversation deleted
on_agent_activated()              | Protocol | ✅ Works | Agent becomes active
on_agent_deactivated()            | Protocol | ✅ Works | Agent becomes inactive
on_invocation_directory_changed() | Protocol | ✅ Works | User changes invocation directory
on_config_update()                | Signal   | ✅ Works | SIGHUP signal (kill -HUP <pid>)
on_status()                       | Signal   | ✅ Works | SIGUSR1 signal (kill -USR1 <pid>)

Architecture

Protocol Events Flow:
User Action → TUI/Daemon → IPC Request → Agent Subprocess → Event Handler
Signal Events Flow:
OS Signal (SIGHUP/SIGUSR1) → Signal Handler → Event Handler → Your Code
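The signal flow above can be sketched with Python's standard signal module. This is only an illustration of the general pattern, not Opperator's actual implementation: the MiniAgent class, its load_config() placeholder, and install_signal_handlers() are hypothetical names, and the sketch assumes a Unix-like system where SIGHUP and SIGUSR1 exist.

```python
import os
import signal


class MiniAgent:
    """Hypothetical stand-in for an agent; not the real OpperatorAgent."""

    def __init__(self):
        self.events = []

    def load_config(self):
        # Placeholder: a real agent would re-read its configuration file here.
        return {"poll_interval": 60}

    def on_config_update(self, config):
        self.events.append(("config_update", config))

    def on_status(self):
        self.events.append(("status", None))

    def install_signal_handlers(self):
        # Python signal handlers receive (signum, frame) and forward
        # to the matching event handler.
        signal.signal(
            signal.SIGHUP,
            lambda signum, frame: self.on_config_update(self.load_config()),
        )
        signal.signal(signal.SIGUSR1, lambda signum, frame: self.on_status())


agent = MiniAgent()
agent.install_signal_handlers()

# Same effect as running `kill -USR1 <pid>` against this process.
os.kill(os.getpid(), signal.SIGUSR1)
```

Note that signal.signal() may only be called from the main thread of the main interpreter, which is one reason handlers like these are usually installed during startup.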

Protocol-Based Events

These events are automatically sent by the Opperator daemon when user actions occur.

Conversation Events

on_new_conversation()
Called: When a new conversation is created or /clear is executed
Parameters:
  • conversation_id (str): Unique conversation identifier
  • is_clear (bool): True if /clear command, False if new conversation
Use cases:
  • Clear conversation-specific caches
  • Reset session state
  • Initialize new conversation context
  • Clear accumulated data
def on_new_conversation(self, conversation_id: str, is_clear: bool):
    source = "cleared" if is_clear else "created"
    self.log(LogLevel.INFO, f"Conversation {source}", id=conversation_id)

    # Reset conversation-specific state
    self.conversation_data = {}
    self.message_count = 0

    # Update system prompt
    self.set_system_prompt(f"""
Chat agent ready.

Conversation: {conversation_id}
Messages: 0
    """)
on_conversation_switched()
Called: When user switches to a different conversation
Parameters:
  • conversation_id (str): Conversation being switched to
  • previous_id (str): Previous conversation ID (may be empty)
  • message_count (int): Number of messages in new conversation
Use cases:
  • Load conversation-specific state
  • Restore session data
  • Update UI to reflect context
  • Resume conversation history
def on_conversation_switched(
    self,
    conversation_id: str,
    previous_id: str,
    message_count: int
):
    self.log(
        LogLevel.INFO,
        "Switched conversations",
        from_id=previous_id,
        to_id=conversation_id,
        messages=message_count
    )

    # Save previous conversation state
    if previous_id:
        self.save_state(previous_id)

    # Load new conversation state
    state = self.load_state(conversation_id)
    if state:
        self.restore_from_state(state)
    else:
        # New conversation we haven't seen
        self.initialize_conversation_state()

    # Update sidebar
    self.update_section(
        "info",
        f"Conversation: {conversation_id[:8]}...\n"
        f"Messages: {message_count}"
    )
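The example above calls save_state() and load_state() without showing them. One possible shape for such helpers is a small in-memory store keyed by conversation ID. This is a sketch under assumptions: ConversationStateStore and its method signatures are hypothetical (here save_state() takes the state explicitly, unlike the one-argument call above), and nothing is persisted to disk.

```python
from typing import Any, Dict, Optional


class ConversationStateStore:
    """Hypothetical in-memory store keyed by conversation ID."""

    def __init__(self) -> None:
        self._states: Dict[str, Dict[str, Any]] = {}

    def save_state(self, conversation_id: str, state: Dict[str, Any]) -> None:
        # Copy so later mutations by the caller do not leak into the store.
        self._states[conversation_id] = dict(state)

    def load_state(self, conversation_id: str) -> Optional[Dict[str, Any]]:
        # Returns None for conversations this agent has never seen.
        state = self._states.get(conversation_id)
        return dict(state) if state is not None else None

    def delete_state(self, conversation_id: str) -> None:
        # pop() with a default makes deletion safe to call more than once.
        self._states.pop(conversation_id, None)


store = ConversationStateStore()
store.save_state("conv-a", {"message_count": 3})
known = store.load_state("conv-a")
unknown = store.load_state("conv-b")
```

Returning None for unknown conversations matches the `if state: ... else: ...` branch in the handler, and delete_state() gives on_conversation_deleted() a matching cleanup call.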
on_conversation_deleted()
Called: Before conversation is deleted
Parameters:
  • conversation_id (str): ID of conversation being deleted
Use cases:
  • Delete cached data
  • Clean up temporary files
  • Remove conversation from databases
  • Free memory
def on_conversation_deleted(self, conversation_id: str):
    self.log(LogLevel.INFO, "Conversation deleted", id=conversation_id)

    # Remove from state store
    if conversation_id in self.conversation_states:
        del self.conversation_states[conversation_id]

    # Delete temp files (needs `import os` and `import shutil` at module level)
    temp_dir = f"/tmp/opperator/agent_{conversation_id}"
    if os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)

    # Remove from database
    self.db.delete_conversation(conversation_id)

Agent Activation Events

on_agent_activated()
Called: When this agent becomes the active agent
Parameters:
  • previous_agent (str | None): Previously active agent name
  • conversation_id (str): Current conversation ID
Use cases:
  • Update UI to show active state
  • Resume background tasks
  • Initialize agent-specific context
  • Update system prompt with current state
def on_agent_activated(
    self,
    previous_agent: Optional[str],
    conversation_id: str
):
    self.log(
        LogLevel.INFO,
        "Agent activated",
        previous=previous_agent,
        conversation=conversation_id
    )

    # Update status
    self.active = True

    # Update sidebar
    self.update_section(
        "status",
        f"""
<c fg="green">● Active</c>
<b>Previous:</b> {previous_agent or 'None'}
<b>Conversation:</b> {conversation_id[:8]}...
        """.strip()
    )

    # Update system prompt
    self.set_system_prompt(f"""
Monitoring agent (ACTIVE).

Watched files: {len(self.watched_files)}
Active alerts: {len(self.alerts)}
    """)

    # Resume monitoring if paused
    if self.paused:
        self.resume_monitoring()
on_agent_deactivated()
Called: When user switches away from this agent
Parameters:
  • next_agent (str | None): Agent being switched to
Use cases:
  • Update UI to show inactive state
  • Optionally pause background tasks
  • Save state
  • Reduce resource usage
def on_agent_deactivated(self, next_agent: Optional[str]):
    self.log(LogLevel.INFO, "Agent deactivated", next=next_agent)

    # Update status
    self.active = False

    # Update sidebar
    self.update_section(
        "status",
        f"""
<c fg="yellow">○ Inactive</c>
<b>Switched to:</b> {next_agent or 'None'}
        """.strip()
    )

    # Update system prompt (minimal when inactive)
    self.set_system_prompt("Monitoring agent (inactive).")

    # Optionally pause expensive background tasks
    # (keep running if you want to keep collecting data)
    if self.pause_when_inactive:
        self.pause_monitoring()
Agents keep running when inactive. Only pause tasks if needed to reduce resource usage.

Invocation Directory Events

on_invocation_directory_changed()
Called: When the invocation directory changes (where the user ran op from)
Parameters:
  • old_path (str): Previous invocation directory
  • new_path (str): New invocation directory
Use cases:
  • Update file path resolution context
  • Refresh file watchers
  • Update relative path handling
  • Notify user of context change
How it works:
  1. When the TUI starts, the daemon stores where the user ran op from
  2. Agents can query this via get_invocation_directory()
  3. When the invocation directory changes, agents receive this lifecycle event
  4. Agents can use this to adjust their working context
def on_invocation_directory_changed(self, old_path: str, new_path: str):
    self.log(
        LogLevel.INFO,
        "Invocation directory changed",
        old=old_path,
        new=new_path
    )

    # Update file watcher context
    if hasattr(self, 'watcher'):
        self.watcher.update_base_path(new_path)

    # Refresh cached file paths
    self.refresh_file_cache(new_path)

    # Update system prompt with new context
    self.set_system_prompt(f"""
File monitoring agent.

Current directory: {new_path}
Watched files: {len(self.watched_files)}
    """)

    # Update sidebar
    self.update_section(
        "context",
        f"""
<b>Invocation Dir:</b> {os.path.basename(new_path)}
<b>Working Dir:</b> {os.path.basename(self.get_working_directory())}
        """.strip()
    )
The invocation directory is where the user ran the op command, while the working directory is where the agent process is running. These are often different.
Access methods:
def start(self):
    # Query on startup
    invocation_dir = self.get_invocation_directory()
    if invocation_dir:
        self.log(LogLevel.INFO, "User invoked from", path=invocation_dir)
    else:
        self.log(LogLevel.WARNING, "No invocation directory set")

    # Compare with working directory
    working_dir = self.get_working_directory()
    self.log(LogLevel.INFO, "Agent running in", path=working_dir)
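Because the two directories often differ, an agent that accepts file paths from the user has to decide which one relative paths refer to. The sketch below shows one reasonable policy: prefer the invocation directory and fall back to the working directory. resolve_user_path() is a hypothetical helper, not part of the Opperator API, and the example paths are made up.

```python
import os
from typing import Optional


def resolve_user_path(path: str, invocation_dir: Optional[str], working_dir: str) -> str:
    """Resolve a user-supplied path (hypothetical helper).

    Absolute paths are only normalized. Relative paths are resolved against
    the invocation directory when it is known, otherwise against the agent's
    working directory.
    """
    if os.path.isabs(path):
        return os.path.normpath(path)
    base = invocation_dir or working_dir
    return os.path.normpath(os.path.join(base, path))


# Relative to where the user ran `op`.
resolved = resolve_user_path("src/main.py", "/home/user/project", "/opt/agents/demo")

# No invocation directory available: fall back to the working directory.
fallback = resolve_user_path("notes.txt", None, "/opt/agents/demo")
```

In an agent, the two directory arguments would come from get_invocation_directory() and get_working_directory().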

Signal-Based Events

These events are triggered by OS signals that you must manually send to the agent process. They are useful for operational tasks like configuration reloading and health checks.
Signal-based events only work on Unix/Linux systems. They use the Python signal handling mechanism, not the protocol event system.
on_config_update()
Called: When SIGHUP signal received
Trigger: kill -HUP <pid> (where <pid> is your agent’s process ID)
Parameters:
  • config (dict): New configuration dictionary loaded from agents.yaml
Use cases:
  • Update runtime parameters without restart
  • Reload connection settings
  • Adjust logging levels
  • Apply new feature flags
How it works:
  1. You edit your agents.yaml file
  2. Send SIGHUP signal: kill -HUP <agent_pid>
  3. Agent reloads config from file
  4. If config changed, calls on_config_update() with new values
def on_config_update(self, config: Dict[str, Any]):
    self.log(LogLevel.INFO, "Configuration reloaded", config=config)

    # Update polling interval
    new_interval = config.get("poll_interval", 60)
    if new_interval != self.poll_interval:
        old_interval = self.poll_interval
        self.poll_interval = new_interval
        self.log(
            LogLevel.INFO,
            "Poll interval updated",
            old=old_interval,
            new=new_interval
        )

    # Update API endpoint
    new_endpoint = config.get("api_endpoint")
    if new_endpoint and new_endpoint != self.api_endpoint:
        self.api_endpoint = new_endpoint
        self.reconnect_to_api()
        self.log(LogLevel.INFO, "API endpoint updated")

    # Update feature flags
    self.feature_flags = config.get("features", {})
Hot reload lets you update agent behavior without restart. Find your agent’s PID with ps aux | grep python.
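The "reload, compare, call the handler only if something changed" steps can be sketched as follows. This is an illustration, not Opperator's reload code: ConfigReloader is a hypothetical class, and JSON stands in for agents.yaml so the sketch needs only the standard library.

```python
import json
import os
import tempfile
from typing import Any, Callable, Dict


class ConfigReloader:
    """Hypothetical helper that invokes a callback only when config changes."""

    def __init__(self, path: str, on_change: Callable[[Dict[str, Any]], None]) -> None:
        self.path = path
        self.on_change = on_change
        self.current: Dict[str, Any] = {}

    def reload(self) -> bool:
        # JSON is used instead of YAML purely to avoid a third-party dependency.
        with open(self.path, "r", encoding="utf-8") as handle:
            new_config = json.load(handle)
        if new_config == self.current:
            return False  # Nothing changed, so the handler is not called.
        self.current = new_config
        self.on_change(new_config)
        return True


received = []
with tempfile.TemporaryDirectory() as tmp:
    config_path = os.path.join(tmp, "agents.json")
    with open(config_path, "w", encoding="utf-8") as handle:
        json.dump({"poll_interval": 30}, handle)

    reloader = ConfigReloader(config_path, received.append)
    first = reloader.reload()   # Differs from the empty initial config.
    second = reloader.reload()  # File unchanged since the previous reload.
```

Comparing the parsed dictionaries rather than file timestamps means that re-saving the file without edits does not trigger the handler.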
on_status()
Called: When SIGUSR1 signal received
Trigger: kill -USR1 <pid> (where <pid> is your agent’s process ID)
Use cases:
  • Log current state for debugging
  • Report health metrics
  • Dump diagnostic information
  • Check resource usage
How it works:
  1. Find your agent’s process ID: ps aux | grep python
  2. Send SIGUSR1 signal: kill -USR1 <agent_pid>
  3. Agent logs current status via on_status()
def on_status(self):
    # Gather metrics
    metrics = {
        "uptime": time.time() - self.start_time,
        "requests_processed": self.request_count,
        "errors": self.error_count,
        "active_connections": len(self.connections),
        "queue_size": len(self.queue),
        "memory_mb": self.get_memory_usage_mb()
    }

    # Log comprehensive status
    self.log(
        LogLevel.INFO,
        "Health check",
        **metrics
    )

    # Check for issues and warn
    if metrics["queue_size"] > 1000:
        self.log(LogLevel.WARNING, "Queue size high",
                 size=metrics["queue_size"])

    # The metrics dict stores the error count under the "errors" key
    if metrics["errors"] > 100:
        self.log(LogLevel.WARNING, "High error count",
                 count=metrics["errors"])
View the logged status in the Opperator TUI logs panel or in your agent’s log file.
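The on_status() example relies on a get_memory_usage_mb() helper that is not shown. A minimal standard-library version might look like the sketch below. It is an assumption about what such a helper could do, and it reports peak rather than current memory, using the Unix-only resource module.

```python
import resource
import sys


def get_memory_usage_mb() -> float:
    """Peak resident set size of this process in megabytes (Unix only)."""
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss is reported in kilobytes on Linux and in bytes on macOS.
    if sys.platform == "darwin":
        return peak / (1024 * 1024)
    return peak / 1024


memory_mb = get_memory_usage_mb()
```

Since signal events are Unix-only as well, depending on the resource module here does not narrow the platforms on which on_status() is useful.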

Complete Example

Agent demonstrating all lifecycle events:
from opperator_agent import OpperatorAgent, LogLevel
import time
import os
from typing import Dict, Any, Optional

class LifecycleAgent(OpperatorAgent):
    def __init__(self):
        super().__init__(name="lifecycle_agent")
        self.start_time = None
        self.active = False
        self.conversation_states = {}
        self.current_conversation = None

    def initialize(self):
        self.set_description("Demonstrates lifecycle event handling")

        self.register_section(
            "status",
            "Agent Status",
            '<c fg="yellow">○ Initializing</c>'
        )

        self.register_section(
            "context",
            "Context",
            "No conversation",
            collapsed=True
        )

    def start(self):
        self.start_time = time.time()
        self.update_section("status", '<c fg="green">● Running</c>')

    # Conversation events
    def on_new_conversation(self, conversation_id: str, is_clear: bool):
        source = "Cleared" if is_clear else "New"
        self.current_conversation = conversation_id
        self.conversation_states[conversation_id] = {
            "created": time.time(),
            "message_count": 0
        }

        self.update_section(
            "context",
            f"""
<b>Event:</b> {source} conversation
<b>ID:</b> {conversation_id[:8]}...
            """.strip()
        )

    def on_conversation_switched(
        self,
        conversation_id: str,
        previous_id: str,
        message_count: int
    ):
        self.current_conversation = conversation_id

        self.update_section(
            "context",
            f"""
<b>Event:</b> Switched conversation
<b>From:</b> {previous_id[:8] if previous_id else 'None'}
<b>To:</b> {conversation_id[:8]}...
<b>Messages:</b> {message_count}
            """.strip()
        )

    def on_conversation_deleted(self, conversation_id: str):
        if conversation_id in self.conversation_states:
            del self.conversation_states[conversation_id]

        self.log(LogLevel.INFO, "Conversation deleted", id=conversation_id)

    # Agent activation events
    def on_agent_activated(
        self,
        previous_agent: Optional[str],
        conversation_id: str
    ):
        self.active = True

        self.update_section(
            "status",
            f"""
<c fg="green">● Active</c>
<b>Previous:</b> {previous_agent or 'None'}
            """.strip()
        )

        self.set_system_prompt(f"""
Lifecycle agent is ACTIVE.

Conversation: {conversation_id}
Active conversations: {len(self.conversation_states)}
        """)

    def on_agent_deactivated(self, next_agent: Optional[str]):
        self.active = False

        self.update_section(
            "status",
            f"""
<c fg="yellow">○ Inactive</c>
<b>Next:</b> {next_agent or 'None'}
            """.strip()
        )

        self.set_system_prompt("Lifecycle agent (inactive).")

    # Invocation directory events
    def on_invocation_directory_changed(self, old_path: str, new_path: str):
        self.log(
            LogLevel.INFO,
            "Invocation directory changed",
            old=os.path.basename(old_path),
            new=os.path.basename(new_path)
        )

        self.update_section(
            "context",
            f"""
<b>Event:</b> Invocation directory changed
<b>Old:</b> {os.path.basename(old_path)}
<b>New:</b> {os.path.basename(new_path)}
            """.strip()
        )

    # Signal events (Unix/Linux only)
    def on_config_update(self, config: Dict[str, Any]):
        self.log(LogLevel.INFO, "Config updated", config=config)

    def on_status(self):
        uptime = time.time() - self.start_time if self.start_time else 0

        self.log(
            LogLevel.INFO,
            "Status check",
            uptime_seconds=int(uptime),
            active=self.active,
            conversations=len(self.conversation_states),
            current_conversation=self.current_conversation
        )

Event Flow Examples

Event Flow (new conversation):
  1. User action in TUI
  2. Daemon sends protocol event
  3. on_new_conversation(conv_id, is_clear=False) called
  4. Agent resets conversation state
Event Flow (/clear command):
  1. User types /clear in TUI
  2. Daemon sends protocol event
  3. on_new_conversation(conv_id, is_clear=True) called
  4. Agent clears conversation data
Event Flow (conversation switch):
  1. User switches conversation in TUI
  2. Daemon sends protocol event
  3. on_conversation_switched(new_id, old_id, msg_count) called
  4. Agent loads new conversation state
Event Flow (agent switch):
  1. User switches to different agent
  2. Current agent: on_agent_deactivated(next_agent="other_agent")
  3. Other agent: on_agent_activated(previous_agent="current_agent", conv_id)
Event Flow (config reload):
  1. Developer edits agents.yaml
  2. Sends signal: kill -HUP <agent_pid>
  3. Agent reloads config file
  4. If changed: on_config_update(new_config) called
  5. Agent applies new settings
Event Flow (status check):
  1. Developer sends signal: kill -USR1 <agent_pid>
  2. on_status() called
  3. Agent logs metrics to TUI/logs
  4. Developer reviews status output
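The ordering in the agent switch flow above (the current agent is deactivated, then the other agent is activated) can be simulated with two stub objects. StubAgent and switch_agent() are hypothetical stand-ins used only to make the call order visible. The real daemon delivers these events over IPC.

```python
from typing import List, Optional, Tuple

Journal = List[Tuple[str, str, Optional[str]]]


class StubAgent:
    """Hypothetical agent that records which lifecycle handlers were called."""

    def __init__(self, name: str, journal: Journal) -> None:
        self.name = name
        self.journal = journal

    def on_agent_activated(self, previous_agent: Optional[str], conversation_id: str) -> None:
        self.journal.append((self.name, "activated", previous_agent))

    def on_agent_deactivated(self, next_agent: Optional[str]) -> None:
        self.journal.append((self.name, "deactivated", next_agent))


def switch_agent(current: StubAgent, target: StubAgent, conversation_id: str) -> None:
    """Hypothetical stand-in for what happens when the user switches agents."""
    current.on_agent_deactivated(next_agent=target.name)
    target.on_agent_activated(previous_agent=current.name, conversation_id=conversation_id)


journal: Journal = []
current_agent = StubAgent("current_agent", journal)
other_agent = StubAgent("other_agent", journal)
switch_agent(current_agent, other_agent, "conv-1234")
```

A harness like this is also a convenient way to unit test handlers without running the TUI.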

Best Practices

Don’t block event handlers
Event handlers should complete quickly (under 100 ms). Avoid long-running operations.
# ❌ Bad: Long operation in event handler
def on_agent_activated(self, previous_agent, conversation_id):
    self.process_all_files()  # Could take minutes!
    self.train_model()  # Blocks other events

# ✅ Good: Quick update only
def on_agent_activated(self, previous_agent, conversation_id):
    self.active = True
    self.update_section("status", "Active")
    # Defer heavy work to background thread or main_loop
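One way to defer heavy work is to have the handler enqueue a task for a worker thread and return immediately. The sketch below uses only the standard library. DeferredWorkAgent is a hypothetical skeleton, not an OpperatorAgent subclass, and the queued lambda stands in for an expensive operation.

```python
import queue
import threading


class DeferredWorkAgent:
    """Hypothetical skeleton: fast event handler plus a background worker."""

    def __init__(self) -> None:
        self.active = False
        self.completed = []
        self._tasks = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def _run(self) -> None:
        # Process queued tasks in order until the None sentinel arrives.
        while True:
            task = self._tasks.get()
            if task is None:
                break
            self.completed.append(task())

    def on_agent_activated(self, previous_agent, conversation_id) -> None:
        # Only cheap bookkeeping happens here; the heavy work is queued.
        self.active = True
        self._tasks.put(lambda: f"indexed files for {conversation_id}")

    def shutdown(self) -> None:
        # Tasks queued before the sentinel still run, because the queue is FIFO.
        self._tasks.put(None)
        self._worker.join()


agent = DeferredWorkAgent()
agent.on_agent_activated(None, "conv-1234")
agent.shutdown()
```

The handler itself only flips a flag and enqueues a callable, so it returns well within the time budget regardless of how long the queued work takes.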
Handle missing data gracefully
Protocol event parameters like previous_id may be empty strings or None.
def on_conversation_switched(
    self,
    conversation_id: str,
    previous_id: str,
    message_count: int
):
    # previous_id might be empty string
    if previous_id:
        self.save_state(previous_id)

    # Conversation might be new to this agent
    state = self.load_state(conversation_id)
    if state:
        self.restore_state(state)
    else:
        self.initialize_state()
Log all lifecycle events
Always log lifecycle events for debugging, monitoring, and audit trails.
# Protocol events
def on_new_conversation(self, conversation_id: str, is_clear: bool):
    self.log(
        LogLevel.INFO,
        "New conversation",
        id=conversation_id,
        is_clear=is_clear
    )

# Signal events
def on_config_update(self, config: Dict[str, Any]):
    self.log(
        LogLevel.INFO,
        "Config reloaded",
        changed_keys=list(config.keys())
    )
Signal events are Unix/Linux only
on_config_update() and on_status() use POSIX signals. They won’t work on Windows.
def on_config_update(self, config: Dict[str, Any]):
    # This will only be called on Unix/Linux systems
    # Windows users won't be able to trigger this event
    self.apply_config(config)
If you need cross-platform config reloading, implement file watching in your main_loop() instead.
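A simple form of file watching is to poll the file's modification time on each loop iteration. The sketch below is one possible approach, not an Opperator feature: ConfigFileWatcher is a hypothetical class, and how it is wired into main_loop() is left to the agent.

```python
import os
import tempfile
from typing import Optional


class ConfigFileWatcher:
    """Hypothetical poller that reports when a file's mtime changes."""

    def __init__(self, path: str) -> None:
        self.path = path
        self._last_mtime: Optional[int] = self._read_mtime()

    def _read_mtime(self) -> Optional[int]:
        try:
            return os.stat(self.path).st_mtime_ns
        except FileNotFoundError:
            return None  # A missing file counts as its own state.

    def has_changed(self) -> bool:
        mtime = self._read_mtime()
        if mtime == self._last_mtime:
            return False
        self._last_mtime = mtime
        return True


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "agents.yaml")
    with open(path, "w", encoding="utf-8") as handle:
        handle.write("poll_interval: 60\n")

    watcher = ConfigFileWatcher(path)
    unchanged = watcher.has_changed()

    # Bump the mtime explicitly so the demo does not depend on clock resolution.
    info = os.stat(path)
    os.utime(path, ns=(info.st_atime_ns, info.st_mtime_ns + 1_000_000_000))
    changed = watcher.has_changed()
```

Inside main_loop(), the agent would call has_changed() once per iteration and, when it returns True, reload the file and apply the new settings the same way on_config_update() does.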
