334 lines
14 KiB
Python
334 lines
14 KiB
Python
# -*- coding: utf-8 -*-
|
||
# @Time : 2025/01/20 22:41
|
||
# @UpdateTime : 2025/01/20 22:41
|
||
# @Author : sonder
|
||
# @File : role.py
|
||
# @Software : PyCharm
|
||
# @Comment : Role management API - CRUD endpoints for roles and role-permission bindings
|
||
from typing import Optional
|
||
|
||
from fastapi import APIRouter, Depends, Path, Query, Request
|
||
from fastapi.responses import JSONResponse
|
||
|
||
from annotation.log import Log
|
||
from config.constant import BusinessType
|
||
from controller.login import LoginController
|
||
from models import Role, Permission, RolePermission, Department
|
||
from schemas.common import BaseResponse
|
||
from schemas.role import AddRoleParams, AddRolePermissionParams, GetRolePermissionInfoResponse, \
|
||
GetRolePermissionListResponse
|
||
from utils.common import filterKeyValues
|
||
from utils.response import Response
|
||
|
||
# Router for all role endpoints, mounted under "/role". Every route registered
# on it runs the LoginController.get_current_user dependency first.
roleAPI = APIRouter(dependencies=[Depends(LoginController.get_current_user)], prefix="/role")
|
||
|
||
|
||
def _role_param(params, field: str):
    """
    Read a role field from the request body under either naming scheme.

    The handler used to mix ``params.code`` with ``params.role_code`` (and the
    same for ``name`` / ``description``), so one of its branches always raised
    ``AttributeError``. The plain name is tried first because ``update_role``
    reads the same schema that way.

    NOTE(review): confirm the real field names on ``AddRoleParams`` and drop
    the fallback once they are known.

    :param params: request body object
    :param field: base field name, e.g. ``"code"``
    :return: value of ``params.<field>`` or ``params.role_<field>``
    :raises AttributeError: when neither attribute exists
    """
    for attr in (field, f"role_{field}"):
        if hasattr(params, attr):
            return getattr(params, attr)
    raise AttributeError(f"{type(params).__name__!s} has neither '{field}' nor 'role_{field}'")


@roleAPI.post("/add", response_model=BaseResponse, response_class=JSONResponse, summary="新增角色")
@Log(title="新增角色", business_type=BusinessType.INSERT)
async def add_role(request: Request, params: AddRoleParams):
    """
    Create a role, optionally attached to a department.

    The request is rejected when a role with the same code already exists for
    the same ``department_id`` among rows with ``del_flag=1`` (presumably the
    "not deleted" marker - TODO confirm). If ``params.department_id`` does not
    resolve to a department with ``del_flag=1``, the role is created with no
    department.

    :param request: incoming request (not read here; presumably for ``Log`` - TODO confirm)
    :param params: role fields from the request body
    :return: success or error response
    """
    code = _role_param(params, "code")
    if await Role.get_or_none(code=code, department_id=params.department_id, del_flag=1):
        return Response.error(msg="角色编码已存在!")

    department = await Department.get_or_none(id=params.department_id, del_flag=1)
    # Single create call; only the department link differs between the two cases.
    role = await Role.create(
        code=code,
        name=_role_param(params, "name"),
        description=_role_param(params, "description"),
        status=params.status,
        department_id=department.id if department else None,
    )
    if role:
        return Response.success(msg="新增角色成功!")
    return Response.error(msg="新增角色失败!")
