add health check, run TTL cleanup, structured logging, proper HTTP status codes, Pydantic Settings

This commit is contained in:
2026-04-14 11:18:05 -03:00
parent 0f122fa8f7
commit 318e8ab03e
2 changed files with 106 additions and 24 deletions

28
api/config.py Normal file
View File

@@ -0,0 +1,28 @@
"""Centralized configuration via Pydantic Settings."""
from functools import lru_cache
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
    """Application settings, populated from environment variables.

    Field names map directly to env vars (case-insensitive, no prefix),
    e.g. ``GROQ_API_KEY`` -> ``groq_api_key``. Empty-string defaults mean
    "not configured" for the corresponding provider.
    """

    # Which LLM backend to use ("groq", "anthropic", "bedrock", "openai", ...).
    llm_provider: str = "groq"

    # Groq
    groq_api_key: str = ""
    groq_model: str = "llama-3.3-70b-versatile"

    # Anthropic
    anthropic_api_key: str = ""
    anthropic_model: str = "claude-sonnet-4-20250514"

    # OpenAI (or any OpenAI-compatible endpoint via base_url)
    openai_api_key: str = ""
    openai_base_url: str = "https://api.openai.com/v1"
    openai_model: str = "gpt-4o"

    # AWS Bedrock
    aws_access_key_id: str = ""
    aws_secret_access_key: str = ""
    aws_default_region: str = "us-east-1"
    bedrock_model_id: str = "anthropic.claude-sonnet-4-20250514-v1:0"

    # Optional Kong gateway in front of the providers.
    kong_proxy_url: str = ""

    model_config = {
        "env_prefix": "",
        "case_sensitive": False,
    }
@lru_cache
def get_settings() -> Settings:
    """Return the process-wide Settings instance.

    ``lru_cache`` makes this a lazy singleton: the environment is read
    once on first call and the same object is returned thereafter.
    """
    settings = Settings()
    return settings

View File

@@ -2,11 +2,12 @@
import asyncio import asyncio
import json import json
import logging
import uuid import uuid
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from datetime import datetime, timezone from datetime import datetime, timezone
from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel from pydantic import BaseModel
@@ -15,6 +16,12 @@ from agents.handover import run_handover
from agents.shared.mcp_client import connect_servers from agents.shared.mcp_client import connect_servers
from mcp_servers.data.scenarios.manager import scenario_manager from mcp_servers.data.scenarios.manager import scenario_manager
logger = logging.getLogger("nova")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
# ── WebSocket event hub ── # ── WebSocket event hub ──
@@ -38,6 +45,8 @@ class EventHub:
await ws.send_json(event) await ws.send_json(event)
except Exception: except Exception:
dead.add(ws) dead.add(ws)
if dead:
logger.info("ws_cleanup removed=%d remaining=%d", len(dead), len(self._clients) - len(dead))
self._clients -= dead self._clients -= dead
@@ -47,12 +56,31 @@ event_hub = EventHub()
runs: dict[str, dict] = {} runs: dict[str, dict] = {}
RUN_TTL_SECONDS = 3600
RUN_CLEANUP_INTERVAL = 300
# ── App lifecycle ── # ── App lifecycle ──
async def _cleanup_runs():
    """Background task: prune completed/errored runs older than RUN_TTL_SECONDS.

    Runs forever; the lifespan shutdown hook cancels it. Each cycle is
    guarded so one unexpected error does not silently kill the task for
    the remainder of the process lifetime.
    """
    while True:
        # Sleep outside the try so cancellation during the wait propagates.
        await asyncio.sleep(RUN_CLEANUP_INTERVAL)
        try:
            now = datetime.now(timezone.utc).timestamp()
            # Default created_at to 0, not `now`: the completion handlers
            # overwrite each run record without carrying created_at forward,
            # so a record missing the timestamp is an already-finished run.
            # With a default of `now` its age would always compute as 0 and
            # finished runs would never be pruned (unbounded memory growth).
            expired = [
                rid for rid, r in runs.items()
                if r.get("status") in ("completed", "error")
                and now - r.get("created_at", 0) > RUN_TTL_SECONDS
            ]
            for rid in expired:
                del runs[rid]
            if expired:
                logger.info(
                    "run_cleanup pruned=%d remaining=%d", len(expired), len(runs)
                )
        except Exception:
            # CancelledError is a BaseException and is NOT swallowed here.
            logger.exception("run_cleanup_failed")
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
task = asyncio.create_task(_cleanup_runs())
yield yield
task.cancel()
app = FastAPI(title="United Ops MCP Demo", lifespan=lifespan) app = FastAPI(title="United Ops MCP Demo", lifespan=lifespan)
@@ -64,6 +92,17 @@ app.add_middleware(
) )
# ── Health check ──
@app.get("/health")
async def health():
    """Liveness probe: report the active scenario and in-memory run count."""
    return dict(
        status="ok",
        scenario=scenario_manager.active_id,
        runs_in_memory=len(runs),
    )
