diff options
| author | tycfuvgibhoinjok <aneuhmanh@gmail.com> | 2026-03-31 13:46:18 +0300 |
|---|---|---|
| committer | tycfuvgibhoinjok <aneuhmanh@gmail.com> | 2026-03-31 13:46:18 +0300 |
| commit | 7e41576035cd9f0004255a7490e6691c6d989ff6 (patch) | |
| tree | 4d78369f54b32c9f72bcc137b41f1cc0172f30fc /app/auth | |
| parent | 051f562c6ceafe8f25e10548bd8f5543839794e5 (diff) | |
add jwt
Diffstat (limited to 'app/auth')
| -rw-r--r-- | app/auth/jwt.py | 71 | ||||
| -rw-r--r-- | app/auth/login.py | 0 | ||||
| -rw-r--r-- | app/auth/register.py | 32 |
3 files changed, 71 insertions, 32 deletions
diff --git a/app/auth/jwt.py b/app/auth/jwt.py index e69de29..2d2aac5 100644 --- a/app/auth/jwt.py +++ b/app/auth/jwt.py @@ -0,0 +1,71 @@ +import os +from datetime import datetime, timedelta +from typing import Dict, Optional + +import jwt +from dotenv import load_dotenv + +from app.utils.logger_cfg import logger + +load_dotenv() +JWT_SECRET = os.getenv("JWT_SECRET") +if not JWT_SECRET: + logger.critical("JWT_SECRET environment variable not set! Exiting.") + raise RuntimeError("JWT_SECRET environment variable not set!") + +JWT_ALGORITHM = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES = 60 +REFRESH_TOKEN_EXPIRE_DAYS = 7 + + +def create_access_token(data: Dict, expires_delta: Optional[timedelta] = None) -> str: + try: + to_encode = data.copy() + expire = datetime.utcnow() + ( + expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) + ) + to_encode.update({"exp": expire}) + safe_payload = {k: v for k, v in to_encode.items() if k != "password"} + logger.debug(f"Creating access token with payload: {safe_payload}") + return jwt.encode(to_encode, JWT_SECRET, algorithm=JWT_ALGORITHM) + except Exception as e: + logger.exception(f"Failed to create access token: {e}") + raise RuntimeError("Failed to create access token") + + +def create_refresh_token(data: Dict, expires_delta: Optional[timedelta] = None) -> str: + try: + to_encode = data.copy() + expire = datetime.utcnow() + ( + expires_delta or timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) + ) + to_encode.update({"exp": expire}) + logger.debug(f"Creating refresh token with payload: {to_encode}") + return jwt.encode(to_encode, JWT_SECRET, algorithm=JWT_ALGORITHM) + except Exception as e: + logger.exception(f"Failed to create refresh token: {e}") + raise RuntimeError("Failed to create refresh token") + + +def decode_token(token: str) -> Dict: + try: + logger.debug("Decoding JWT token...") + payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM]) + safe_payload = {k: v for k, v in payload.items() if k != "password"} + logger.info(f"JWT decoded successfully: {safe_payload}") + return payload + except jwt.ExpiredSignatureError: + logger.warning("JWT token has expired") + raise ValueError("Token has expired") + except jwt.InvalidSignatureError: + logger.warning("JWT token signature invalid") + raise ValueError("Invalid token signature") + except jwt.DecodeError: + logger.warning("JWT token decode failed (possibly malformed)") + raise ValueError("Malformed token") + except jwt.InvalidTokenError: + logger.warning("JWT token invalid for unknown reason") + raise ValueError("Invalid token") + except Exception as e: + logger.exception(f"Unexpected error decoding JWT: {e}") + raise RuntimeError("Unexpected error while decoding token") diff --git a/app/auth/login.py b/app/auth/login.py deleted file mode 100644 index e69de29..0000000 --- a/app/auth/login.py +++ /dev/null diff --git a/app/auth/register.py b/app/auth/register.py deleted file mode 100644 index f1c3ec3..0000000 --- a/app/auth/register.py +++ /dev/null @@ -1,32 +0,0 @@ -from app.models.auth import User -from argon2 import PasswordHasher -from fastapi import APIRouter, Depends, HTTPException, status -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.future import select - -from app.schemas.auth import RegisterSchema -from app.utils.db import get_db - -router = APIRouter() -ph = PasswordHasher() - - -@router.post("/register") -async def register(user: RegisterSchema, db: AsyncSession = Depends(get_db)): - result = await db.execute( - select(User).where( - (User.username == user.username) | (User.email == user.email) - ) - ) - existing = result.scalar_one_or_none() - if existing: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Username or email already exists", - ) - - hashed = ph.hash(user.password) - new_user = User(username=user.username, email=user.email, password_hash=hashed) - db.add(new_user) - await db.commit() - return {"message": "User registered successfully"} |
