350 lines
15 KiB
Python
Raw Normal View History

2025-02-13 02:27:44 +08:00
# _*_ coding : UTF-8 _*_
# @Time : 2025/01/20 00:31
# @UpdateTime : 2025/01/20 00:31
# @Author : sonder
# @File : user.py
# @Software : PyCharm
# @Comment : User management API routes (CRUD, role assignment, permissions, avatar upload, password reset)
import os
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, Path, Query, UploadFile, File, Request
from fastapi.responses import JSONResponse
from annotation.auth import Auth
from annotation.log import Log
from config.constant import BusinessType
from config.env import UploadConfig
from controller.login import LoginController
from controller.query import QueryController
from exceptions.exception import ModelValidatorException
from models import File as FileModel
from models import Role, Department
from models.user import User, UserRole
from schemas.common import BaseResponse
from schemas.department import GetDepartmentListResponse
from schemas.file import UploadFileResponse
from schemas.user import AddUserParams, GetUserListResponse, GetUserInfoResponse, UpdateUserParams, \
AddUserRoleParams, GetUserRoleInfoResponse, UpdateUserRoleParams, GetUserPermissionListResponse, ResetPasswordParams
from utils.common import filterKeyValues
from utils.password import Password
from utils.response import Response
# Router for the user endpoints, mounted under the "/user" prefix; every route
# registered on it depends on LoginController.get_current_user.
userAPI = APIRouter(prefix="/user", dependencies=[Depends(LoginController.get_current_user)])
@userAPI.post("/add", response_class=JSONResponse, response_model=BaseResponse, summary="新增用户")
@Log(title="新增用户", business_type=BusinessType.INSERT)
async def add_user(
        request: Request,
        params: AddUserParams
):
    """Create a new user from the submitted parameters.

    The request is rejected when QueryController.register_user_before reports
    a match for the given username, phone or email. Otherwise the password is
    hashed, the department is looked up by ``params.department_id`` and a User
    row is created.

    :param request: incoming request object (not read directly in this body).
    :param params: payload describing the user to create.
    :return: a success response when the row was created, an error response otherwise.
    """
    already_registered = await QueryController.register_user_before(
        username=params.username,
        phone=params.phone,
        email=params.email,
    )
    if already_registered:
        return Response.error(msg="添加失败,用户已存在!")

    # Replace the submitted password with its hash before it is persisted.
    params.password = await Password.get_password_hash(input_password=params.password)
    department = await Department.get_or_none(id=params.department_id)

    new_user_fields = {
        "username": params.username,
        "password": params.password,
        "nickname": params.nickname,
        "phone": params.phone,
        "email": params.email,
        "gender": params.gender,
        "department": department,
        "status": params.status,
    }
    created = await User.create(**new_user_fields)
    if not created:
        return Response.error(msg="添加失败!")
    return Response.success(msg="添加成功!")
@userAPI.delete("/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除用户")
@userAPI.post("/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除用户")
@Log(title="删除用户", business_type=BusinessType.DELETE)
async def delete_user(
        request: Request,
        id: str = Path(..., description="用户ID")):
    """Delete the user identified by the ``id`` path parameter.

    :param request: incoming request object (not read directly in this body).
    :param id: ID of the user to delete.
    :return: a success response after deletion, or an error response when no
        user with that ID is found.
    """
    target = await User.get_or_none(id=id)
    if not target:
        return Response.error(msg="删除失败,用户不存在!")

    await target.delete()
    return Response.success(msg="删除成功!")
