257 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# _*_ coding : UTF-8 _*_
# @Time : 2025/01/19 01:00
# @UpdateTime : 2025/01/19 01:00
# @Author : sonder
# @File : login.py
# @Software : PyCharm
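# @Comment : Authentication routes: login, registration, captcha, email code, password reset, user info, routes, token refresh and logout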
import uuid
from datetime import timedelta, datetime
from fastapi import APIRouter, Request, Depends
from fastapi.encoders import jsonable_encoder
from starlette.responses import JSONResponse
from tortoise.expressions import Q
from annotation.log import Log
from config.constant import BusinessType
from config.constant import RedisKeyConfig
from controller.login import CustomOAuth2PasswordRequestForm, LoginController
from controller.query import QueryController
from models import Department, User
from schemas.common import BaseResponse
from schemas.login import LoginParams, GetUserInfoResponse, LoginResponse, GetCaptchaResponse, GetEmailCodeParams, \
ResetPasswordParams
from schemas.user import RegisterUserParams
from utils.captcha import Captcha
from utils.log import logger
from utils.mail import Email
from utils.password import Password
from utils.response import Response
# Router on which the authentication endpoints defined below are registered.
loginAPI = APIRouter()
@loginAPI.post("/login", response_class=JSONResponse, summary="用户登录")
@Log(title="用户登录", business_type=BusinessType.GRANT, log_type="login")
async def login(
        request: Request,
        params: CustomOAuth2PasswordRequestForm = Depends()
):
    """
    Authenticate a user and cache the issued access token in Redis.

    :param request: Incoming request; supplies the Redis client via ``app.state.redis``
        and the ``referer`` header.
    :param params: Submitted login form (username, password, loginDays, captcha code and uuid).
    :return: An OAuth2-style ``access_token`` dict when the request comes from the
        Swagger/ReDoc pages, otherwise ``Response.success`` carrying the remaining
        login result; ``Response.error`` when the captcha check fails and
        ``Response.failure`` when the credentials are rejected.
    """
    redis = request.app.state.redis
    user = LoginParams(
        username=params.username,
        password=params.password,
        loginDays=params.loginDays,
        code=params.code,
        uuid=params.uuid
    )
    # Read the same switch that get_captcha reads ("account_captcha_enabled").
    # The previous key ("account.captcha_enabled") did not match it, so the two
    # endpoints could disagree about whether a captcha is required.
    captcha_enabled = (
        await redis.get(f'{RedisKeyConfig.SYSTEM_CONFIG.key}:account_captcha_enabled')
    ) == 'true'
    # Detect requests coming from the API docs pages: they get a specific response
    # format, which fixes the docs UI showing "undefined" as the token after a
    # successful authorization.
    referer = request.headers.get('referer') or ''
    request_from_swagger = referer.endswith('docs')
    request_from_redoc = referer.endswith('redoc')
    # Captcha verification runs only when it is enabled and the request does not
    # come from the API docs pages.
    # NOTE(review): the Referer header is client-controlled, so any caller can skip
    # the captcha by sending a Referer ending in "docs"/"redoc" — confirm this is acceptable.
    if captcha_enabled and not request_from_redoc and not request_from_swagger:
        result = await Captcha.verify_code(request, code=user.code, session_id=user.uuid)
        if not result["status"]:
            return Response.error(msg=result["msg"])
    result = await LoginController.login(user)
    if not result["status"]:
        return Response.failure(msg="登录失败,账号或密码错误!")
    # Cache the access token for the lifetime reported by the login controller (minutes).
    await redis.set(
        f'{RedisKeyConfig.ACCESS_TOKEN.key}:{result["session_id"]}',
        result["accessToken"],
        ex=timedelta(minutes=result["expiresIn"]),
    )
    # Cache the user info as the str() of its JSON-compatible form for five minutes.
    user_info = str(jsonable_encoder(result["userInfo"]))
    await redis.set(
        f'{RedisKeyConfig.USER_INFO.key}:{result["userInfo"]["id"]}',
        user_info,
        ex=timedelta(minutes=5),
    )
    request.app.state.session_id = result["session_id"]
    if request_from_swagger or request_from_redoc:
        return {'access_token': result["accessToken"], 'token_type': 'Bearer',
                "expires_in": result["expiresIn"] * 60}
    # Strip the internal fields before returning the result to the client.
    for internal_key in ("status", "expiresIn", "session_id", "userInfo"):
        result.pop(internal_key)
    return Response.success(data=result)
@loginAPI.post("/register", response_class=JSONResponse, response_model=LoginResponse, summary="用户注册")
async def register(request: Request, params: RegisterUserParams):
    """
    Register a new user and log the new account in straight away.

    :param request: Incoming request; supplies the Redis client via ``app.state.redis``.
    :param params: Registration payload (username, password, nickname, phone, email,
        gender, department_id, status and the email verification code).
    :return: ``Response.success`` with the login result when registration and the
        automatic login both succeed; ``Response.error`` when registration is
        disabled, the email code is rejected, the user already exists, or the
        automatic login fails.
    """
    redis = request.app.state.redis
    register_enabled = (
        await redis.get(f'{RedisKeyConfig.SYSTEM_CONFIG.key}:register_enabled')
    ) == 'true'
    if not register_enabled:
        return Response.error(msg="注册功能已关闭!")
    result = await Email.verify_code(request, username=params.username, mail=params.email, code=params.code)
    if not result["status"]:
        return Response.error(msg=result["msg"])
    if await QueryController.register_user_before(username=params.username, phone=params.phone, email=params.email):
        return Response.error(msg="注册失败,用户已存在!")
    # Keep the plaintext password for the automatic login below. Previously
    # params.password was overwritten with its hash and that hash was then passed
    # to LoginController.login as if it were the password the user typed.
    raw_password = params.password
    hashed_password = await Password.get_password_hash(input_password=raw_password)
    department = await Department.get_or_none(id=params.department_id)
    user = await User.create(
        username=params.username,
        password=hashed_password,
        nickname=params.nickname,
        phone=params.phone,
        email=params.email,
        gender=params.gender,
        department=department,
        status=params.status,
    )
    if not user:
        return Response.error(msg="注册失败!")
    login_params = LoginParams(
        username=params.username,
        password=raw_password
    )
    result = await LoginController.login(login_params)
    if not result["status"]:
        # NOTE(review): the account was created but the automatic login failed; the
        # original reports this through Response.error with a "registration succeeded"
        # message. Left unchanged — confirm what clients expect here.
        return Response.error(msg="注册成功!")
    # Cache the access token for the lifetime reported by the login controller (minutes).
    await redis.set(
        f'{RedisKeyConfig.ACCESS_TOKEN.key}:{result["session_id"]}',
        result["accessToken"],
        ex=timedelta(minutes=result["expiresIn"]),
    )
    # Cache the user info as the str() of its JSON-compatible form for five minutes.
    user_info = str(jsonable_encoder(result["userInfo"]))
    await redis.set(
        f'{RedisKeyConfig.USER_INFO.key}:{result["userInfo"]["id"]}',
        user_info,
        ex=timedelta(minutes=5),
    )
    # Strip the internal fields before returning the result to the client.
    for internal_key in ("status", "expiresIn", "session_id", "userInfo"):
        result.pop(internal_key)
    return Response.success(msg="注册成功!", data=result)
@loginAPI.get("/captcha", response_class=JSONResponse, response_model=GetCaptchaResponse, summary="获取验证码")
async def get_captcha(request: Request):
    """
    Return a fresh captcha together with the captcha/registration switches.

    :param request: Incoming request; supplies the Redis client via ``app.state.redis``.
    :return: ``Response.success`` whose data holds ``uuid``, ``captcha``,
        ``captcha_enabled`` and ``register_enabled``. ``uuid`` and ``captcha`` are
        ``None`` when the captcha is disabled.
    """
    redis = request.app.state.redis
    config_prefix = RedisKeyConfig.SYSTEM_CONFIG.key
    captcha_enabled = (await redis.get(f'{config_prefix}:account_captcha_enabled')) == 'true'
    register_enabled = (await redis.get(f'{config_prefix}:register_enabled')) == 'true'
    if not captcha_enabled:
        return Response.success(data={
            "uuid": None,
            "captcha": None,
            "captcha_enabled": captcha_enabled,
            "register_enabled": register_enabled,
        })
    # Fetch the configured captcha type once (it was previously read from Redis
    # twice) and fall back to "1" when it is missing or empty.
    captcha_type = (await redis.get(f'{config_prefix}:account_captcha_type')) or "1"
    captcha_result = await Captcha.create_captcha(captcha_type)
    session_id = str(uuid.uuid4())
    # The first element is returned to the client as the captcha; the last element
    # is what gets stored for later verification.
    captcha = captcha_result[0]
    expected = captcha_result[-1]
    await redis.set(
        f'{RedisKeyConfig.CAPTCHA_CODES.key}:{session_id}', expected, ex=timedelta(minutes=2)
    )
    logger.info(f'编号为{session_id}的会话获取图片验证码成功')
    return Response.success(data={
        "uuid": session_id,
        "captcha": captcha,
        "captcha_enabled": captcha_enabled,
        "register_enabled": register_enabled,
    })
@loginAPI.post("/code", response_class=JSONResponse, response_model=BaseResponse, summary="获取邮件验证码")
async def get_code(request: Request, params: GetEmailCodeParams):
    """
    Send an email verification code.

    :param request: Incoming request, forwarded to ``Email.send_email``.
    :param params: Payload carrying ``username``, ``title`` and ``mail``.
    :return: ``Response.success`` when ``Email.send_email`` returns a truthy value,
        otherwise ``Response.error``.
    """
    sent = await Email.send_email(
        request,
        username=params.username,
        title=params.title,
        mail=params.mail,
    )
    if not sent:
        return Response.error(msg="验证码发送失败!")
    return Response.success(msg="验证码发送成功!")
@loginAPI.put("/resetPassword", response_class=JSONResponse, response_model=BaseResponse, summary="重置密码")
@loginAPI.post("/resetPassword", response_class=JSONResponse, response_model=BaseResponse, summary="重置密码")
async def reset_password(request: Request, params: ResetPasswordParams):
    """
    Reset a user's password after checking the email verification code.

    Registered for both PUT and POST on ``/resetPassword``.

    :param request: Incoming request, forwarded to ``Email.verify_code``.
    :param params: Payload carrying ``username``, ``mail``, ``code`` and the new ``password``.
    :return: ``Response.success`` once the new password hash is saved; ``Response.error``
        when the code is rejected or no matching user is found.
    """
    verification = await Email.verify_code(
        request,
        username=params.username,
        mail=params.mail,
        code=params.code,
    )
    if not verification["status"]:
        return Response.error(msg=verification["msg"])
    # The submitted "username" may match either the username or the phone column;
    # the email must match as well.
    account_filter = Q(username=params.username) | Q(phone=params.username)
    user = await User.get_or_none(account_filter, email=params.mail)
    if not user:
        return Response.error(msg="密码重置失败,用户不存在!")
    user.password = await Password.get_password_hash(input_password=params.password)
    await user.save()
    return Response.success(msg="密码重置成功!")
@loginAPI.get("/info", response_class=JSONResponse, response_model=GetUserInfoResponse, summary="获取用户信息")
@Log(title="获取用户信息", business_type=BusinessType.SELECT)
async def info(request: Request, current_user: dict = Depends(LoginController.get_current_user)):
    """
    Return the user resolved by ``LoginController.get_current_user``.

    :param request: Incoming request (not read by this handler's body).
    :param current_user: User data injected by the ``get_current_user`` dependency.
    :return: ``Response.success`` wrapping ``current_user`` unchanged.
    """
    user_data = current_user
    return Response.success(data=user_data)
@loginAPI.get("/getRoutes", response_class=JSONResponse, summary="获取路由信息")
# @Log(title="获取路由信息", business_type=BusinessType.SELECT)
async def get_routes(request: Request, current_user: dict = Depends(LoginController.get_current_user)):
    """
    Return the current user's routes, served from a five-minute Redis cache.

    :param request: Incoming request; supplies the Redis client via ``app.state.redis``.
    :param current_user: User data injected by ``get_current_user``; its ``id`` keys the cache.
    :return: ``Response.success`` with the cached routes when present, otherwise with the
        routes freshly loaded through ``LoginController.get_user_routes``.
    """
    # Function-scope import keeps this block self-contained.
    import ast

    cache_key = f'{RedisKeyConfig.USER_ROUTES.key}:{current_user["id"]}'
    cached = await request.app.state.redis.get(cache_key)
    if cached:
        # The cache holds str() of JSON-compatible data (see the write below), i.e. a
        # Python literal. Parse it with ast.literal_eval instead of eval() so that
        # whatever is stored under the key can never be executed as code.
        if isinstance(cached, bytes):
            cached = cached.decode()
        return Response.success(data=ast.literal_eval(cached))
    routes = await LoginController.get_user_routes(current_user["id"])
    serialized_routes = str(jsonable_encoder(routes))
    await request.app.state.redis.set(
        cache_key,
        serialized_routes,
        ex=timedelta(minutes=5),
    )
    return Response.success(data=routes)
@loginAPI.post("/refreshToken", response_class=JSONResponse, response_model=LoginResponse, summary="刷新token")
@Log(title="刷新token", business_type=BusinessType.GRANT)
async def refresh_token(
        request: Request,
        current_user: dict = Depends(LoginController.get_current_user)
):
    """
    Issue a new access/refresh token pair for the current user.

    :param request: Incoming request (not read by this handler's body).
    :param current_user: User data injected by the ``get_current_user`` dependency.
    :return: ``Response.success`` with ``accessToken``, ``refreshToken`` and
        ``expiresTime`` (timestamp at which the access token lifetime ends).
    """
    # NOTE(review): unlike login, the new access token is not written to Redis here —
    # confirm that get_current_user does not depend on that cache entry.
    session_id = str(uuid.uuid4())
    access_lifetime = timedelta(minutes=2 * 24 * 60)
    refresh_lifetime = timedelta(minutes=(4 * 24 + 2) * 60)

    def build_payload():
        # A fresh dict per token, so neither create_token call sees the other's payload.
        return {"user": current_user, "id": current_user.get("id"), "session_id": session_id}

    accessToken = await LoginController.create_token(
        data=build_payload(),
        expires_delta=access_lifetime,
    )
    expiresTime = (datetime.now() + access_lifetime).timestamp()
    refreshToken = await LoginController.create_token(
        data=build_payload(),
        expires_delta=refresh_lifetime,
    )
    token_data = {
        "accessToken": accessToken,
        "refreshToken": refreshToken,
        "expiresTime": expiresTime,
    }
    return Response.success(data=token_data)
@loginAPI.post("/logout", response_class=JSONResponse, response_model=BaseResponse, summary="用户登出")
@Log(title="退出登录", business_type=BusinessType.FORCE)
async def logout(request: Request, status: bool = Depends(LoginController.logout)):
    """
    Report the outcome of the logout performed by the ``LoginController.logout`` dependency.

    :param request: Incoming request (not read by this handler's body).
    :param status: Value produced by the ``LoginController.logout`` dependency.
    :return: ``Response.success`` when ``status`` is truthy, otherwise ``Response.error``.
    """
    if not status:
        return Response.error(data="登出失败!")
    return Response.success(data="退出成功!")