""" API routes for Google vein. """ from fastapi import APIRouter, HTTPException, Query from fastapi.responses import PlainTextResponse, RedirectResponse from typing import Optional from core.oauth import GoogleOAuth from core.sheets import GoogleSheetsClient, GoogleSheetsError from models.spreadsheet import SpreadsheetMetadata, SheetValues from models.formatter import format_spreadsheet_metadata, format_sheet_values # Import from parent vein module import sys from pathlib import Path vein_path = Path(__file__).parent.parent.parent sys.path.insert(0, str(vein_path)) from oauth import TokenStorage router = APIRouter() # OAuth client and token storage oauth_client = GoogleOAuth() token_storage = TokenStorage(vein_name="google") # For demo/development, use a default user_id # In production, this would come from session/auth DEFAULT_USER_ID = "demo_user" def _get_sheets_client(user_id: str = DEFAULT_USER_ID) -> GoogleSheetsClient: """Get authenticated Sheets client for user.""" tokens = token_storage.load_tokens(user_id) if not tokens: raise HTTPException( status_code=401, detail="Not authenticated. Visit /google/oauth/start to login.", ) # Check if expired and refresh if needed if token_storage.is_expired(tokens): if "refresh_token" not in tokens: raise HTTPException( status_code=401, detail="Token expired and no refresh token. Re-authenticate at /google/oauth/start", ) try: new_tokens = oauth_client.refresh_access_token(tokens["refresh_token"]) token_storage.save_tokens(user_id, new_tokens) tokens = new_tokens except Exception as e: raise HTTPException( status_code=401, detail=f"Failed to refresh token: {e}. Re-authenticate at /google/oauth/start", ) credentials = oauth_client.get_credentials( access_token=tokens["access_token"], refresh_token=tokens.get("refresh_token"), ) return GoogleSheetsClient(credentials) def _maybe_text(data, text: bool, formatter): """Return text or JSON based on query param.""" if not text: return data return PlainTextResponse(formatter(data)) @router.get("/health") async def health(): """Check if user is authenticated.""" try: tokens = token_storage.load_tokens(DEFAULT_USER_ID) if not tokens: return { "status": "not_authenticated", "message": "Visit /google/oauth/start to login", } expired = token_storage.is_expired(tokens) return { "status": "ok" if not expired else "token_expired", "has_refresh_token": "refresh_token" in tokens, "user": DEFAULT_USER_ID, } except Exception as e: raise HTTPException(500, str(e)) @router.get("/oauth/start") async def start_oauth(state: Optional[str] = None): """Start OAuth flow - redirect to Google authorization.""" auth_url = oauth_client.get_authorization_url(state=state) return RedirectResponse(auth_url) @router.get("/oauth/callback") async def oauth_callback( code: Optional[str] = None, state: Optional[str] = None, error: Optional[str] = None, ): """Handle OAuth callback from Google.""" if error: raise HTTPException(400, f"OAuth error: {error}") if not code: raise HTTPException(400, "Missing authorization code") try: tokens = oauth_client.exchange_code_for_tokens(code) token_storage.save_tokens(DEFAULT_USER_ID, tokens) return { "status": "ok", "message": "Successfully authenticated with Google", "user": DEFAULT_USER_ID, } except Exception as e: raise HTTPException(500, f"Failed to exchange code: {e}") @router.get("/oauth/logout") async def logout(): """Clear stored tokens.""" token_storage.delete_tokens(DEFAULT_USER_ID) return {"status": "ok", "message": "Logged out"} @router.get("/spreadsheets/{spreadsheet_id}") async def get_spreadsheet( spreadsheet_id: str, text: bool = False, ): """Get spreadsheet metadata (title, sheets list, etc.).""" try: client = _get_sheets_client() metadata = client.get_spreadsheet_metadata(spreadsheet_id) result = SpreadsheetMetadata.from_google(metadata) return _maybe_text(result, text, format_spreadsheet_metadata) except GoogleSheetsError as e: raise HTTPException(404, str(e)) except Exception as e: raise HTTPException(500, str(e)) @router.get("/spreadsheets/{spreadsheet_id}/values") async def get_sheet_values( spreadsheet_id: str, range: str = Query(..., description="A1 notation range (e.g., 'Sheet1!A1:D10')"), text: bool = False, max_rows: int = Query(100, ge=1, le=10000), ): """Get values from a sheet range.""" try: client = _get_sheets_client() values = client.get_sheet_values(spreadsheet_id, range) result = SheetValues.from_google(spreadsheet_id, range, values) if text: return PlainTextResponse(format_sheet_values(result, max_rows=max_rows)) return result except GoogleSheetsError as e: raise HTTPException(404, str(e)) except Exception as e: raise HTTPException(500, str(e)) @router.get("/spreadsheets/{spreadsheet_id}/sheets") async def list_sheets( spreadsheet_id: str, text: bool = False, ): """List all sheets in a spreadsheet.""" try: client = _get_sheets_client() sheets = client.get_all_sheets(spreadsheet_id) if text: lines = [f"Sheets in {spreadsheet_id}:", ""] for sheet in sheets: lines.append( f" [{sheet['index']}] {sheet['title']} " f"({sheet['row_count']} rows x {sheet['column_count']} cols)" ) return PlainTextResponse("\n".join(lines)) return {"spreadsheet_id": spreadsheet_id, "sheets": sheets} except GoogleSheetsError as e: raise HTTPException(404, str(e)) except Exception as e: raise HTTPException(500, str(e))