@userAPI.put("/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="更新用户")
@userAPI.post("/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="更新用户")
@Log(title="更新用户", business_type=BusinessType.UPDATE)
async def update_user(
        request: Request,
        params: UpdateUserParams,
        id: str = Path(..., description="用户ID")):
    """Overwrite the editable fields of an existing user.

    Copies username, nickname, phone, email, gender and status from ``params``
    onto the user, then sets the department to the one matching
    ``params.department_id`` (or clears it when no such department is found).

    :param request: incoming request object (not read directly in this body).
    :param params: new field values for the user.
    :param id: ID of the user to update.
    :return: a success response after saving, or an error response when the
        user does not exist.
    """
    user = await User.get_or_none(id=id)
    if not user:
        return Response.error(msg="更新失败,用户不存在!")

    # Plain scalar fields are copied one-to-one from the payload.
    for field_name in ("username", "nickname", "phone", "email", "gender", "status"):
        setattr(user, field_name, getattr(params, field_name))

    department = await Department.get_or_none(id=params.department_id)
    user.department = department if department else None

    await user.save()
    return Response.success(msg="更新成功!")
@userAPI.get("/info/{id}", response_class=JSONResponse, response_model=GetUserInfoResponse, summary="获取用户信息")
@Log(title="获取用户信息", business_type=BusinessType.SELECT)
async def get_user_info(request: Request, id: str = Path(..., description="用户ID")):
    """Return the profile fields of the user identified by ``id``.

    :param request: incoming request object (not read directly in this body).
    :param id: ID of the user to look up.
    :return: a success response carrying the user's fields as a dict, or an
        error response when no user with that ID is found.
    """
    # The lookup must be filtered by id on the queryset. Model.first() is a
    # classmethod, so calling it on a fetched instance (as was done before)
    # queries the whole table and yields an arbitrary first row instead of the
    # requested user.
    user = await User.filter(id=id).first().values(
        id="id",
        create_time="create_time",
        update_time="update_time",
        username="username",
        email="email",
        phone="phone",
        nickname="nickname",
        gender="gender",
        status="status",
        department_id="department__id",
    )
    if not user:
        return Response.error(msg="用户不存在!")
    return Response.success(data=user)
@userAPI.get("/list", response_class=JSONResponse, response_model=GetUserListResponse, summary="获取用户列表")
@Log(title="获取用户列表", business_type=BusinessType.SELECT)
@Auth(["user:btn:queryUser"])
async def get_user_list(
        request: Request,
        page: int = Query(default=1, description="当前页码"),
        pageSize: int = Query(default=10, description="每页数量"),
        username: Optional[str] = Query(default=None, description="用户名"),
        nickname: Optional[str] = Query(default=None, description="昵称"),
        phone: Optional[str] = Query(default=None, description="手机号"),
        email: Optional[str] = Query(default=None, description="邮箱"),
        gender: Optional[str] = Query(default=None, description="性别"),
        status: Optional[str] = Query(default=None, description="状态"),
        department_id: Optional[str] = Query(default=None, description="部门ID"),
):
    """Return one page of users, optionally narrowed by substring filters.

    Every non-empty filter argument is applied as a ``<field>__contains``
    condition. The response carries the page of rows, the total number of
    matching rows and the requested page number.

    :param request: incoming request object (not read directly in this body).
    :param page: 1-based page number.
    :param pageSize: number of rows per page.
    :param username: optional username filter.
    :param nickname: optional nickname filter.
    :param phone: optional phone filter.
    :param email: optional email filter.
    :param gender: optional gender filter.
    :param status: optional status filter.
    :param department_id: optional department ID filter.
    :return: a success response with ``result``, ``total`` and ``page``.
    """
    candidates = (
        ('username', username),
        ('nickname', nickname),
        ('phone', phone),
        ('email', email),
        ('gender', gender),
        ('status', status),
        ('department_id', department_id),
    )
    # Only filters that were actually supplied (truthy) take part in the query.
    filterArgs = {}
    for field_name, wanted in candidates:
        if wanted:
            filterArgs[f'{field_name}__contains'] = wanted

    matching = User.filter(**filterArgs)
    total = await matching.count()

    skip = (page - 1) * pageSize
    columns = {
        "id": "id",
        "create_time": "create_time",
        "update_time": "update_time",
        "username": "username",
        "email": "email",
        "phone": "phone",
        "nickname": "nickname",
        "gender": "gender",
        "status": "status",
        "department_id": "department__id",
    }
    result = await matching.offset(skip).limit(pageSize).values(**columns)

    payload = {
        "result": result,
        "total": total,
        "page": page
    }
    return Response.success(data=payload)
@userAPI.post("/addRole", response_model=BaseResponse, response_class=JSONResponse, summary="添加用户角色")
@Log(title="添加用户角色", business_type=BusinessType.INSERT)
async def add_user_role(request: Request, params: AddUserRoleParams):
    """Attach a role to a user.

    Fails when the user already has the role, when the user is not found, or
    when the role is not found (checked in that order); all lookups use
    ``del_flag=1``.

    :param request: incoming request object (not read directly in this body).
    :param params: payload carrying ``user_id`` and ``role_id``.
    :return: a success response when the link was created, otherwise an error
        response describing which check failed.
    """
    existing_link = await UserRole.get_or_none(user_id=params.user_id, role_id=params.role_id, del_flag=1)
    if existing_link:
        return Response.error(msg="该用户已存在该角色!")

    user = await User.get_or_none(id=params.user_id, del_flag=1)
    if not user:
        return Response.error(msg="添加失败,用户不存在!")

    role = await Role.get_or_none(id=params.role_id, del_flag=1)
    if not role:
        return Response.error(msg="添加失败,角色不存在!")

    created_link = await UserRole.create(user_id=user.id, role_id=role.id)
    if not created_link:
        return Response.error(msg="添加失败!")
    return Response.success(msg="添加成功!")
@userAPI.delete("/deleteRole/{id}", response_model=BaseResponse, response_class=JSONResponse,
                summary="删除用户角色")
@userAPI.post("/deleteRole/{id}", response_model=BaseResponse, response_class=JSONResponse,
              summary="删除用户角色")
@Log(title="删除用户角色", business_type=BusinessType.DELETE)
async def delete_user_role(request: Request, id: str = Path(description="用户角色ID")):
    """Delete a single user-role link by its own ID.

    :param request: incoming request object (not read directly in this body).
    :param id: ID of the user-role record to delete.
    :return: a success response after deletion, or an error response when no
        record with that ID and ``del_flag=1`` is found.
    """
    link = await UserRole.get_or_none(id=id, del_flag=1)
    if not link:
        return Response.error(msg="删除失败,用户角色不存在!")

    await link.delete()
    return Response.success(msg="删除成功!")
@userAPI.put("/updateRole", response_model=BaseResponse, response_class=JSONResponse, summary="修改用户角色")
@userAPI.post("/updateRole", response_model=BaseResponse, response_class=JSONResponse,
              summary="修改用户角色")
@Log(title="修改用户角色", business_type=BusinessType.UPDATE)
async def update_user_role(request: Request, params: UpdateUserRoleParams):
    """Synchronise a user's roles with the requested list of role IDs.

    Roles present in ``params.role_ids`` but not yet linked are added (only
    when the role is found with ``del_flag=1``); roles currently linked but
    absent from the request are removed.

    :param request: incoming request object (not read directly in this body).
    :param params: payload carrying ``user_id`` and the desired ``role_ids``.
    :return: a success response once both passes have finished.
    """
    # Role IDs the user currently holds.
    current_rows = await UserRole.filter(user_id=params.user_id, del_flag=1).values("role_id")
    current_ids = set(await filterKeyValues(current_rows, "role_id"))
    requested_ids = set(params.role_ids)

    # Set arithmetic yields the two deltas.
    ids_to_add = requested_ids - current_ids
    ids_to_remove = current_ids - requested_ids

    # Additions first: skip IDs that do not resolve to a role.
    for role_id in ids_to_add:
        role = await Role.get_or_none(id=role_id, del_flag=1)
        if role:
            await UserRole.create(user_id=params.user_id, role_id=role.id)

    # Then removals.
    for role_id in ids_to_remove:
        link = await UserRole.get_or_none(user_id=params.user_id, role_id=role_id, del_flag=1)
        if link:
            await link.delete()

    return Response.success(msg="修改成功!")
@userAPI.get("/roleInfo/{id}", response_model=GetUserRoleInfoResponse, response_class=JSONResponse,
             summary="获取用户角色信息")
@Log(title="获取用户角色信息", business_type=BusinessType.SELECT)
async def get_user_role_info(request: Request, id: str = Path(description="用户角色ID")):
    """Return the details of one user-role link, including user and role fields.

    :param request: incoming request object (not read directly in this body).
    :param id: ID of the user-role record to look up.
    :return: a success response carrying the record as a dict, or an error
        response when no record with that ID and ``del_flag=1`` is found.
    """
    # Filter on the queryset itself. Model.first() is a classmethod, so the
    # previous `instance.first()` call ignored the fetched record and returned
    # an arbitrary first row of the table.
    data = await UserRole.filter(id=id, del_flag=1).first().values(
        id="id",
        user_id="user__id",
        user_name="user__username",
        role_name="role__name",
        role_code="role__code",
        role_id="role__id",
        create_time="create_time",
        update_time="update_time"
    )
    if not data:
        return Response.error(msg="获取失败,用户角色不存在!")
    return Response.success(data=data)
@userAPI.get("/roleList/{id}", response_model=GetDepartmentListResponse, response_class=JSONResponse,
             summary="获取用户角色列表")
@Log(title="获取用户角色列表", business_type=BusinessType.SELECT)
async def get_user_role_list(
        request: Request,
        id: str = Path(description="用户ID"),
):
    """List every role link of a user together with the user's department fields.

    The result is not paginated: ``total`` is the number of rows returned and
    ``page`` is always 1.

    :param request: incoming request object (not read directly in this body).
    :param id: ID of the user whose role links are listed.
    :return: a success response with ``result``, ``total`` and ``page``.
    """
    # Output key -> ORM field path.
    columns = {
        "id": "id",
        "department_id": "user__department__id",
        "department_name": "user__department__name",
        "department_phone": "user__department__phone",
        "department_principal": "user__department__principal",
        "department_email": "user__department__email",
        "role_name": "role__name",
        "role_code": "role__code",
        "role_id": "role__id",
        "create_time": "create_time",
        "update_time": "update_time",
    }
    result = await UserRole.filter(user_id=id).values(**columns)

    payload = {
        "result": result,
        "total": len(result),
        "page": 1
    }
    return Response.success(data=payload)
@userAPI.get("/permissionList/{id}", response_class=JSONResponse, response_model=GetUserPermissionListResponse,
             summary="获取用户权限列表")
@Log(title="获取用户权限列表", business_type=BusinessType.SELECT)
async def get_user_permission_list(request: Request, id: str = Path(description="用户ID")):
    """Return the de-duplicated permission IDs of a user.

    :param request: incoming request object (not read directly in this body).
    :param id: ID of the user whose permissions are listed.
    :return: a success response whose data is a list of unique permission IDs.
    """
    permission_rows = await QueryController.get_user_permissions(user_id=id)
    permission_ids = await filterKeyValues(permission_rows, "id")
    # Passing through a set drops duplicate IDs.
    unique_ids = set(permission_ids)
    return Response.success(data=list(unique_ids))
@userAPI.post("/avatar/{id}", response_model=UploadFileResponse, response_class=JSONResponse, summary="上传用户头像")
@Log(title="上传用户头像", business_type=BusinessType.UPDATE)
async def upload_user_avatar(
        request: Request,
        id: str = Path(description="用户ID"),
        file: UploadFile = File(...)):
    """Store an uploaded image as the avatar of the given user.

    The file is written under ``UploadConfig.UPLOAD_PATH`` with a name built
    from the user ID and a timestamp, recorded in the file table, and the
    user's ``avatar`` is pointed at ``/file/<file record id>``.

    :param request: incoming request object (not read directly in this body).
    :param id: ID of the user whose avatar is replaced.
    :param file: uploaded image.
    :return: a success response describing the stored file record, or a
        failure response when the user does not exist.
    :raises ModelValidatorException: when the content type is not one of the
        accepted image MIME types.
    """
    user = await User.get_or_none(id=id)
    if not user:
        return Response.failure(msg="用户不存在!")

    # 1. Accept only known image MIME types.
    image_mimetypes = [
        'image/jpeg',
        'image/png',
        'image/gif',
        'image/svg+xml',
        'image/bmp',
        'image/webp',
        'image/tiff'
    ]
    if file.content_type not in image_mimetypes:
        raise ModelValidatorException(message="文件类型不支持")

    # 2. Build a file name from the user ID and the current timestamp.
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    unique_filename = f"{id}_{timestamp}"

    # 3. Save the upload to disk.
    file_path = os.path.join(UploadConfig.UPLOAD_PATH, unique_filename)
    with open(file_path, "wb") as buffer:
        buffer.write(await file.read())

    # 4. Derive the relative and absolute paths of the stored file.
    relative_path = os.path.join(UploadConfig.UPLOAD_PREFIX, unique_filename)
    absolute_path = os.path.abspath(file_path)

    # 5. Record the file in the database and link it to the user.
    file_record = await FileModel.create(
        name=file.filename,
        size=os.path.getsize(file_path),
        file_type=file.content_type,
        absolute_path=absolute_path,
        relative_path=relative_path,
        uploader_id=id,
    )
    user.avatar = f"/file/{file_record.id}"
    await user.save()

    # 6. Read the record back by its own ID. Model.first() is a classmethod,
    #    so the previous `file_record.first()` call returned an arbitrary first
    #    row of the file table rather than the record created above.
    result = await FileModel.filter(id=file_record.id).first().values(
        id="id",
        name="name",
        size="size",
        file_type="file_type",
        relative_path="relative_path",
        absolute_path="absolute_path",
        uploader_id="uploader__id",
        uploader_username="uploader__username",
        uploader_nickname="uploader__nickname",
        uploader_department_id="uploader__department__id",
        uploader_department_name="uploader__department__name",
        create_time="create_time",
        update_time="update_time",
    )
    return Response.success(data=result)
@userAPI.put("/resetPassword/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="重置用户密码")
@userAPI.post("/resetPassword/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="重置用户密码")
@Log(title="重置用户密码", business_type=BusinessType.UPDATE)
@Auth(permission_list=["user:btn:reset_password"])
async def reset_user_password(request: Request, params: ResetPasswordParams, id: str = Path(description="用户ID")):
    """Replace a user's password with the hash of the submitted one.

    :param request: incoming request object (not read directly in this body).
    :param params: payload carrying the new plain-text password.
    :param id: ID of the user whose password is reset.
    :return: a success response after saving, or a failure response when the
        user does not exist.
    """
    account = await User.get_or_none(id=id)
    if not account:
        return Response.failure(msg="用户不存在!")

    hashed = await Password.get_password_hash(params.password)
    account.password = hashed
    await account.save()
    return Response.success(msg="重置密码成功!")