# ── Request/Response models ── # ── Request/Response models ──
class FCERequest(BaseModel): class FCERequest(BaseModel):
@@ -81,13 +120,17 @@ class ScenarioUpdate(BaseModel):
@app.post("/agents/fce") @app.post("/agents/fce")
async def trigger_fce(req: FCERequest): async def trigger_fce(req: FCERequest):
run_id = str(uuid.uuid4())[:8] run_id = str(uuid.uuid4())[:8]
runs[run_id] = {"status": "running", "agent": "fce", "flight_id": req.flight_id} now = datetime.now(timezone.utc)
runs[run_id] = {
"status": "running", "agent": "fce",
"flight_id": req.flight_id, "created_at": now.timestamp(),
}
async def _run(): async def _run():
await event_hub.broadcast({ await event_hub.broadcast({
"type": "agent_start", "run_id": run_id, "type": "agent_start", "run_id": run_id,
"agent": "fce", "flight_id": req.flight_id, "agent": "fce", "flight_id": req.flight_id,
"timestamp": datetime.now(timezone.utc).isoformat(), "timestamp": now.isoformat(),
}) })
async def on_event(event): async def on_event(event):
@@ -97,9 +140,12 @@ async def trigger_fce(req: FCERequest):
async with connect_servers(["shared", "ops", "passenger"]) as mcp: async with connect_servers(["shared", "ops", "passenger"]) as mcp:
result = await run_fce(req.flight_id, mcp, on_event=on_event) result = await run_fce(req.flight_id, mcp, on_event=on_event)
runs[run_id] = {"status": "completed", "agent": "fce", "result": result} runs[run_id] = {"status": "completed", "agent": "fce", "result": result}
logger.info("agent_complete agent=fce run_id=%s flight=%s", run_id, req.flight_id)
except Exception as e: except Exception as e:
runs[run_id] = {"status": "error", "agent": "fce", "error": str(e)} runs[run_id] = {"status": "error", "agent": "fce", "error": str(e)}
logger.error("agent_error agent=fce run_id=%s error=%s", run_id, e)
logger.info("agent_start agent=fce run_id=%s flight=%s", run_id, req.flight_id)
asyncio.create_task(_run()) asyncio.create_task(_run())
return {"run_id": run_id, "status": "running"} return {"run_id": run_id, "status": "running"}
@@ -107,13 +153,17 @@ async def trigger_fce(req: FCERequest):
@app.post("/agents/handover") @app.post("/agents/handover")
async def trigger_handover(req: HandoverRequest): async def trigger_handover(req: HandoverRequest):
run_id = str(uuid.uuid4())[:8] run_id = str(uuid.uuid4())[:8]
runs[run_id] = {"status": "running", "agent": "handover", "hubs": req.hubs} now = datetime.now(timezone.utc)
runs[run_id] = {
"status": "running", "agent": "handover",
"hubs": req.hubs, "created_at": now.timestamp(),
}
async def _run(): async def _run():
await event_hub.broadcast({ await event_hub.broadcast({
"type": "agent_start", "run_id": run_id, "type": "agent_start", "run_id": run_id,
"agent": "handover", "hubs": req.hubs, "agent": "handover", "hubs": req.hubs,
"timestamp": datetime.now(timezone.utc).isoformat(), "timestamp": now.isoformat(),
}) })
async def on_event(event): async def on_event(event):
@@ -123,9 +173,12 @@ async def trigger_handover(req: HandoverRequest):
async with connect_servers(["shared", "ops"]) as mcp: async with connect_servers(["shared", "ops"]) as mcp:
result = await run_handover(hubs=req.hubs, mcp=mcp, on_event=on_event) result = await run_handover(hubs=req.hubs, mcp=mcp, on_event=on_event)
runs[run_id] = {"status": "completed", "agent": "handover", "result": result} runs[run_id] = {"status": "completed", "agent": "handover", "result": result}
logger.info("agent_complete agent=handover run_id=%s hubs=%s", run_id, req.hubs)
except Exception as e: except Exception as e:
runs[run_id] = {"status": "error", "agent": "handover", "error": str(e)} runs[run_id] = {"status": "error", "agent": "handover", "error": str(e)}
logger.error("agent_error agent=handover run_id=%s error=%s", run_id, e)
logger.info("agent_start agent=handover run_id=%s hubs=%s", run_id, req.hubs)
asyncio.create_task(_run()) asyncio.create_task(_run())
return {"run_id": run_id, "status": "running"} return {"run_id": run_id, "status": "running"}
@@ -133,7 +186,7 @@ async def trigger_handover(req: HandoverRequest):
@app.get("/agents/runs/{run_id}") @app.get("/agents/runs/{run_id}")
async def get_run(run_id: str): async def get_run(run_id: str):
if run_id not in runs: if run_id not in runs:
return {"error": f"Run {run_id} not found"} raise HTTPException(404, detail=f"Run {run_id} not found")
return runs[run_id] return runs[run_id]
@@ -160,9 +213,11 @@ async def get_active_scenario():
@app.put("/scenarios/active") @app.put("/scenarios/active")
async def set_active_scenario(req: ScenarioUpdate): async def set_active_scenario(req: ScenarioUpdate):
try: try:
return scenario_manager.set_active(req.scenario_id) result = scenario_manager.set_active(req.scenario_id)
logger.info("scenario_switch scenario=%s", req.scenario_id)
return result
except ValueError as e: except ValueError as e:
return {"error": str(e)} raise HTTPException(400, detail=str(e))
# ── LLM config routes ── # ── LLM config routes ──
@@ -170,27 +225,27 @@ async def set_active_scenario(req: ScenarioUpdate):
@app.get("/config/llm") @app.get("/config/llm")
async def get_llm_config(): async def get_llm_config():
"""Current LLM provider configuration.""" """Current LLM provider configuration."""
import os from api.config import get_settings
provider = os.getenv("LLM_PROVIDER", "groq") s = get_settings()
return { return {
"provider": provider, "provider": s.llm_provider,
"providers": { "providers": {
"groq": { "groq": {
"configured": bool(os.getenv("GROQ_API_KEY")), "configured": bool(s.groq_api_key),
"model": os.getenv("GROQ_MODEL", "llama-3.3-70b-versatile"), "model": s.groq_model,
}, },
"anthropic": { "anthropic": {
"configured": bool(os.getenv("ANTHROPIC_API_KEY")), "configured": bool(s.anthropic_api_key),
"model": os.getenv("ANTHROPIC_MODEL", "claude-sonnet-4-20250514"), "model": s.anthropic_model,
}, },
"bedrock": { "bedrock": {
"configured": bool(os.getenv("AWS_ACCESS_KEY_ID")), "configured": bool(s.aws_access_key_id),
"model": os.getenv("BEDROCK_MODEL_ID", "anthropic.claude-sonnet-4-20250514-v1:0"), "model": s.bedrock_model_id,
}, },
"openai": { "openai": {
"configured": bool(os.getenv("OPENAI_API_KEY")), "configured": bool(s.openai_api_key),
"model": os.getenv("OPENAI_MODEL", "gpt-4o"), "model": s.openai_model,
"base_url": os.getenv("OPENAI_BASE_URL", ""), "base_url": s.openai_base_url,
}, },
"template": { "template": {
"configured": True, "configured": True,
@@ -232,8 +287,7 @@ async def set_llm_config(req: LLMConfigUpdate):
if req.model: if req.model:
os.environ["BEDROCK_MODEL_ID"] = req.model os.environ["BEDROCK_MODEL_ID"] = req.model
# No need to update server configs — _server_config() reads env at connect time logger.info("llm_config_change provider=%s", req.provider)
return await get_llm_config() return await get_llm_config()
@@ -290,7 +344,7 @@ async def patch_flight(flight_id: str, patch: FlightPatch):
if patch.gate is not None: if patch.gate is not None:
f.gate = patch.gate f.gate = patch.gate
return f.model_dump(mode="json") return f.model_dump(mode="json")
return {"error": f"Flight {flight_id} not found"} raise HTTPException(404, detail=f"Flight {flight_id} not found")
class CrewPatch(BaseModel): class CrewPatch(BaseModel):
@@ -307,7 +361,7 @@ async def patch_crew(crew_id: str, patch: CrewPatch):
d["hours_until_limit"] = round(c.duty_hours_limit - c.duty_hours_elapsed, 2) d["hours_until_limit"] = round(c.duty_hours_limit - c.duty_hours_elapsed, 2)
d["at_risk"] = d["hours_until_limit"] <= 2.0 d["at_risk"] = d["hours_until_limit"] <= 2.0
return d return d
return {"error": f"Crew {crew_id} not found"} raise HTTPException(404, detail=f"Crew {crew_id} not found")
class CrewNotesPatch(BaseModel): class CrewNotesPatch(BaseModel):