Add session cookie for browser-isolated OAuth sessions

This commit is contained in:
buenosairesam
2026-01-27 06:45:05 -03:00
parent 2babd47835
commit 5603979d5c

View File

@@ -2,14 +2,54 @@
API routes for Google vein.
"""
import json
import secrets
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, HTTPException, Query
from fastapi import APIRouter, Cookie, HTTPException, Query, Request, Response
from fastapi.responses import PlainTextResponse, RedirectResponse
# Import shared OAuth utilities from veins parent
from oauth import TokenStorage
SESSION_COOKIE = "spr_session"
def _load_auth_config():
"""Load auth config from room config.json."""
config_path = Path("/app/cfg/config.json")
if not config_path.exists():
return {}
try:
with open(config_path) as f:
config = json.load(f)
return config.get("auth", {})
except Exception:
return {}
def _is_user_allowed(email: str, domain: str) -> bool:
"""Check if user is allowed based on auth config."""
auth = _load_auth_config()
allowed_domains = auth.get("allowed_domains", [])
allowed_emails = auth.get("allowed_emails", [])
# No restrictions = allow all
if not allowed_domains and not allowed_emails:
return True
# Check email list
if email in allowed_emails:
return True
# Check domain list
if domain and domain in allowed_domains:
return True
return False
from ..core.oauth import GoogleOAuth
from ..core.sheets import GoogleSheetsClient, GoogleSheetsError
from ..models.formatter import format_sheet_values, format_spreadsheet_metadata
@@ -124,6 +164,7 @@ async def start_oauth(
@router.get("/oauth/callback")
async def oauth_callback(
request: Request,
code: Optional[str] = None,
state: Optional[str] = None,
error: Optional[str] = None,
@@ -136,29 +177,59 @@ async def oauth_callback(
raise HTTPException(400, "Missing authorization code")
# Extract redirect URL from state if present
# Format: "csrf_state|redirect_url" OR just "redirect_url" if no csrf state
# Format: "csrf_state|redirect_url" OR just "redirect_url" (path starting with /)
redirect_url = None
if state:
if "|" in state:
parts = state.split("|", 1)
redirect_url = parts[1] if len(parts) > 1 else None
elif state.startswith("http"):
# No csrf state, just the redirect URL
else:
# No separator, state is the redirect URL itself
redirect_url = state
try:
tokens = oauth_client.exchange_code_for_tokens(code)
token_storage.save_tokens(DEFAULT_USER_ID, tokens)
# First get user info to validate before storing tokens
user_info = oauth_client.exchange_code_for_user(code)
user_email = user_info.get("email", "")
user_domain = user_info.get("hd", "") # hosted domain (for Google Workspace)
# Redirect back to original page if specified
if redirect_url:
return RedirectResponse(url=redirect_url)
# Check if user is allowed
if not _is_user_allowed(user_email, user_domain):
raise HTTPException(
403,
f"Access restricted. Your account ({user_email}) is not authorized.",
)
return {
"status": "ok",
"message": "Successfully authenticated with Google",
"user": DEFAULT_USER_ID,
# Generate session ID for this browser
session_id = secrets.token_urlsafe(32)
# Store tokens with session ID
tokens = {
"access_token": user_info.get("access_token", ""),
"refresh_token": user_info.get("refresh_token", ""),
"token_type": "Bearer",
"email": user_email,
}
if tokens["access_token"]:
token_storage.save_tokens(session_id, tokens)
# Create response with session cookie
if redirect_url:
response = RedirectResponse(url=redirect_url)
else:
response = RedirectResponse(url="/")
response.set_cookie(
key=SESSION_COOKIE,
value=session_id,
httponly=True,
secure=True,
samesite="lax",
max_age=60 * 60 * 24 * 30, # 30 days
)
return response
except HTTPException:
raise
except Exception as e:
raise HTTPException(500, f"Failed to exchange code: {e}")
@@ -181,36 +252,47 @@ async def get_userinfo(
@router.get("/oauth/status")
async def oauth_status():
"""Check if user is authenticated (for sidebar)."""
async def oauth_status(spr_session: Optional[str] = Cookie(None)):
"""Check if this browser's session is authenticated."""
try:
tokens = token_storage.load_tokens(DEFAULT_USER_ID)
if not spr_session:
return {"authenticated": False}
# Load tokens for this session
tokens = token_storage.load_tokens(spr_session)
if not tokens:
return {"authenticated": False}
# Check if expired
if token_storage.is_expired(tokens):
if "refresh_token" not in tokens:
return {"authenticated": False}
try:
new_tokens = oauth_client.refresh_access_token(tokens["refresh_token"])
token_storage.save_tokens(DEFAULT_USER_ID, new_tokens)
except Exception:
return {"authenticated": False}
# Validate token content
access_token = tokens.get("access_token")
email = tokens.get("email")
if not access_token or not email:
return {"authenticated": False}
return {
"authenticated": True,
"user": {"email": f"{DEFAULT_USER_ID}@authenticated"},
"user": {"email": email},
}
except Exception:
return {"authenticated": False}
@router.get("/oauth/logout")
async def logout():
"""Clear stored tokens."""
token_storage.delete_tokens(DEFAULT_USER_ID)
return {"status": "ok", "message": "Logged out"}
async def logout(redirect: str = "/", spr_session: Optional[str] = Cookie(None)):
"""Clear this browser's session and redirect."""
response = RedirectResponse(url=redirect)
# Delete token file for this session
if spr_session:
storage_path = Path("/app/artery/veins/google/storage")
token_file = storage_path / f"tokens_{spr_session}.json"
if token_file.exists():
token_file.unlink()
# Clear the session cookie
response.delete_cookie(key=SESSION_COOKIE)
return response
@router.get("/spreadsheets/{spreadsheet_id}")