from typing import Optional
from pydantic import BaseModel, Field
from opperai import Opper
import time
opper = Opper(http_bearer="YOUR_API_KEY")
class QuestionInput(BaseModel):
question: str = Field(description="The physics question to explain")
context: Optional[str] = Field(
None, description="Additional context about the learner"
)
# Prepare input data
input_data = QuestionInput(
question="Explain quantum computing in simple terms",
context="I'm a beginner in physics",
)
# Stream a response using the direct stream API
unique_name = f"physics_tutor_{int(time.time())}"
stream_response = opper.stream(
name=unique_name,
instructions="You are a friendly physics tutor. Explain complex physics concepts in simple, easy-to-understand terms. Use analogies and examples that beginners can relate to.",
input_schema=QuestionInput,
input=input_data,
model="openai/gpt-4o-mini",
)
print("Streaming response:")
# The stream method returns a response object with 'result' containing the EventStream
for event in stream_response.result:
# Each event is a FunctionStreamCallStreamPostResponseBody with 'data' containing the streaming chunk
if hasattr(event, "data") and hasattr(event.data, "delta") and event.data.delta:
print(event.data.delta, end="", flush=True)
print("\nStream completed.")