|
||
|
||
|
||
@roleAPI.delete("/delete/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="删除角色")
@roleAPI.post("/delete/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="删除角色")
@Log(title="删除角色", business_type=BusinessType.DELETE)
async def delete_role(request: Request, id: int = Path(..., description="角色ID")):
    """
    Delete a role together with its role-permission bindings.

    :param request: incoming request (not read by this handler)
    :param id: role ID from the URL path
    :return: success response, or error response when no role with
        ``del_flag=1`` matches ``id``
    """
    target = await Role.get_or_none(id=id, del_flag=1)
    if not target:
        return Response.error(msg="删除角色失败!")

    # Remove the role's permission bindings before the role itself.
    await RolePermission.filter(role_id=target.id).delete()
    await target.delete()
    return Response.success(msg="删除角色成功!")
|
||
|
||
|
||
@roleAPI.put("/update/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="修改角色")
@roleAPI.post("/update/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="修改角色")
@Log(title="修改角色", business_type=BusinessType.UPDATE)
async def update_role(request: Request, params: AddRoleParams, id: str = Path(..., description="角色ID")):
    """
    Overwrite a role's code, name, description, status and department.

    The department link is cleared when ``params.department_id`` does not
    resolve to a department with ``del_flag=1``.

    :param request: incoming request (not read by this handler)
    :param params: new role fields from the request body
    :param id: role ID from the URL path
    :return: success response, or error response when the role is not found
    """
    role = await Role.get_or_none(id=id, del_flag=1)
    if not role:
        return Response.error(msg="修改角色失败!")

    role.code, role.name, role.description = params.code, params.name, params.description
    owner = await Department.get_or_none(id=params.department_id, del_flag=1)
    role.status = params.status
    role.department_id = owner.id if owner else None
    await role.save()
    return Response.success(msg="修改角色成功!")
|
||
|
||
|
||
@roleAPI.get("/info/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="查询角色详情")
@Log(title="查询角色详情", business_type=BusinessType.SELECT)
async def get_role_info(request: Request, id: int = Path(..., description="角色ID")):
    """
    Return one role as a flat dict, including fields of its department.

    :param request: incoming request (not read by this handler)
    :param id: role ID from the URL path
    :return: success response carrying the role dict, or error response when
        the lookup yields nothing
    """
    # Output key -> model field path ("department__x" follows the relation).
    columns = {
        "id": "id",
        "create_by": "create_by",
        "create_time": "create_time",
        "update_by": "update_by",
        "update_time": "update_time",
        "code": "code",
        "name": "name",
        "status": "status",
        "description": "description",
        "department_id": "department_id",
        "department_name": "department__name",
        "department_principal": "department__principal",
        "department_phone": "department__phone",
        "department_email": "department__email",
    }
    info = await Role.get_or_none(id=id, del_flag=1).values(**columns)
    if not info:
        return Response.error(msg="查询角色详情失败!")
    return Response.success(data=info)
|
||
|
||
|
||
@roleAPI.get("/list", response_model=BaseResponse, response_class=JSONResponse, summary="查询角色列表")
@Log(title="查询角色列表", business_type=BusinessType.SELECT)
async def get_role_list(
        request: Request,
        page: int = Query(1, description="页码"),
        pageSize: int = Query(10, description="每页数量"),
        name: Optional[str] = Query(None, description="角色名称"),
        code: Optional[str] = Query(None, description="角色编码"),
        description: Optional[str] = Query(None, description="角色描述"),
        department_id: Optional[str] = Query(None, description="所属部门ID"),
        status: Optional[int] = Query(None, description="状态"),
        current_user: dict = Depends(LoginController.get_current_user)
):
    """
    List the roles of a department and all of its sub-departments, paginated.

    :param request: incoming request (not read by this handler)
    :param page: 1-based page number
    :param pageSize: number of roles per page
    :param name: substring filter on the role name
    :param code: substring filter on the role code
    :param description: substring filter on the role description
    :param department_id: root department; defaults to the current user's
        ``department_id`` when omitted
    :param status: exact-match filter on the role status
    :param current_user: dict provided by ``LoginController.get_current_user``
    :return: success response with ``result`` (the page), ``total`` and ``page``
    """
    # Text filters are substring matches and are skipped when empty.
    filterArgs = {
        f'{field}__contains': value
        for field, value in (("name", name), ("code", code), ("description", description))
        if value
    }
    # status is numeric: compare exactly, and test against None so that
    # status=0 is not dropped as a falsy value.
    if status is not None:
        filterArgs["status"] = status

    # Fall back to the current user's department when none was requested.
    if not department_id:
        department_id = current_user.get("department_id")

    all_roles = await get_role_and_subroles(department_id, filterArgs)

    # Pagination is done in memory over the full result.
    total = len(all_roles)
    paginated_roles = all_roles[(page - 1) * pageSize: page * pageSize]

    return Response.success(data={
        "result": paginated_roles,
        "total": total,
        "page": page
    })
|
||
|
||
|
||
async def get_department_and_subdepartments(department_id: str, visited: set = None):
    """
    Recursively collect the ID of a department and of all its descendants.

    :param department_id: ID of the department to start from
    :param visited: IDs already expanded; shared across the recursion so a
        cyclic parent/child chain cannot loop forever
    :return: list of department IDs (may contain repeated IDs)
    """
    seen = set() if visited is None else visited

    # Already expanded on this walk: contribute nothing.
    if department_id in seen:
        return []
    seen.add(department_id)

    own_ids = await Department.filter(id=department_id).values_list("id", flat=True)
    child_ids = await Department.filter(parent_id=department_id).values_list("id", flat=True)

    # Direct children first, then each child's own subtree in child order.
    collected = list(child_ids)
    for child_id in child_ids:
        collected.extend(await get_department_and_subdepartments(child_id, seen))

    return own_ids + collected
|
||
|
||
|
||
async def get_role_and_subroles(department_id: str, filterArgs: dict):
    """
    Fetch the roles of a department and of all its sub-departments.

    :param department_id: ID of the root department
    :param filterArgs: extra keyword filters passed to ``Role.filter``
    :return: list of role dicts, one per role ID (first occurrence kept)
    """
    scope = await get_department_and_subdepartments(department_id)

    # Output key -> model field path ("department__x" follows the relation).
    columns = {
        "id": "id",
        "create_by": "create_by",
        "create_time": "create_time",
        "update_by": "update_by",
        "update_time": "update_time",
        "code": "code",
        "name": "name",
        "status": "status",
        "description": "description",
        "department_id": "department__id",
        "department_name": "department__name",
        "department_principal": "department__principal",
        "department_phone": "department__phone",
        "department_email": "department__email",
    }
    rows = await Role.filter(department__id__in=scope, **filterArgs).values(**columns)

    # De-duplicate by role ID; setdefault keeps the first row seen per ID and
    # dict insertion order preserves the original row order.
    first_by_id = {}
    for row in rows:
        first_by_id.setdefault(row["id"], row)
    return list(first_by_id.values())
|
||
|
||
|
||
# The route must contain "{id}": the handler declares `id` as a required Path
# parameter, so without the placeholder every request fails validation.
@roleAPI.post("/addPermission/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="新增角色权限")
@Log(title="新增角色权限", business_type=BusinessType.INSERT)
async def add_role_permission(request: Request, params: AddRolePermissionParams,
                              id: str = Path(..., description="角色ID")):
    """
    Bind additional permissions to a role.

    Permissions the role already holds are skipped, and so are requested IDs
    that do not resolve to a permission with ``del_flag=1``.

    :param request: incoming request (not read by this handler)
    :param params: request body carrying ``permission_ids``
    :param id: role ID from the URL path
    :return: success response, or error response when the role is not found
    """
    role = await Role.get_or_none(id=id, del_flag=1)
    if not role:
        return Response.error(msg="新增角色权限失败!")

    # Permission IDs already bound to this role.
    rolePermissions = await RolePermission.filter(role_id=id).all().values("permission_id")
    rolePermissions = await filterKeyValues(rolePermissions, "permission_id")

    # Only create bindings for permissions the role does not have yet.
    add_list = set(params.permission_ids).difference(set(rolePermissions))
    for item in add_list:
        permission = await Permission.get_or_none(id=item, del_flag=1)
        if permission:
            await RolePermission.create(
                role_id=role.id,
                permission_id=permission.id
            )
    return Response.success(msg="新增角色权限成功!")
|
||
|
||
|
||
@roleAPI.delete("/deletePermission/{id}", response_model=BaseResponse, response_class=JSONResponse,
                summary="删除角色权限")
@roleAPI.post("/deletePermission/{id}", response_model=BaseResponse, response_class=JSONResponse,
              summary="删除角色权限")
@Log(title="删除角色权限", business_type=BusinessType.DELETE)
async def delete_role_permission(request: Request, id: int = Path(..., description="角色权限ID")):
    """
    Delete a single role-permission binding.

    :param request: incoming request (not read by this handler)
    :param id: ID of the role-permission record, from the URL path
    :return: success response, or error response when no record with
        ``del_flag=1`` matches ``id``
    """
    binding = await RolePermission.get_or_none(id=id, del_flag=1)
    if not binding:
        return Response.error(msg="删除角色权限失败!")
    await binding.delete()
    return Response.success(msg="删除角色权限成功!")
|
||
|
||
|
||
@roleAPI.put("/updatePermission/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="修改角色权限")
@roleAPI.post("/updatePermission/{id}", response_model=BaseResponse, response_class=JSONResponse,
              summary="修改角色权限")
@Log(title="修改角色权限", business_type=BusinessType.UPDATE)
async def update_role_permission(request: Request, params: AddRolePermissionParams,
                                 id: str = Path(..., description="角色ID")):
    """
    Make a role's permission set equal to ``params.permission_ids``.

    Bindings not in the requested set are deleted, then missing ones are
    created.

    :param request: incoming request (not read by this handler)
    :param params: request body carrying the desired ``permission_ids``
    :param id: role ID from the URL path
    :return: success response, or error response when the role is not found
    """
    role = await Role.get_or_none(id=id, del_flag=1)
    if not role:
        return Response.error(msg="修改角色权限失败!")

    # Permission IDs currently bound to the role.
    bound_rows = await RolePermission.filter(role_id=role.id).all().values("permission_id")
    current = set(await filterKeyValues(bound_rows, "permission_id"))
    requested = set(params.permission_ids)

    # Work out both differences before touching the database.
    stale = current - requested
    missing = requested - current

    for permission_id in stale:
        await RolePermission.filter(role_id=id, permission_id=permission_id).delete()
    for permission_id in missing:
        await RolePermission.create(role_id=id, permission_id=permission_id)
    return Response.success(msg="修改角色权限成功!")
|
||
|
||
|
||
@roleAPI.get("/permissionInfo/{id}", response_model=GetRolePermissionInfoResponse, response_class=JSONResponse,
             summary="获取角色权限信息")
@Log(title="获取角色权限信息", business_type=BusinessType.SELECT)
async def get_role_permission_info(request: Request, id: int = Path(..., description="角色权限ID")):
    """
    Return one role-permission binding as a flat dict with role and permission fields.

    :param request: incoming request (not read by this handler)
    :param id: ID of the role-permission record, from the URL path
    :return: success response carrying the binding dict, or error response
        when no record with ``del_flag=1`` matches ``id``
    """
    # Project the columns on the query itself. The previous code awaited the
    # model instance first and then called .first() on it, which a model
    # instance does not provide.
    data = await RolePermission.get_or_none(id=id, del_flag=1).values(
        id="id",
        create_by="create_by",
        create_time="create_time",
        update_by="update_by",
        update_time="update_time",
        role_id="role__id",
        role_name="role__name",
        role_code="role__code",
        permission_id="permission__id",
        permission_name="permission__title",
        permission_auth="permission__auths",
        permission_type="permission__menu_type"
    )
    if data:
        return Response.success(data=data)
    return Response.error(msg="获取角色权限信息失败!")
|
||
|
||
|
||
@roleAPI.get("/permissionList/{id}", response_model=GetRolePermissionListResponse, response_class=JSONResponse,
             summary="获取角色权限列表")
@Log(title="获取角色权限列表", business_type=BusinessType.SELECT)
async def get_role_permission_list(request: Request, id: str = Path(..., description="角色ID")):
    """
    List every permission binding of a role (no pagination).

    :param request: incoming request (not read by this handler)
    :param id: role ID from the URL path
    :return: success response with ``result`` (all bindings), ``total`` and a
        fixed ``page`` of 1
    """
    # Output key -> model field path across the role / permission relations.
    columns = {
        "id": "id",
        "create_by": "create_by",
        "create_time": "create_time",
        "update_by": "update_by",
        "update_time": "update_time",
        "role_id": "role__id",
        "role_name": "role__name",
        "role_code": "role__code",
        "permission_id": "permission__id",
        "permission_parent_id": "permission__parent_id",
        "permission_name": "permission__title",
        "permission_auth": "permission__auths",
        "permission_type": "permission__menu_type",
    }
    binding_count = await RolePermission.filter(role_id=id).count()
    bindings = await RolePermission.filter(role_id=id).values(**columns)

    payload = {"result": bindings, "total": binding_count, "page": 1}
    return Response.success(data=payload)
|