from pydantic import BaseModel
import httpx
import asyncio
class CreateSessionResponse(BaseModel):
    """Parsed body of the response to the session-creation request.

    Attributes:
        session_id: Identifier of the created session; it is placed in the
            URL path of the follow-up "say" and "terminate" requests.
        token: Session token; the demo notes it can be forwarded to a
            frontend for Web SDK use.
    """

    session_id: str
    token: str
class SessionSayResponse(BaseModel):
    """Parsed body of the response to a "say" (task creation) request.

    Attributes:
        task_id: Identifier returned by the API for the submitted task.
    """

    task_id: str
class APIClient:
    """Minimal asynchronous client for the session REST endpoints.

    Each method opens a short-lived ``httpx.AsyncClient``, sends a single
    request carrying the bearer-token header, and raises on HTTP error
    statuses instead of trying to parse an error body as a success payload.
    """

    def __init__(self, api_key: str, base_url: str = "https://api.avaturn.live") -> None:
        """Store the authorization header and the API root URL.

        Args:
            api_key: Key sent as ``Authorization: Bearer <api_key>``.
            base_url: Root URL of the API. A trailing slash is tolerated.
        """
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.base_url = base_url

    def _url(self, path: str) -> str:
        """Join *path* onto the base URL without producing a double slash."""
        return f"{self.base_url.rstrip('/')}{path}"

    async def create_session(self) -> CreateSessionResponse:
        """Create a new session.

        Returns:
            The validated response body (session id and token).

        Raises:
            httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        """
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self._url("/api/v1/sessions"),
                headers=self.headers,
            )
            # Fail loudly on HTTP errors rather than feeding an error payload
            # into model validation.
            response.raise_for_status()
            return CreateSessionResponse.model_validate(response.json())

    async def say(self, session_id: str, text: str) -> SessionSayResponse:
        """Submit a text task for the given session.

        Args:
            session_id: Identifier of the target session.
            text: Text sent in the JSON body under the ``"text"`` key.

        Returns:
            The validated response body (task id).

        Raises:
            httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        """
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self._url(f"/api/v1/sessions/{session_id}/tasks"),
                json={"text": text},
                headers=self.headers,
            )
            response.raise_for_status()
            return SessionSayResponse.model_validate(response.json())

    async def terminate_session(self, session_id: str) -> None:
        """Delete the given session.

        Args:
            session_id: Identifier of the session to terminate.

        Raises:
            httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        """
        async with httpx.AsyncClient() as client:
            response = await client.delete(
                self._url(f"/api/v1/sessions/{session_id}"),
                headers=self.headers,
            )
            # Surface a failed termination so callers do not assume the
            # session is gone when the request was rejected.
            response.raise_for_status()
async def main() -> None:
    """Run the demo flow: create a session, request speech, terminate.

    The session is terminated in a ``finally`` block so that it is not left
    open when the "say" request (or the wait) fails.
    """
    # Step 1: Create a session
    client = APIClient(api_key="your_api_key_here")
    session = await client.create_session()
    # model_dump() is the pydantic v2 replacement for the deprecated dict().
    print(session.model_dump())
    # You can send the session token to the frontend for Web SDK use here

    try:
        # Step 2: Request the avatar to say something (note that this code
        # does not wait for the speech to end)
        say_response = await client.say(session.session_id, "Hello, world!")
        print(say_response.model_dump())

        # Step 3: Terminate the session (after some time or condition)
        await asyncio.sleep(3)
    finally:
        await client.terminate_session(session.session_id)
    print(f"Session {session.session_id} terminated.")
import asyncio
if __name__ == "__main__":
    # Start the demo only when executed as a script, so that importing this
    # module (e.g. to reuse APIClient) does not trigger network requests.
    asyncio.run(main())