"""Google OAuth login routes.

Exposes ``/auth/google/login`` (redirect to Google's consent screen) and
``/auth/google/callback`` (exchange the authorization response for user info,
find or create the local user, and hand JWTs to the frontend as HTTP-only
cookies).
"""

import os
import random
import string
from pathlib import Path
from typing import Any

from authlib.integrations.starlette_client import OAuth
from dotenv import load_dotenv
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import RedirectResponse
from sqlalchemy import insert, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

from app.auth.jwt import create_access_token, create_refresh_token
from app.models.integrations import UserIntegration
from app.models.profile import Profile
from app.models.user import User
from app.utils.db import get_async_session
from app.utils.logger_cfg import logger

# Load environment variables from the `.env` file found at parents[3] of this
# file's resolved path.
env_path = Path(__file__).resolve().parents[3] / ".env"
load_dotenv(env_path)

GOOGLE_CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID")
GOOGLE_CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET")
FRONTEND_URL = os.getenv("FRONTEND_URL", "http://localhost:3000")

# Whether auth cookies carry the `Secure` flag. An explicit COOKIE_SECURE env
# value wins; otherwise the flag follows the frontend URL scheme, so the
# default local setup (http://localhost:3000) keeps non-secure cookies while
# an https deployment no longer ships tokens in cookies that can travel over
# plain HTTP.
_cookie_secure_env = os.getenv("COOKIE_SECURE")
if _cookie_secure_env is not None:
    COOKIE_SECURE = _cookie_secure_env.strip().lower() in {"1", "true", "yes", "on"}
else:
    COOKIE_SECURE = FRONTEND_URL.lower().startswith("https://")

# Cookie lifetimes, in seconds.
ACCESS_COOKIE_MAX_AGE = 60 * 15
REFRESH_COOKIE_MAX_AGE = 60 * 60 * 24 * 7

# Fail at import time: without credentials none of the routes can work.
if not GOOGLE_CLIENT_ID or not GOOGLE_CLIENT_SECRET:
    raise RuntimeError("Google OAuth env not configured")

router = APIRouter(prefix="/auth/google", tags=["auth"])

oauth = OAuth()
oauth.register(
    name="google",
    client_id=GOOGLE_CLIENT_ID,
    client_secret=GOOGLE_CLIENT_SECRET,
    server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
    client_kwargs={"scope": "openid email profile"},
)


@router.get("/login")
async def google_login(request: Request):
    """Redirect the browser to Google, asking it to return to the callback route."""
    redirect_uri = request.url_for("google_callback")
    logger.debug("Google redirect_uri: {}", redirect_uri)
    return await oauth.google.authorize_redirect(request, redirect_uri)


def generate_username(length: int = 8) -> str:
    """Return a random string of ASCII letters and digits, ``length`` characters long."""
    chars = string.ascii_letters + string.digits
    return "".join(random.choices(chars, k=length))


async def is_username_taken(session: AsyncSession, username: str) -> bool:
    """Return True when a ``User`` row with this exact username exists."""
    result = await session.execute(
        select(User.id).where(User.username == username)
    )
    return result.scalar_one_or_none() is not None


async def generate_unique_username(
    session: AsyncSession,
    max_attempts: int = 10,
) -> str:
    """Generate a random username that is not currently present in the users table.

    Tries up to ``max_attempts`` candidates. The check and the later insert
    are separate statements, so a concurrent request can still claim the same
    name in between.

    Raises:
        RuntimeError: if every candidate was already taken.
    """
    for _ in range(max_attempts):
        username = generate_username()
        if not await is_username_taken(session, username):
            return username

    raise RuntimeError("Failed to generate unique username")


async def ensure_user_state(session: AsyncSession, user_id: int) -> None:
    """Create the ``Profile`` / ``UserIntegration`` rows for a user if they are missing.

    Commits only when at least one row was added.
    """
    created = False

    profile_result = await session.execute(
        select(Profile).where(Profile.user_id == user_id)
    )
    if not profile_result.scalar_one_or_none():
        session.add(Profile(user_id=user_id))
        logger.debug("Created missing Profile for user={}", user_id)
        created = True

    integrations_result = await session.execute(
        select(UserIntegration).where(UserIntegration.user_id == user_id)
    )
    if not integrations_result.scalar_one_or_none():
        session.add(UserIntegration(user_id=user_id))
        logger.debug("Created missing Integrations for user={}", user_id)
        created = True

    if created:
        await session.commit()


