274 lines
12 KiB
Python
Raw Normal View History

2025-02-13 02:27:44 +08:00
# _*_ coding : UTF-8 _*_
# @Time : 2025/01/19 01:00
# @UpdateTime : 2025/01/19 01:00
# @Author : sonder
# @File : login.py
# @Software : PyCharm
# @Comment : 本程序
import uuid
from datetime import timedelta, datetime
from fastapi import APIRouter, Request, Depends
from fastapi.encoders import jsonable_encoder
from starlette.responses import JSONResponse
from tortoise.expressions import Q
from annotation.log import Log
from config.constant import BusinessType
from config.constant import RedisKeyConfig
from controller.login import CustomOAuth2PasswordRequestForm, LoginController
from controller.query import QueryController
from models import Department, User, Role, UserRole
2025-02-13 02:27:44 +08:00
from schemas.common import BaseResponse
from schemas.login import LoginParams, GetUserInfoResponse, LoginResponse, GetCaptchaResponse, GetEmailCodeParams, \
ResetPasswordParams
from schemas.user import RegisterUserParams
from utils.captcha import Captcha
from utils.log import logger
from utils.mail import Email
from utils.password import Password
from utils.response import Response
loginAPI = APIRouter()
@loginAPI.post("/login", response_class=JSONResponse, summary="用户登录")
@Log(title="用户登录", business_type=BusinessType.GRANT, log_type="login")
async def login(
        request: Request,
        params: CustomOAuth2PasswordRequestForm = Depends()
):
    # Authenticate a user from the submitted login form.
    #
    # On success the access token and a snapshot of the user info are cached
    # in Redis and the token payload is returned; on failure an error
    # response is returned. Requests whose Referer ends in "docs"/"redoc"
    # skip the captcha check and receive an OAuth2-style token dict instead.
    app_state = request.app.state
    redis = app_state.redis

    # Reset the login markers kept on the application state.
    # NOTE(review): presumably consumed by the @Log decorator — confirm.
    app_state.session_id = None
    app_state.login_status = False

    credentials = LoginParams(
        username=params.username,
        password=params.password,
        loginDays=params.loginDays,
        code=params.code,
        uuid=params.uuid,
    )

    captcha_flag = await redis.get(f'{RedisKeyConfig.SYSTEM_CONFIG.key}:account_captcha_enabled')
    captcha_enabled = captcha_flag == 'true'

    # Detect whether the request was issued from the interactive API docs.
    # Those callers get a dedicated response format, which fixes the token
    # showing up as "undefined" after authorizing inside the docs page.
    referer = request.headers.get('referer')
    request_from_swagger = referer.endswith('docs') if referer else False
    request_from_redoc = referer.endswith('redoc') if referer else False
    from_api_docs = request_from_swagger or request_from_redoc

    # Captcha verification runs only when it is enabled in the system
    # configuration and the request does not come from the API docs.
    if captcha_enabled and not from_api_docs:
        captcha_check = await Captcha.verify_code(request, code=credentials.code, session_id=credentials.uuid)
        if not captcha_check["status"]:
            return Response.error(msg=captcha_check["msg"])

    outcome = await LoginController.login(credentials)
    if not outcome["status"]:
        app_state.login_status = False
        return Response.failure(msg="登录失败,账号或密码错误!")

    # Cache the token for its full lifetime (expiresIn is used as minutes).
    await redis.set(
        f'{RedisKeyConfig.ACCESS_TOKEN.key}:{outcome["session_id"]}',
        outcome["accessToken"],
        ex=timedelta(minutes=outcome["expiresIn"]),
    )
    # Cache the user info for five minutes, stored as a Python-literal string.
    cached_user_info = str(jsonable_encoder(outcome["userInfo"]))
    await redis.set(
        f'{RedisKeyConfig.USER_INFO.key}:{outcome["userInfo"]["id"]}',
        cached_user_info,
        ex=timedelta(minutes=5),
    )
    app_state.session_id = outcome["session_id"]
    app_state.login_status = True

    if from_api_docs:
        # expires_in is reported in seconds here.
        return {
            'access_token': outcome["accessToken"],
            'token_type': 'Bearer',
            "expires_in": outcome["expiresIn"] * 60,
        }

    # Strip the internal bookkeeping fields before returning the payload.
    for internal_key in ("status", "expiresIn", "session_id", "userInfo"):
        outcome.pop(internal_key)
    return Response.success(data=outcome)
