Completing agentic tasks
from opperai import Opper
from pydantic import BaseModel, Field
from typing import List, Dict, Any
import os
import json
import time
import sys
opper = Opper(http_bearer=os.getenv("OPPER_API_KEY", ""))
class ActionOutput(BaseModel):
thoughts: str = Field(description="Analysis of the current situation and next steps")
selected_action: str = Field(description="The chosen action from available tools")
action_parameters: Dict[str, Any] = Field(description="Parameters for the action")
expected_outcome: str = Field(description="What outcome is expected from this action")
class ConsoleAgent:
def __init__(self):
self.available_tools = [
{
"tool": "create_folder",
"description": "Create a new directory/folder",
"parameters": ["path"]
},
{
"tool": "create_file",
"description": "Create a new file with content",
"parameters": ["path", "content"]
},
{
"tool": "edit_file",
"description": "Edit an existing file by replacing its content",
"parameters": ["path", "content"]
},
{
"tool": "list_files",
"description": "List files and directories in a path. Use this to verify the current state of the file system.",
"parameters": ["path"]
}
]
self.trajectory = [] # Track all actions and their results
def add_to_trajectory(self, action: str, parameters: Dict[str, Any], result: str):
"""Add an action and its result to the trajectory"""
self.trajectory.append({
"action": action,
"parameters": parameters,
"result": result,
"timestamp": time.time()
})
def get_trajectory_summary(self) -> str:
"""Get a summary of the trajectory for the AI"""
if not self.trajectory:
return "No actions performed yet."
summary = "Previous actions:\n"
for i, step in enumerate(self.trajectory, 1):
summary += f"{i}. {step['action']}({step['parameters']}) -> {step['result']}\n"
return summary
def create_folder(self, path: str) -> str:
"""Create a new folder at the specified path"""
try:
os.makedirs(path, exist_ok=True)
return f"Successfully created folder: {path}"
except Exception as e:
return f"Error creating folder {path}: {str(e)}"
def create_file(self, path: str, content: str) -> str:
"""Create a new file with the specified content"""
try:
# Ensure the directory exists
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, 'w') as f:
f.write(content)
return f"Successfully created file: {path}"
except Exception as e:
return f"Error creating file {path}: {str(e)}"
def edit_file(self, path: str, content: str) -> str:
"""Edit an existing file by replacing its content"""
try:
with open(path, 'w') as f:
f.write(content)
return f"Successfully edited file: {path}"
except Exception as e:
return f"Error editing file {path}: {str(e)}"
def list_files(self, path: str = ".") -> str:
"""List files and directories in the specified path"""
try:
items = os.listdir(path)
result = f"Contents of {path}:\n"
for item in items:
item_path = os.path.join(path, item)
if os.path.isdir(item_path):
result += f"DIR {item}/\n"
else:
result += f"FILE {item}\n"
return result
except Exception as e:
return f"Error listing files in {path}: {str(e)}"
def execute_action(self, action: str, parameters: Dict[str, Any]) -> str:
"""Execute the specified action with parameters"""
if action == "create_folder":
return self.create_folder(parameters["path"])
elif action == "create_file":
return self.create_file(parameters["path"], parameters["content"])
elif action == "edit_file":
return self.edit_file(parameters["path"], parameters["content"])
elif action == "list_files":
return self.list_files(parameters.get("path", "."))
elif action == "done":
return "Task completed"
else:
return f"Unknown action: {action}"
def plan_and_execute(self, task: str) -> None:
"""Plan and execute a high-level task by looping until completion"""
print(f"Task: {task}")
print("=" * 50)
# Reset trajectory for new task
self.trajectory = []
max_iterations = 10 # Prevent infinite loops
iteration = 0
task_completed = False
while not task_completed and iteration < max_iterations:
iteration += 1
print(f"\nIteration {iteration}")
# Get trajectory summary
trajectory_summary = self.get_trajectory_summary()
print(f"Trajectory: {trajectory_summary}")
# Get the next action from the AI
response = opper.call(
name="next_action",
instructions="Choose the best next action to accomplish the given task. Consider the current state, available tools, and what has already been done. If the task is complete, set selected_action to 'done'. Always use the list_files tool to verify the current state of the file system.",
input={
"task": task,
"available_tools": self.available_tools,
"trajectory": trajectory_summary
},
output_schema=ActionOutput
)
action_plan = response.json_payload
print(f"Thoughts: {action_plan['thoughts']}")
print(f"Action: {action_plan['selected_action']}")
print(f"Parameters: {action_plan['action_parameters']}")
print(f"Expected: {action_plan['expected_outcome']}")
# Check if task is complete
if action_plan['selected_action'].lower() == 'done':
print("\nTask completed!")
task_completed = True
break
# Execute the action
print("\nExecuting action...")
result = self.execute_action(action_plan['selected_action'], action_plan['action_parameters'])
print(f"Result: {result}")
# Add to trajectory
self.add_to_trajectory(action_plan['selected_action'], action_plan['action_parameters'], result)
# Small delay to make the output readable
time.sleep(0.5)
if not task_completed:
print(f"\nTask not completed after {max_iterations} iterations")
print("=" * 50)
def main():
agent = ConsoleAgent()
# Check if task is provided as command line argument
if len(sys.argv) < 2:
print("Simple Console Agent")
print("Usage: python simple_console_agent.py \"<task>\"")
print("Example: python simple_console_agent.py \"Create a folder called 'my_project'\"")
sys.exit(1)
# Get task from command line argument
task = " ".join(sys.argv[1:])
print(f"Simple Console Agent")
print(f"Task: {task}")
print()
# Execute the task
agent.plan_and_execute(task)
if __name__ == "__main__":
main()
# Example Output:
# Simple Console Agent
#
# Task: Create a Python project structure
# ==================================================
#
# Iteration 1
# Trajectory: No actions performed yet.
# Thoughts: I need to create a basic Python project structure with src folder and main.py
# Action: create_folder
# Parameters: {'path': 'src'}
# Expected: A src folder will be created for the Python project
#
# Executing action...
# Result: Successfully created folder: src
#
# Iteration 2
# Trajectory: Previous actions:
# 1. create_folder({'path': 'src'}) -> Successfully created folder: src
# Thoughts: Now I need to create the main.py file in the src folder
# Action: create_file
# Parameters: {'path': 'src/main.py', 'content': '# Main Python file\n\nif __name__ == "__main__":\n print("Hello, World!")'}
# Expected: A main.py file will be created with basic Python code
#
# Executing action...
# Result: Successfully created file: src/main.py
#
# Iteration 3
# Trajectory: Previous actions:
# 1. create_folder({'path': 'src'}) -> Successfully created folder: src
# 2. create_file({'path': 'src/main.py', 'content': '# Main Python file\n\nif __name__ == "__main__":\n print("Hello, World!")'}) -> Successfully created file: src/main.py
# Thoughts: The Python project structure is complete with src folder and main.py file
# Action: done
# Parameters: {}
# Expected: Task completion
#
# Task completed!
# ==================================================