async def create_user_with_relations(
    session: AsyncSession,
    *,
    email: str,
    username: str,
    google_id: str,
) -> User:
    """Insert a password-less user plus its Profile and UserIntegration rows.

    All three rows are committed together; the user is then re-selected so the
    returned ORM object is loaded after the commit.
    """
    result = await session.execute(
        insert(User)
        .values(
            email=email,
            username=username,
            google_id=google_id,
            password=None,
        )
        .returning(User.id)
    )
    user_id = result.scalar_one()

    session.add(Profile(user_id=user_id))
    session.add(UserIntegration(user_id=user_id))

    await session.commit()

    result = await session.execute(select(User).where(User.id == user_id))
    return result.scalar_one()


def _set_auth_cookie(
    response: RedirectResponse, *, key: str, value: str, max_age: int
) -> None:
    """Attach one HTTP-only, SameSite=Lax token cookie to ``response``."""
    response.set_cookie(
        key=key,
        value=value,
        httponly=True,
        secure=COOKIE_SECURE,
        samesite="lax",
        max_age=max_age,
    )


@router.get("/callback", name="google_callback")
async def google_callback(
    request: Request,
    session: AsyncSession = Depends(get_async_session),
):
    """Finish the Google OAuth flow and redirect to the frontend with token cookies.

    Steps: exchange the authorization response for a token, read the user's
    ``email`` and ``sub`` claims, look the user up by ``google_id`` (creating
    the user on first login), then set ``access_token`` / ``refresh_token``
    cookies on a redirect to ``{FRONTEND_URL}/oauth-success``.

    Raises:
        HTTPException: 400 when the token exchange fails, the user info cannot
            be parsed, or it lacks ``email``/``sub``; 409 when creating the
            user violates a database constraint.
    """
    try:
        token: dict[str, Any] = await oauth.google.authorize_access_token(
            request
        )
        logger.debug("OAuth token received: {}", token.keys())
    except Exception as e:
        logger.exception("Google OAuth failed: {}", str(e))
        raise HTTPException(status_code=400, detail="OAuth failed") from e

    try:
        user_info = token.get("userinfo") or await oauth.google.parse_id_token(
            request, token
        )
    except Exception as e:
        logger.exception("Failed to parse user info: {}", str(e))
        raise HTTPException(status_code=400, detail="Invalid token") from e

    # Guard against a missing/empty user-info payload before calling .get().
    if not user_info:
        raise HTTPException(status_code=400, detail="Invalid Google response")

    email: str | None = user_info.get("email")
    google_id: str | None = user_info.get("sub")

    if not email or not google_id:
        raise HTTPException(status_code=400, detail="Invalid Google response")

    result = await session.execute(
        select(User).where(User.google_id == google_id)
    )
    user = result.scalar_one_or_none()

    if not user:
        logger.info("Creating new Google user: {}", email)
        username = await generate_unique_username(session)
        try:
            user = await create_user_with_relations(
                session,
                email=email,
                username=username,
                google_id=google_id,
            )
        except IntegrityError as e:
            # A constraint was violated on insert (for example a value that
            # must be unique was claimed in the meantime). Reset the session
            # and report a conflict instead of an unhandled 500.
            await session.rollback()
            logger.warning("Could not create Google user {}: {}", email, str(e))
            raise HTTPException(
                status_code=409, detail="Account already exists"
            ) from e
        user_id = user.id
        token_version = user.token_version
    else:
        # Read the attributes BEFORE ensure_user_state() may commit: a commit
        # can expire loaded ORM attributes (SQLAlchemy's default
        # expire_on_commit=True), and touching an expired attribute on an
        # AsyncSession would attempt implicit I/O and fail.
        user_id = user.id
        token_version = user.token_version
        logger.debug("User exists: {}", user_id)
        await ensure_user_state(session, user_id)

    # Separate dict per call so neither token factory sees the other's changes.
    access_token = create_access_token(
        {"sub": str(user_id), "token_version": token_version}
    )
    refresh_token = create_refresh_token(
        {"sub": str(user_id), "token_version": token_version}
    )

    response = RedirectResponse(url=f"{FRONTEND_URL}/oauth-success")
    _set_auth_cookie(
        response,
        key="access_token",
        value=access_token,
        max_age=ACCESS_COOKIE_MAX_AGE,
    )
    _set_auth_cookie(
        response,
        key="refresh_token",
        value=refresh_token,
        max_age=REFRESH_COOKIE_MAX_AGE,
    )

    return response