import os from datetime import datetime, timedelta from typing import Dict, Optional import jwt from dotenv import load_dotenv from app.utils.logger_cfg import logger load_dotenv() JWT_SECRET = os.getenv("JWT_SECRET") if not JWT_SECRET: logger.critical("JWT_SECRET environment variable not set! Exiting.") raise RuntimeError("JWT_SECRET environment variable not set!") JWT_ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 REFRESH_TOKEN_EXPIRE_DAYS = 30 def create_access_token(data: Dict, expires_delta: Optional[timedelta] = None) -> str: try: to_encode = data.copy() expire = datetime.utcnow() + ( expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) ) to_encode.update({"exp": expire}) safe_payload = {k: v for k, v in to_encode.items() if k != "password"} logger.debug(f"Creating access token with payload: {safe_payload}") return jwt.encode(to_encode, JWT_SECRET, algorithm=JWT_ALGORITHM) except Exception as e: logger.exception(f"Failed to create access token: {e}") raise RuntimeError("Failed to create access token") def create_refresh_token(data: Dict, expires_delta: Optional[timedelta] = None) -> str: try: to_encode = data.copy() expire = datetime.utcnow() + ( expires_delta or timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) ) to_encode.update({"exp": expire}) logger.debug(f"Creating refresh token with payload: {to_encode}") return jwt.encode(to_encode, JWT_SECRET, algorithm=JWT_ALGORITHM) except Exception as e: logger.exception(f"Failed to create refresh token: {e}") raise RuntimeError("Failed to create refresh token") def decode_token(token: str) -> Dict: try: logger.debug("Decoding JWT token...") payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM]) safe_payload = {k: v for k, v in payload.items() if k != "password"} logger.info(f"JWT decoded successfully: {safe_payload}") return payload except jwt.ExpiredSignatureError: logger.warning("JWT token has expired") raise ValueError("Token has expired") except jwt.InvalidSignatureError: logger.warning("JWT token signature invalid") raise ValueError("Invalid token signature") except jwt.DecodeError: logger.warning("JWT token decode failed (possibly malformed)") raise ValueError("Malformed token") except jwt.InvalidTokenError: logger.warning("JWT token invalid for unknown reason") raise ValueError("Invalid token") except Exception as e: logger.exception(f"Unexpected error decoding JWT: {e}") raise RuntimeError("Unexpected error while decoding token")