307 lines
14 KiB
Python
Raw Normal View History

2025-02-13 02:27:44 +08:00
# _*_ coding : UTF-8 _*_
# @Time : 2025/01/19 01:00
# @UpdateTime : 2025/01/19 01:00
# @Author : sonder
# @File : login.py
# @Software : PyCharm
# @Comment : 本程序
import uuid
from datetime import timedelta, datetime
from fastapi import APIRouter, Request, Depends
from fastapi.encoders import jsonable_encoder
from starlette.responses import JSONResponse
from tortoise.expressions import Q
from annotation.log import Log
from config.constant import BusinessType
from config.constant import RedisKeyConfig
from controller.login import CustomOAuth2PasswordRequestForm, LoginController
from controller.query import QueryController
from models import Department, User, Role, UserRole, LoginLog, OperationLog, QueryCodeLog, QueryCode
2025-02-13 02:27:44 +08:00
from schemas.common import BaseResponse
from schemas.login import LoginParams, GetUserInfoResponse, LoginResponse, GetCaptchaResponse, GetEmailCodeParams, \
ResetPasswordParams
from schemas.user import RegisterUserParams
from utils.captcha import Captcha
from utils.log import logger
from utils.mail import Email
from utils.password import Password
from utils.response import Response
loginAPI = APIRouter()
@loginAPI.post("/login", response_class=JSONResponse, summary="用户登录")
@Log(title="用户登录", business_type=BusinessType.GRANT, log_type="login")
async def login(
request: Request,
params: CustomOAuth2PasswordRequestForm = Depends()
):
request.app.state.session_id = None
request.app.state.login_status = False
2025-02-13 02:27:44 +08:00
user = LoginParams(
username=params.username,
password=params.password,
loginDays=params.loginDays,
code=params.code,
uuid=params.uuid
)
captcha_enabled = (
True
if await request.app.state.redis.get(f'{RedisKeyConfig.SYSTEM_CONFIG.key}:account_captcha_enabled')
2025-02-13 02:27:44 +08:00
== 'true'
else False
)
# 判断请求是否来自于api文档如果是返回指定格式的结果用于修复api文档认证成功后token显示undefined的bug
request_from_swagger = request.headers.get('referer').endswith('docs') if request.headers.get(
'referer') else False
request_from_redoc = request.headers.get('referer').endswith('redoc') if request.headers.get(
'referer') else False
# 验证码校验,如果开启验证码校验,则进行验证码校验,如果关闭则跳过验证码校验. 如果请求来自api文档则跳过验证码校验
if captcha_enabled and not request_from_redoc and not request_from_swagger:
result = await Captcha.verify_code(request, code=user.code, session_id=user.uuid)
if not result["status"]:
return Response.error(msg=result["msg"])
result = await LoginController.login(user)
if result["status"]:
await request.app.state.redis.set(
f'{RedisKeyConfig.ACCESS_TOKEN.key}:{result["session_id"]}',
result["accessToken"],
ex=timedelta(minutes=result["expiresIn"]),
)
userInfo = str(jsonable_encoder(result["userInfo"]))
await request.app.state.redis.set(
f'{RedisKeyConfig.USER_INFO.key}:{result["userInfo"]["id"]}',
userInfo,
ex=timedelta(minutes=5),
)
request.app.state.session_id = result["session_id"]
request.app.state.login_status = True
2025-02-13 02:27:44 +08:00
if request_from_swagger or request_from_redoc:
return {'access_token': result["accessToken"], 'token_type': 'Bearer',
"expires_in": result["expiresIn"] * 60}
result.pop("status")
result.pop("expiresIn")
result.pop("session_id")
result.pop("userInfo")
return Response.success(data=result)
request.app.state.login_status = False
2025-02-13 02:27:44 +08:00
return Response.failure(msg="登录失败,账号或密码错误!")
@loginAPI.post("/register", response_class=JSONResponse, response_model=LoginResponse, summary="用户注册")
async def register(request: Request, params: RegisterUserParams):
register_enabled = (
True
if await request.app.state.redis.get(f'{RedisKeyConfig.SYSTEM_CONFIG.key}:account_register_enabled')
2025-02-13 02:27:44 +08:00
== 'true'
else False
)
if not register_enabled:
return Response.error(msg="注册功能已关闭!")
result = await Email.verify_code(request, username=params.username, mail=params.email, code=params.code)
if not result["status"]:
return Response.error(msg=result["msg"])
if await QueryController.register_user_before(username=params.username, phone=params.phone, email=params.email):
return Response.error(msg="注册失败,用户已存在!")
params.password = await Password.get_password_hash(input_password=params.password)
# 默认分配注册用户
userRole = await Role.get_or_none(department__name="注册用户", code="user", del_flag=1).values(
department_id="department__id", id="id")
if not params.department_id:
params.department_id = userRole.get("department_id", "")
2025-02-13 02:27:44 +08:00
department = await Department.get_or_none(id=params.department_id)
userRole = await Role.get_or_none(department__id=department.id, code="user", del_flag=1).values(id="id")
print(userRole)
2025-02-13 02:27:44 +08:00
user = await User.create(
username=params.username,
password=params.password,
nickname=params.nickname,
phone=params.phone,
email=params.email,
gender=params.gender,
department=department,
status=params.status,
)
if user:
# 默认分配普通用户角色
await UserRole.create(
user_id=user.id,
role_id=userRole.get("id", ""),
)
2025-02-13 02:27:44 +08:00
userParams = LoginParams(
username=params.username,
password=params.password
)
result = await LoginController.login(userParams)
if result["status"]:
await request.app.state.redis.set(
f'{RedisKeyConfig.ACCESS_TOKEN.key}:{result["session_id"]}',
result["accessToken"],
ex=timedelta(minutes=result["expiresIn"]),
)
userInfo = str(jsonable_encoder(result["userInfo"]))
await request.app.state.redis.set(
f'{RedisKeyConfig.USER_INFO.key}:{result["userInfo"]["id"]}',
userInfo,
ex=timedelta(minutes=5),
)
result.pop("status")
result.pop("expiresIn")
result.pop("session_id")
result.pop("userInfo")
return Response.success(msg="注册成功!", data=result)
return Response.success(msg="注册成功!")
2025-02-13 02:27:44 +08:00
else:
return Response.error(msg="注册失败!")
@loginAPI.get("/captcha", response_class=JSONResponse, response_model=GetCaptchaResponse, summary="获取验证码")
async def get_captcha(request: Request):
captcha_enabled = (
True
if await request.app.state.redis.get(f'{RedisKeyConfig.SYSTEM_CONFIG.key}:account_captcha_enabled')
== 'true'
else False
)
register_enabled = (
True
2025-02-13 18:04:42 +08:00
if await request.app.state.redis.get(f'{RedisKeyConfig.SYSTEM_CONFIG.key}:account_register_enabled')
2025-02-13 02:27:44 +08:00
== 'true'
else False
)
if captcha_enabled:
captcha_type = (
await request.app.state.redis.get(f'{RedisKeyConfig.SYSTEM_CONFIG.key}:account_captcha_type')
if await request.app.state.redis.get(f'{RedisKeyConfig.SYSTEM_CONFIG.key}:account_captcha_type')
else "1"
)
captcha_result = await Captcha.create_captcha(captcha_type)
session_id = str(uuid.uuid4())
captcha = captcha_result[0]
result = captcha_result[-1]
await request.app.state.redis.set(
f'{RedisKeyConfig.CAPTCHA_CODES.key}:{session_id}', result, ex=timedelta(minutes=2)
)
logger.info(f'编号为{session_id}的会话获取图片验证码成功')
return Response.success(data={
"uuid": session_id,
"captcha": captcha,
"captcha_enabled": captcha_enabled,
"register_enabled": register_enabled,
})
else:
return Response.success(data={
"uuid": None,
"captcha": None,
"captcha_enabled": captcha_enabled,
"register_enabled": register_enabled,
})
@loginAPI.post("/code", response_class=JSONResponse, response_model=BaseResponse, summary="获取邮件验证码")
async def get_code(request: Request, params: GetEmailCodeParams):
result = await Email.send_email(request, username=params.username, title=params.title, mail=params.mail)
if result:
return Response.success(msg="验证码发送成功!")
return Response.error(msg="验证码发送失败!")
@loginAPI.put("/resetPassword", response_class=JSONResponse, response_model=BaseResponse, summary="重置密码")
@loginAPI.post("/resetPassword", response_class=JSONResponse, response_model=BaseResponse, summary="重置密码")
async def reset_password(request: Request, params: ResetPasswordParams):
result = await Email.verify_code(request, username=params.username, mail=params.mail, code=params.code)
if not result["status"]:
return Response.error(msg=result["msg"])
user = await User.get_or_none(Q(username=params.username) | Q(phone=params.username), email=params.mail)
if user:
user.password = await Password.get_password_hash(input_password=params.password)
await user.save()
return Response.success(msg="密码重置成功!")
return Response.error(msg="密码重置失败,用户不存在!")
@loginAPI.get("/info", response_class=JSONResponse, response_model=GetUserInfoResponse, summary="获取用户信息")
@Log(title="获取用户信息", business_type=BusinessType.SELECT)
async def info(
request: Request,
current_user: dict = Depends(LoginController.get_current_user)
):
return Response.success(data=current_user)
@loginAPI.get("/getRoutes", response_class=JSONResponse, summary="获取路由信息")
@Log(title="获取路由信息", business_type=BusinessType.SELECT)
2025-02-13 02:27:44 +08:00
async def get_routes(request: Request, current_user: dict = Depends(LoginController.get_current_user)):
sub_departments = current_user.get("sub_departments")
2025-02-13 02:27:44 +08:00
routes = await request.app.state.redis.get(f'{RedisKeyConfig.USER_ROUTES.key}:{current_user["id"]}')
if routes:
return Response.success(data=eval(routes))
routes = await LoginController.get_user_routes(current_user["id"], sub_departments=sub_departments)
2025-02-13 02:27:44 +08:00
userRoutes = str(jsonable_encoder(routes))
await request.app.state.redis.set(
f'{RedisKeyConfig.USER_ROUTES.key}:{current_user["id"]}',
userRoutes,
ex=timedelta(minutes=5),
)
return Response.success(data=routes)
@loginAPI.post("/refreshToken", response_class=JSONResponse, response_model=LoginResponse, summary="刷新token")
@Log(title="刷新token", business_type=BusinessType.GRANT)
async def refresh_token(request: Request,
current_user: dict = Depends(LoginController.get_current_user)
):
session_id = uuid.uuid4().__str__()
accessToken = await LoginController.create_token(
data={"user": current_user, "id": current_user.get("id"), "session_id": session_id},
expires_delta=timedelta(minutes=2 * 24 * 60))
expiresTime = (datetime.now() + timedelta(minutes=2 * 24 * 60)).timestamp()
refreshToken = await LoginController.create_token(
data={"user": current_user, "id": current_user.get("id"), "session_id": session_id},
expires_delta=timedelta(minutes=(4 * 24 + 2) * 60))
return Response.success(data={"accessToken": accessToken, "refreshToken": refreshToken, "expiresTime": expiresTime})
@loginAPI.post("/logout", response_class=JSONResponse, response_model=BaseResponse, summary="用户登出")
@Log(title="退出登录", business_type=BusinessType.FORCE)
async def logout(request: Request, status: bool = Depends(LoginController.logout)):
if status:
return Response.success(data="退出成功!")
return Response.error(data="登出失败!")
@loginAPI.post("/unsubscribe", response_class=JSONResponse, response_model=BaseResponse, summary="用户注销")
@Log(title="用户注销", business_type=BusinessType.FORCE)
async def unsubscribe(request: Request, current_user: dict = Depends(LoginController.get_current_user)):
id = current_user.get("id")
session_id = current_user.get("session_id")
if user := await User.get_or_none(id=id, del_flag=1):
user.del_flag = 0
await user.save()
redis_token = await request.app.state.redis.get(f'{RedisKeyConfig.ACCESS_TOKEN.key}:{session_id}')
if redis_token:
await request.app.state.redis.delete(f'{RedisKeyConfig.ACCESS_TOKEN.key}:{session_id}')
# 移除用户角色
await UserRole.filter(user_id=user.id, del_flag=1).update(del_flag=0)
# 移除用户登录日志
await LoginLog.filter(user_id=user.id, del_flag=1).update(del_flag=0)
# 移除用户操作日志
await OperationLog.filter(user_id=user.id, del_flag=1).update(del_flag=0)
# 移除编码查询日志
logs = await QueryCodeLog.filter(operator__id=user.id, del_flag=1).values("id")
for log in logs:
await QueryCode.filter(session_id=log["id"], del_flag=1).update(del_flag=0)
await QueryCodeLog.filter(id=log["id"], del_flag=1).update(del_flag=0)
# 更新用户信息缓存
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_INFO.key}:{id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_INFO.key}:{id}')
# 更新用户路由缓存
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_ROUTES.key}:{id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_ROUTES.key}:{id}')
return Response.success(data="注销成功!")
else:
return Response.error(data="注销失败!")