@loginAPI.post("/register", response_class=JSONResponse, response_model=LoginResponse, summary="用户注册")
async def register(request: Request, params: RegisterUserParams):
    # Register a new user and, when possible, log them in immediately.
    #
    # Flow: check that registration is enabled -> verify the e-mail code ->
    # reject duplicates -> resolve department and default "user" role ->
    # create the user and the user/role link -> attempt an automatic login.
    #
    # Returns Response.success with the token payload when the automatic
    # login succeeds, Response.success without data when only the
    # registration succeeded, and Response.error on every failure path.
    redis = request.app.state.redis

    register_enabled = (
        await redis.get(f'{RedisKeyConfig.SYSTEM_CONFIG.key}:account_register_enabled')
    ) == 'true'
    if not register_enabled:
        return Response.error(msg="注册功能已关闭!")

    verification = await Email.verify_code(
        request, username=params.username, mail=params.email, code=params.code)
    if not verification["status"]:
        return Response.error(msg=verification["msg"])

    if await QueryController.register_user_before(
            username=params.username, phone=params.phone, email=params.email):
        return Response.error(msg="注册失败,用户已存在!")

    # Keep the hash in its own variable. The plain-text password is still
    # needed for the automatic login below; overwriting params.password with
    # the hash made that login compare a hash against the stored hash.
    hashed_password = await Password.get_password_hash(input_password=params.password)

    department_id = params.department_id
    if not department_id:
        # No department supplied: fall back to the department that owns the
        # default role for self-registered users.
        default_role = await Role.get_or_none(
            department__name="注册用户", code="user", del_flag=1
        ).values(department_id="department__id", id="id")
        if not default_role:
            # The default role is not configured; nothing to attach the user to.
            return Response.error(msg="注册失败!")
        department_id = default_role.get("department_id", "")

    department = await Department.get_or_none(id=department_id)
    if department is None:
        return Response.error(msg="注册失败!")

    # Resolve the role BEFORE creating the user so that a missing role cannot
    # leave a freshly created user without any role assignment.
    user_role = await Role.get_or_none(
        department__id=department.id, code="user", del_flag=1
    ).values(id="id")
    if not user_role:
        return Response.error(msg="注册失败!")

    user = await User.create(
        username=params.username,
        password=hashed_password,
        nickname=params.nickname,
        phone=params.phone,
        email=params.email,
        gender=params.gender,
        department=department,
        status=params.status,
    )
    if not user:
        return Response.error(msg="注册失败!")

    # Assign the ordinary "user" role by default.
    await UserRole.create(
        user_id=user.id,
        role_id=user_role.get("id", ""),
    )

    # Automatic login with the plain-text password, as the /login route does.
    login_params = LoginParams(
        username=params.username,
        password=params.password
    )
    outcome = await LoginController.login(login_params)
    if not outcome["status"]:
        # Registration itself succeeded; the client can log in manually.
        return Response.success(msg="注册成功!")

    await redis.set(
        f'{RedisKeyConfig.ACCESS_TOKEN.key}:{outcome["session_id"]}',
        outcome["accessToken"],
        ex=timedelta(minutes=outcome["expiresIn"]),
    )
    cached_user_info = str(jsonable_encoder(outcome["userInfo"]))
    await redis.set(
        f'{RedisKeyConfig.USER_INFO.key}:{outcome["userInfo"]["id"]}',
        cached_user_info,
        ex=timedelta(minutes=5),
    )
    # Strip the internal bookkeeping fields before returning the payload.
    for internal_key in ("status", "expiresIn", "session_id", "userInfo"):
        outcome.pop(internal_key)
    return Response.success(msg="注册成功!", data=outcome)
