""" Django adapter for AMAR. Queries AMAR's PostgreSQL database directly. """ from typing import Dict, List, Any, Optional from sqlalchemy import create_engine, text from . import BaseAdapter class DjangoAdapter(BaseAdapter): """Adapter for Django/AMAR.""" def __init__(self, config: Dict[str, Any]): super().__init__(config) self.engine = self._create_engine() def _create_engine(self): """Create SQLAlchemy engine from config.""" db_url = ( f"postgresql://{self.config['user']}:{self.config['password']}" f"@{self.config['host']}:{self.config['port']}/{self.config['name']}" ) return create_engine(db_url, pool_pre_ping=True) def _execute(self, sql: str) -> List[Dict[str, Any]]: """Execute SQL and return results as list of dicts.""" with self.engine.connect() as conn: result = conn.execute(text(sql)) rows = result.fetchall() columns = result.keys() return [dict(zip(columns, row)) for row in rows] def get_queries(self) -> List[str]: """Available predefined queries.""" return [ "user_with_pets", "user_with_requests", ] def navigate( self, query: Optional[str] = None, entity: Optional[str] = None, id: Optional[int] = None ) -> Dict[str, Any]: """Navigate data graph.""" if query: return self._query_mode(query) elif entity and id: return self._entity_mode(entity, id) else: raise ValueError("Must provide either query or entity+id") def _query_mode(self, query_name: str) -> Dict[str, Any]: """Execute predefined query.""" if query_name == "user_with_pets": sql = """ SELECT u.id as user_id, u.username, u.email, po.id as petowner_id, po.first_name, po.last_name, po.phone, p.id as pet_id, p.name as pet_name, p.pet_type, p.age FROM auth_user u JOIN mascotas_petowner po ON po.user_id = u.id JOIN mascotas_pet p ON p.owner_id = po.id WHERE p.deleted = false LIMIT 1 """ elif query_name == "user_with_requests": sql = """ SELECT u.id as user_id, u.username, u.email, po.id as petowner_id, po.first_name, po.last_name, sr.id as request_id, sr.state, sr.created_at FROM auth_user u JOIN mascotas_petowner po ON po.user_id = u.id JOIN solicitudes_servicerequest sr ON sr.petowner_id = po.id WHERE sr.deleted = false ORDER BY sr.created_at DESC LIMIT 1 """ else: raise ValueError(f"Unknown query: {query_name}") rows = self._execute(sql) if not rows: return self._empty_response() return self._rows_to_graph(rows[0]) def _entity_mode(self, entity: str, id: int) -> Dict[str, Any]: """Navigate to specific entity.""" if entity == "User": sql = f""" SELECT u.id as user_id, u.username, u.email, po.id as petowner_id, po.first_name, po.last_name, po.phone FROM auth_user u LEFT JOIN mascotas_petowner po ON po.user_id = u.id WHERE u.id = {id} """ else: raise ValueError(f"Unknown entity: {entity}") rows = self._execute(sql) if not rows: return self._empty_response() return self._rows_to_graph(rows[0]) def _rows_to_graph(self, row: Dict[str, Any]) -> Dict[str, Any]: """Convert SQL row to graph structure.""" nodes = [] edges = [] # User node if "user_id" in row and row["user_id"]: nodes.append({ "id": f"User_{row['user_id']}", "type": "User", "label": row.get("username") or row.get("email", ""), "data": { "id": row["user_id"], "username": row.get("username"), "email": row.get("email"), } }) # PetOwner node if "petowner_id" in row and row["petowner_id"]: name = f"{row.get('first_name', '')} {row.get('last_name', '')}".strip() nodes.append({ "id": f"PetOwner_{row['petowner_id']}", "type": "PetOwner", "label": name or "PetOwner", "data": { "id": row["petowner_id"], "first_name": row.get("first_name"), "last_name": row.get("last_name"), "phone": row.get("phone"), } }) if "user_id" in row and row["user_id"]: edges.append({ "from": f"User_{row['user_id']}", "to": f"PetOwner_{row['petowner_id']}", "label": "has profile" }) # Pet node if "pet_id" in row and row["pet_id"]: nodes.append({ "id": f"Pet_{row['pet_id']}", "type": "Pet", "label": row.get("pet_name", "Pet"), "data": { "id": row["pet_id"], "name": row.get("pet_name"), "pet_type": row.get("pet_type"), "age": row.get("age"), } }) if "petowner_id" in row and row["petowner_id"]: edges.append({ "from": f"PetOwner_{row['petowner_id']}", "to": f"Pet_{row['pet_id']}", "label": "owns" }) # ServiceRequest node if "request_id" in row and row["request_id"]: nodes.append({ "id": f"ServiceRequest_{row['request_id']}", "type": "ServiceRequest", "label": f"Request #{row['request_id']}", "data": { "id": row["request_id"], "state": row.get("state"), "created_at": str(row.get("created_at", "")), } }) if "petowner_id" in row and row["petowner_id"]: edges.append({ "from": f"PetOwner_{row['petowner_id']}", "to": f"ServiceRequest_{row['request_id']}", "label": "requested" }) # Build summary from first User node summary = self._build_summary(nodes) return { "nodes": nodes, "edges": edges, "summary": summary } def _build_summary(self, nodes: List[Dict]) -> Dict[str, Any]: """Build summary from nodes.""" # Find User node user_node = next((n for n in nodes if n["type"] == "User"), None) if user_node: data = user_node["data"] return { "title": f"User #{data['id']}", "credentials": f"{data.get('username', 'N/A')} | Password: Amar2025!", "fields": { "Email": data.get("email", "N/A"), "Username": data.get("username", "N/A"), } } # Fallback return { "title": "No data", "credentials": None, "fields": {} } def _empty_response(self) -> Dict[str, Any]: """Return empty response structure.""" return { "nodes": [], "edges": [], "summary": { "title": "No data found", "credentials": None, "fields": {} } }