1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
import re
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Response
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm import selectinload
from app.auth.jwt import create_access_token, create_refresh_token
from app.models.profile import Profile
from app.models.user import User
from app.schemas.user import UserCreate, UserRead
from app.utils.db import get_async_session
from app.utils.hash_cfg import hash_password
from app.utils.logger_cfg import logger
router = APIRouter(tags=["auth"])
@router.post("/register", response_model=UserRead)
async def register_user(
user: UserCreate,
response: Response,
session: AsyncSession = Depends(get_async_session),
):
logger.debug("Register request received")
email: Optional[str] = user.email.strip() if user.email else None
logger.debug("Normalized email value: {}", email)
logger.info("Registration attempt | username={} email={}", user.username, email)
if not (
re.search(r"[A-Za-z]", user.password)
and re.search(r"\d", user.password)
and re.search(r"[^\w\s]", user.password)
):
logger.warning(
"Registration failed | password complexity requirement not met | username={}",
user.username,
)
raise HTTPException(
status_code=400,
detail={
"field": "password",
"message": "Попробуйте сочетание букв, цифр и символов.",
},
)
result = await session.execute(select(User).where(User.username == user.username))
if result.scalars().first():
logger.warning(
"Registration failed | username already exists | username={}", user.username
)
raise HTTPException(
status_code=400,
detail={"field": "username", "message": "Никнейм уже занят."},
)
if email:
result = await session.execute(select(User).where(User.email == email))
if result.scalars().first():
logger.warning(
"Registration failed | email already exists | email={}", email
)
raise HTTPException(
status_code=400,
detail={"field": "email", "message": "Адрес уже занят."},
)
hashed_password = hash_password(user.password)
new_user = User(username=user.username, email=email, password=hashed_password)
session.add(new_user)
await session.flush()
new_profile = Profile(user_id=new_user.id)
session.add(new_profile)
from app.models.integrations import UserIntegration
new_integrations = UserIntegration(user_id=new_user.id)
session.add(new_integrations)
await session.commit()
result = await session.execute(
select(User)
.options(
selectinload(User.profile),
selectinload(User.integrations),
)
.where(User.id == new_user.id)
)
new_user = result.scalars().first()
result = await session.execute(
select(User).options(selectinload(User.profile)).where(User.id == new_user.id)
)
new_user = result.scalars().first()
logger.success(
"User successfully registered | id={} username={} email={}",
new_user.id,
new_user.username,
new_user.email,
)
access_token = create_access_token(
{"sub": str(new_user.id), "token_version": new_user.token_version}
)
refresh_token = create_refresh_token(
{"sub": str(new_user.id), "token_version": new_user.token_version}
)
response.set_cookie(
key="access_token",
value=access_token,
httponly=True,
secure=False,
samesite="lax",
max_age=60 * 60,
)
response.set_cookie(
key="refresh_token",
value=refresh_token,
httponly=True,
secure=False,
samesite="lax",
max_age=30 * 24 * 60 * 60,
)
return new_user
|