@loginAPI.get("/captcha", response_class=JSONResponse, response_model=GetCaptchaResponse, summary="获取验证码")
async def get_captcha(request: Request):
    # Issue a new captcha challenge and report the captcha/registration
    # switches stored in the system configuration.
    #
    # When the captcha is disabled, "uuid" and "captcha" are None. Otherwise
    # a captcha is generated, its expected answer is cached in Redis for two
    # minutes under a fresh session id, and both id and captcha are returned.
    redis = request.app.state.redis
    config_prefix = RedisKeyConfig.SYSTEM_CONFIG.key

    captcha_enabled = (await redis.get(f'{config_prefix}:account_captcha_enabled')) == 'true'
    register_enabled = (await redis.get(f'{config_prefix}:account_register_enabled')) == 'true'

    if not captcha_enabled:
        return Response.success(data={
            "uuid": None,
            "captcha": None,
            "captcha_enabled": captcha_enabled,
            "register_enabled": register_enabled,
        })

    # Read the captcha type once and fall back to "1" when it is unset.
    # Reading it twice (check, then use) cost an extra round-trip and could
    # yield None if the key changed between the two reads.
    captcha_type = (await redis.get(f'{config_prefix}:account_captcha_type')) or "1"

    captcha_result = await Captcha.create_captcha(captcha_type)
    session_id = str(uuid.uuid4())
    # First element goes to the client, last element is what gets cached
    # for later verification.
    captcha = captcha_result[0]
    expected_answer = captcha_result[-1]
    await redis.set(
        f'{RedisKeyConfig.CAPTCHA_CODES.key}:{session_id}', expected_answer, ex=timedelta(minutes=2)
    )
    logger.info(f'编号为{session_id}的会话获取图片验证码成功')
    return Response.success(data={
        "uuid": session_id,
        "captcha": captcha,
        "captcha_enabled": captcha_enabled,
        "register_enabled": register_enabled,
    })
@loginAPI.post("/code", response_class=JSONResponse, response_model=BaseResponse, summary="获取邮件验证码")
async def get_code(request: Request, params: GetEmailCodeParams):
    # Send an e-mail verification code to the address given in the request
    # and report whether Email.send_email returned a truthy result.
    sent = await Email.send_email(
        request,
        username=params.username,
        title=params.title,
        mail=params.mail,
    )
    if not sent:
        return Response.error(msg="验证码发送失败!")
    return Response.success(msg="验证码发送成功!")
@loginAPI.put("/resetPassword", response_class=JSONResponse, response_model=BaseResponse, summary="重置密码")
@loginAPI.post("/resetPassword", response_class=JSONResponse, response_model=BaseResponse, summary="重置密码")
async def reset_password(request: Request, params: ResetPasswordParams):
    # Reset a user's password after the e-mailed verification code checks out.
    # The route is registered for both PUT and POST.
    verification = await Email.verify_code(
        request,
        username=params.username,
        mail=params.mail,
        code=params.code,
    )
    if not verification["status"]:
        return Response.error(msg=verification["msg"])

    # The submitted "username" may match either the username or the phone
    # column; the e-mail address must match as well.
    account_filter = Q(username=params.username) | Q(phone=params.username)
    account = await User.get_or_none(account_filter, email=params.mail)
    if not account:
        return Response.error(msg="密码重置失败,用户不存在!")

    account.password = await Password.get_password_hash(input_password=params.password)
    await account.save()
    return Response.success(msg="密码重置成功!")
