from opperai import Opper
import requests
import os
import time
# Example script: create a knowledge base, write a small sample text file,
# upload it through the upload URL returned by the service, register the
# uploaded file with the knowledge base, and finally delete the local file.

# Upper bound (seconds) for the upload request. Without a timeout, requests
# would wait indefinitely on a server that stops responding.
UPLOAD_TIMEOUT_SECONDS = 60

# Prefer an API key supplied through the environment so a real secret never
# has to be written into this file; the placeholder remains as the fallback.
opper = Opper(http_bearer=os.getenv("OPPER_API_KEY", "YOUR_API_KEY"))

# First, create a knowledge base. The timestamp suffix keeps the name unique
# across runs started in different seconds.
unique_name = f"document_library_{int(time.time())}"
knowledge_base = opper.knowledge.create_base(
    name=unique_name, embedding_model="azure/text-embedding-3-large"
)
print(f"Created knowledge base: {knowledge_base.name}")
print(f"Knowledge base ID: {knowledge_base.id}")

# Create a temporary text file to upload to knowledge base
filename = "sample_policy.txt"
sample_content = """
Employee Code of Conduct
1. Professional Behavior
All employees are expected to maintain professional conduct at all times.
2. Confidentiality
Employees must maintain confidentiality of company information and client data.
3. Dress Code
Business casual attire is required for office work.
4. Communication
Use professional communication channels for all business-related correspondence.
"""

try:
    # The file is written inside the try block so that the cleanup in
    # `finally` also runs when the write itself fails part-way through.
    # An explicit encoding avoids depending on the platform default.
    with open(filename, "w", encoding="utf-8") as f:
        f.write(sample_content)
    print(f"Created sample file: {filename}")

    # Step 1: Get upload URL
    # NOTE(review): the result is accessed with dict-style keys ('id', 'url',
    # 'fields') below -- confirm this matches the installed SDK's return type.
    upload_info = opper.knowledge.get_upload_url(
        index_id=knowledge_base.id,
        filename=filename,
    )
    print(f"Got upload URL for: {filename}")
    print(f"File ID: {upload_info['id']}")

    # Step 2: Upload file to the provided URL
    # The request must be sent while the file handle is still open, because
    # requests reads the file content when building the multipart body.
    with open(filename, "rb") as f:
        files = {"file": f}
        data = upload_info["fields"]
        upload_response = requests.post(
            upload_info["url"],
            data=data,
            files=files,
            timeout=UPLOAD_TIMEOUT_SECONDS,
        )
    # Stop here on an HTTP error status instead of registering a file that
    # was never stored.
    upload_response.raise_for_status()
    print(f"Upload status: {upload_response.status_code}")

    # Step 3: Register the uploaded file
    registered_file = opper.knowledge.register_file(
        index_id=knowledge_base.id,
        file_id=upload_info["id"],
        filename=filename,
        content_type="text/plain",
    )
    print(f"Registered file: {registered_file['original_filename']}")
    print(f"Document ID: {registered_file['document_id']}")
finally:
    # Clean up the temporary file
    if os.path.exists(filename):
        os.remove(filename)
        print(f"Cleaned up temporary file: {filename}")