diff --git a/api/config.py b/api/config.py new file mode 100644 index 0000000..f154726 --- /dev/null +++ b/api/config.py @@ -0,0 +1,28 @@ +"""Centralized configuration via Pydantic Settings.""" + +from functools import lru_cache + +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + llm_provider: str = "groq" + groq_api_key: str = "" + groq_model: str = "llama-3.3-70b-versatile" + anthropic_api_key: str = "" + anthropic_model: str = "claude-sonnet-4-20250514" + openai_api_key: str = "" + openai_base_url: str = "https://api.openai.com/v1" + openai_model: str = "gpt-4o" + aws_access_key_id: str = "" + aws_secret_access_key: str = "" + aws_default_region: str = "us-east-1" + bedrock_model_id: str = "anthropic.claude-sonnet-4-20250514-v1:0" + kong_proxy_url: str = "" + + model_config = {"env_prefix": "", "case_sensitive": False} + + +@lru_cache +def get_settings() -> Settings: + return Settings() diff --git a/api/main.py b/api/main.py index a221307..58888ac 100644 --- a/api/main.py +++ b/api/main.py @@ -2,11 +2,12 @@ import asyncio import json +import logging import uuid from contextlib import asynccontextmanager from datetime import datetime, timezone -from fastapi import FastAPI, WebSocket, WebSocketDisconnect +from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel @@ -15,6 +16,12 @@ from agents.handover import run_handover from agents.shared.mcp_client import connect_servers from mcp_servers.data.scenarios.manager import scenario_manager +logger = logging.getLogger("nova") +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(name)s %(message)s", +) + # ── WebSocket event hub ── @@ -38,6 +45,8 @@ class EventHub: await ws.send_json(event) except Exception: dead.add(ws) + if dead: + logger.info("ws_cleanup removed=%d remaining=%d", len(dead), len(self._clients) - len(dead)) self._clients -= dead @@ -47,12 +56,31 
@@ event_hub = EventHub() runs: dict[str, dict] = {} +RUN_TTL_SECONDS = 3600 +RUN_CLEANUP_INTERVAL = 300 + # ── App lifecycle ── +async def _cleanup_runs(): + """Prune completed/errored runs older than RUN_TTL_SECONDS.""" + while True: + await asyncio.sleep(RUN_CLEANUP_INTERVAL) + now = datetime.now(timezone.utc).timestamp() + expired = [ + rid for rid, r in runs.items() + if r["status"] in ("completed", "error") + and now - r.setdefault("created_at", now) > RUN_TTL_SECONDS + ] + for rid in expired: + del runs[rid] + + @asynccontextmanager async def lifespan(app: FastAPI): + task = asyncio.create_task(_cleanup_runs()) yield + task.cancel() app = FastAPI(title="United Ops MCP Demo", lifespan=lifespan) @@ -64,6 +92,17 @@ app.add_middleware( ) + # ── Health check ── + +@app.get("/health") +async def health(): + return { + "status": "ok", + "scenario": scenario_manager.active_id, + "runs_in_memory": len(runs), + } + + # ── Request/Response models ── class FCERequest(BaseModel): @@ -81,13 +120,17 @@ class ScenarioUpdate(BaseModel): @app.post("/agents/fce") async def trigger_fce(req: FCERequest): run_id = str(uuid.uuid4())[:8] - runs[run_id] = {"status": "running", "agent": "fce", "flight_id": req.flight_id} + now = datetime.now(timezone.utc) + runs[run_id] = { + "status": "running", "agent": "fce", + "flight_id": req.flight_id, "created_at": now.timestamp(), + } async def _run(): await event_hub.broadcast({ "type": "agent_start", "run_id": run_id, "agent": "fce", "flight_id": req.flight_id, - "timestamp": now.isoformat(), + "timestamp": now.isoformat(), }) async def on_event(event): @@ -97,9 +140,12 @@ async def trigger_fce(req: FCERequest): async with connect_servers(["shared", "ops", "passenger"]) as mcp: result = await run_fce(req.flight_id, mcp, on_event=on_event) runs[run_id] = {"status": "completed", "agent": "fce", "result": result} + logger.info("agent_complete agent=fce run_id=%s flight=%s", run_id, req.flight_id) except Exception as e:
runs[run_id] = {"status": "error", "agent": "fce", "error": str(e)} + logger.error("agent_error agent=fce run_id=%s error=%s", run_id, e) + logger.info("agent_start agent=fce run_id=%s flight=%s", run_id, req.flight_id) asyncio.create_task(_run()) return {"run_id": run_id, "status": "running"} @@ -107,13 +153,17 @@ async def trigger_fce(req: FCERequest): @app.post("/agents/handover") async def trigger_handover(req: HandoverRequest): run_id = str(uuid.uuid4())[:8] - runs[run_id] = {"status": "running", "agent": "handover", "hubs": req.hubs} + now = datetime.now(timezone.utc) + runs[run_id] = { + "status": "running", "agent": "handover", + "hubs": req.hubs, "created_at": now.timestamp(), + } async def _run(): await event_hub.broadcast({ "type": "agent_start", "run_id": run_id, "agent": "handover", "hubs": req.hubs, - "timestamp": datetime.now(timezone.utc).isoformat(), + "timestamp": now.isoformat(), }) async def on_event(event): @@ -123,9 +173,12 @@ async def trigger_handover(req: HandoverRequest): async with connect_servers(["shared", "ops"]) as mcp: result = await run_handover(hubs=req.hubs, mcp=mcp, on_event=on_event) runs[run_id] = {"status": "completed", "agent": "handover", "result": result} + logger.info("agent_complete agent=handover run_id=%s hubs=%s", run_id, req.hubs) except Exception as e: runs[run_id] = {"status": "error", "agent": "handover", "error": str(e)} + logger.error("agent_error agent=handover run_id=%s error=%s", run_id, e) + logger.info("agent_start agent=handover run_id=%s hubs=%s", run_id, req.hubs) asyncio.create_task(_run()) return {"run_id": run_id, "status": "running"} @@ -133,7 +186,7 @@ async def trigger_handover(req: HandoverRequest): @app.get("/agents/runs/{run_id}") async def get_run(run_id: str): if run_id not in runs: - return {"error": f"Run {run_id} not found"} + raise HTTPException(404, detail=f"Run {run_id} not found") return runs[run_id] @@ -160,9 +213,11 @@ async def get_active_scenario(): @app.put("/scenarios/active") 
async def set_active_scenario(req: ScenarioUpdate): try: - return scenario_manager.set_active(req.scenario_id) + result = scenario_manager.set_active(req.scenario_id) + logger.info("scenario_switch scenario=%s", req.scenario_id) + return result except ValueError as e: - return {"error": str(e)} + raise HTTPException(400, detail=str(e)) # ── LLM config routes ── @@ -170,27 +225,27 @@ async def set_active_scenario(req: ScenarioUpdate): @app.get("/config/llm") async def get_llm_config(): """Current LLM provider configuration.""" - import os - provider = os.getenv("LLM_PROVIDER", "groq") + from api.config import Settings + s = Settings() return { - "provider": provider, + "provider": s.llm_provider, "providers": { "groq": { - "configured": bool(os.getenv("GROQ_API_KEY")), - "model": os.getenv("GROQ_MODEL", "llama-3.3-70b-versatile"), + "configured": bool(s.groq_api_key), + "model": s.groq_model, }, "anthropic": { - "configured": bool(os.getenv("ANTHROPIC_API_KEY")), - "model": os.getenv("ANTHROPIC_MODEL", "claude-sonnet-4-20250514"), + "configured": bool(s.anthropic_api_key), + "model": s.anthropic_model, }, "bedrock": { - "configured": bool(os.getenv("AWS_ACCESS_KEY_ID")), - "model": os.getenv("BEDROCK_MODEL_ID", "anthropic.claude-sonnet-4-20250514-v1:0"), + "configured": bool(s.aws_access_key_id), + "model": s.bedrock_model_id, }, "openai": { - "configured": bool(os.getenv("OPENAI_API_KEY")), - "model": os.getenv("OPENAI_MODEL", "gpt-4o"), - "base_url": os.getenv("OPENAI_BASE_URL", ""), + "configured": bool(s.openai_api_key), + "model": s.openai_model, + "base_url": s.openai_base_url, }, "template": { "configured": True, @@ -232,8 +287,7 @@ async def set_llm_config(req: LLMConfigUpdate): if req.model: os.environ["BEDROCK_MODEL_ID"] = req.model - # No need to update server configs — _server_config() reads env at connect time - + logger.info("llm_config_change provider=%s", req.provider) return await get_llm_config() @@ -290,7 +344,7 @@ async def
patch_flight(flight_id: str, patch: FlightPatch): if patch.gate is not None: f.gate = patch.gate return f.model_dump(mode="json") - return {"error": f"Flight {flight_id} not found"} + raise HTTPException(404, detail=f"Flight {flight_id} not found") class CrewPatch(BaseModel): @@ -307,7 +361,7 @@ async def patch_crew(crew_id: str, patch: CrewPatch): d["hours_until_limit"] = round(c.duty_hours_limit - c.duty_hours_elapsed, 2) d["at_risk"] = d["hours_until_limit"] <= 2.0 return d - return {"error": f"Crew {crew_id} not found"} + raise HTTPException(404, detail=f"Crew {crew_id} not found") class CrewNotesPatch(BaseModel):