@loginAPI.get("/info", response_class=JSONResponse, response_model=GetUserInfoResponse, summary="获取用户信息")
@Log(title="获取用户信息", business_type=BusinessType.SELECT)
async def info(request: Request, current_user: dict = Depends(LoginController.get_current_user)):
    # Return the user resolved by the LoginController.get_current_user
    # dependency, wrapped in a success response.
    response = Response.success(data=current_user)
    return response
@loginAPI.get("/getRoutes", response_class=JSONResponse, summary="获取路由信息")
@Log(title="获取路由信息", business_type=BusinessType.SELECT)
async def get_routes(request: Request, current_user: dict = Depends(LoginController.get_current_user)):
    # Return the route tree of the current user, served from a five-minute
    # Redis cache when available and rebuilt through
    # LoginController.get_user_routes otherwise.
    import ast  # function-scope import: the block stays self-contained

    redis = request.app.state.redis
    cache_key = f'{RedisKeyConfig.USER_ROUTES.key}:{current_user["id"]}'

    cached = await redis.get(cache_key)
    if cached:
        # SECURITY: the cache holds str() of JSON-compatible data, i.e. a
        # Python literal. It used to be parsed with eval(), which would run
        # arbitrary code if the Redis value were ever tampered with.
        # ast.literal_eval accepts only literals and parses this data alike.
        if isinstance(cached, bytes):
            # literal_eval needs str; only relevant if the Redis client is
            # not configured to decode responses.
            cached = cached.decode("utf-8")
        try:
            cached_routes = ast.literal_eval(cached)
        except (ValueError, SyntaxError, TypeError):
            # Corrupt or unexpected cache entry: ignore it and rebuild below.
            pass
        else:
            return Response.success(data=cached_routes)

    sub_departments = current_user.get("sub_departments")
    routes = await LoginController.get_user_routes(current_user["id"], sub_departments=sub_departments)
    # Stored as a Python-literal string so the read path above can parse it.
    await redis.set(
        cache_key,
        str(jsonable_encoder(routes)),
        ex=timedelta(minutes=5),
    )
    return Response.success(data=routes)
@loginAPI.post("/refreshToken", response_class=JSONResponse, response_model=LoginResponse, summary="刷新token")
@Log(title="刷新token", business_type=BusinessType.GRANT)
async def refresh_token(request: Request,
                        current_user: dict = Depends(LoginController.get_current_user)
                        ):
    # Issue a fresh access/refresh token pair for the authenticated user.
    # Both tokens share one newly generated session id. The access token
    # lives for 2 days, the refresh token for 4 days and 2 hours.
    session_id = str(uuid.uuid4())
    access_lifetime = timedelta(minutes=2 * 24 * 60)
    refresh_lifetime = timedelta(minutes=(4 * 24 + 2) * 60)

    def build_claims():
        # A new dict per token, so each create_token call gets its own payload.
        return {"user": current_user, "id": current_user.get("id"), "session_id": session_id}

    accessToken = await LoginController.create_token(
        data=build_claims(),
        expires_delta=access_lifetime,
    )
    # Expiry of the access token as a POSIX timestamp, based on local time.
    expiresTime = (datetime.now() + access_lifetime).timestamp()
    refreshToken = await LoginController.create_token(
        data=build_claims(),
        expires_delta=refresh_lifetime,
    )
    payload = {"accessToken": accessToken, "refreshToken": refreshToken, "expiresTime": expiresTime}
    return Response.success(data=payload)
@loginAPI.post("/logout", response_class=JSONResponse, response_model=BaseResponse, summary="用户登出")
@Log(title="退出登录", business_type=BusinessType.FORCE)
async def logout(request: Request, status: bool = Depends(LoginController.logout)):
    # Report the outcome of the LoginController.logout dependency, which
    # performs the actual logout and yields a success flag.
    # NOTE(review): the messages are passed as `data`, whereas the other
    # handlers in this file pass messages as `msg` — confirm this is intended.
    if not status:
        return Response.error(data="登出失败!")
    return Response.success(data="退出成功!")