284 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# _*_ coding : UTF-8 _*_
# @Time : 2025/01/20 20:33
# @UpdateTime : 2025/01/20 20:33
# @Author : sonder
# @File : permission.py
# @Software : PyCharm
# @Comment : 本程序
from typing import Optional
from fastapi import APIRouter, Depends, Path, Query, Request
from fastapi.responses import JSONResponse
from annotation.auth import Auth, hasAdmin
from annotation.log import Log
from config.constant import BusinessType, RedisKeyConfig
from controller.login import LoginController
from models import Permission, RolePermission
from schemas.common import BaseResponse
from schemas.permission import AddPermissionParams, GetPermissionInfoResponse, GetPermissionListResponse
from utils.response import Response
permissionAPI = APIRouter(
prefix="/permission",
dependencies=[Depends(LoginController.get_current_user)]
)
@permissionAPI.post("/add", response_model=BaseResponse, response_class=JSONResponse, summary="新增权限")
@Log(title="新增权限", business_type=BusinessType.INSERT)
@Auth(permission_list=["permission:btn:add"])
async def add_permission(request: Request, params: AddPermissionParams):
permission = await Permission.create(
name=params.name,
parent_id=params.parent_id,
path=params.path,
title=params.title,
menu_type=params.menu_type,
rank=params.rank,
show_link=params.show_link,
show_parent=params.show_parent,
active_path=params.active_path,
component=params.component,
redirect=params.redirect,
frame_src=params.frame_src,
frame_loading=params.frame_loading,
keep_alive=params.keep_alive,
auths=params.auths,
icon=params.icon,
extra_icon=params.extra_icon,
enter_transition=params.enter_transition,
leave_transition=params.leave_transition,
fixed_tag=params.fixed_tag,
hidden_tag=params.hidden_tag,
is_admin=params.is_admin
)
if permission:
# 更新用户信息缓存
userInfos = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_INFO.key}*')
if userInfos:
await request.app.state.redis.delete(*userInfos)
# 更新用户路由缓存
userRoutes = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_ROUTES.key}*')
if userRoutes:
await request.app.state.redis.delete(*userRoutes)
return Response.success(msg="新增权限成功!")
else:
return Response.error(msg="新增权限失败!")
@permissionAPI.delete("/delete/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="删除权限")
@permissionAPI.post("/delete/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="删除权限")
@Log(title="删除权限", business_type=BusinessType.DELETE)
@Auth(permission_list=["permission:btn:delete"])
async def delete_permission(request: Request, id: str = Path(description="权限ID")):
if permission := await Permission.get_or_none(id=id, del_flag=1):
# 移除角色权限
await delete_permission_recursive(permission_id=permission.id)
# 更新用户信息缓存
userInfos = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_INFO.key}*')
if userInfos:
await request.app.state.redis.delete(*userInfos)
# 更新用户路由缓存
userRoutes = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_ROUTES.key}*')
if userRoutes:
await request.app.state.redis.delete(*userRoutes)
return Response.success(msg="删除权限成功!")
else:
return Response.error(msg="删除权限失败,权限不存在!")
async def delete_permission_recursive(permission_id: str):
"""
递归删除权限及其附属权限
:param permission_id: 权限ID
:return:
"""
await Permission.filter(id=permission_id, del_flag=1).update(del_flag=0)
await RolePermission.filter(permission_id=permission_id, del_flag=1).update(del_flag=0)
sub_permissions = await Permission.filter(parent_id=permission_id, del_flag=1).all()
for sub_department in sub_permissions:
await delete_permission_recursive(sub_department.id)
return True
@permissionAPI.put("/update/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="更新权限")
@permissionAPI.post("/update/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="更新权限")
@Log(title="更新权限", business_type=BusinessType.UPDATE)
@Auth(permission_list=["permission:btn:update"])
async def update_permission(request: Request, params: AddPermissionParams, id: str = Path(description="权限ID"), ):
if permission := await Permission.get_or_none(id=id, del_flag=1):
permission.name = params.name
permission.parent_id = params.parent_id
permission.path = params.path
permission.title = params.title
permission.menu_type = params.menu_type
permission.rank = params.rank
permission.show_link = params.show_link
permission.show_parent = params.show_parent
permission.active_path = params.active_path
permission.component = params.component
permission.redirect = params.redirect
permission.frame_src = params.frame_src
permission.frame_loading = params.frame_loading
permission.keep_alive = params.keep_alive
permission.auths = params.auths
permission.icon = params.icon
permission.extra_icon = params.extra_icon
permission.enter_transition = params.enter_transition
permission.leave_transition = params.leave_transition
permission.fixed_tag = params.fixed_tag
permission.hidden_tag = params.hidden_tag
permission.is_admin = params.is_admin
await permission.save()
# 更新用户信息缓存
userInfos = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_INFO.key}*')
if userInfos:
await request.app.state.redis.delete(*userInfos)
# 更新用户路由缓存
userRoutes = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_ROUTES.key}*')
if userRoutes:
await request.app.state.redis.delete(*userRoutes)
return Response.success(msg="更新权限成功!")
else:
return Response.error(msg="更新权限失败,权限不存在!")
@permissionAPI.get("/info/{id}", response_model=GetPermissionInfoResponse, response_class=JSONResponse,
summary="查询权限详情")
@Log(title="查询权限详情", business_type=BusinessType.SELECT)
@Auth(permission_list=["permission:btn:info"])
async def get_permission(request: Request, id: str = Path(description="权限ID")):
if permission := await Permission.get_or_none(permission_id=id, del_flag=1):
permission = await permission.first().values(
id="id",
create_by="create_by",
create_time="create_time",
update_by="update_by",
update_time="update_time",
menu_type="menu_type",
parent_id="parent_id",
path="path",
title="title",
name="name",
rank="rank",
redirect="redirect",
component="component",
icon="icon",
extra_icon="extra_icon",
enter_transition="enter_transition",
leave_transition="leave_transition",
active_path="active_path",
auths="auths",
frame_src="frame_src",
frame_loading="frame_loading",
keep_alive="keep_alive",
hidden_tag="hidden_tag",
fixed_tag="fixed_tag",
show_link="show_link",
show_parent="show_parent",
is_admin="is_admin"
)
return Response.success(msg="查询权限详情成功!", data=permission)
else:
return Response.error(msg="查询权限详情失败,权限不存在!")
@permissionAPI.get("/list", response_model=GetPermissionListResponse, response_class=JSONResponse,
summary="查询权限列表")
@Log(title="查询权限列表", business_type=BusinessType.SELECT)
@Auth(permission_list=["permission:btn:list"])
async def get_permission_list(
request: Request,
page: int = Query(default=1, description="当前页码"),
pageSize: int = Query(default=10, description="每页条数"),
id: Optional[str] = Query(default=None, description="主键"),
name: Optional[str] = Query(default=None, description="权限名称"),
parentId: Optional[str] = Query(default=None, description="父权限ID"),
path: Optional[str] = Query(default=None, description="权限路径"),
rank: Optional[int] = Query(default=None, description="排序权重"),
menuType: Optional[int] = Query(default=None, description="菜单类型0菜单、1iframe、2外链、3按钮"),
showLink: Optional[bool] = Query(default=None, description="显示菜单"),
showParent: Optional[bool] = Query(default=None, description="显示父级菜单"),
activePath: Optional[str] = Query(default=None, description="激活路径"),
component: Optional[str] = Query(default=None, description="组件路径"),
redirect: Optional[str] = Query(default=None, description="重定向路径"),
frameSrc: Optional[str] = Query(default=None, description="iframe路径"),
frameLoading: Optional[bool] = Query(default=None, description="iframe加载动画"),
keepAlive: Optional[bool] = Query(default=None, description="缓存组件"),
auths: Optional[str] = Query(default=None, description="权限标识"),
icon: Optional[str] = Query(default=None, description="菜单图标"),
extraIcon: Optional[str] = Query(default=None, description="右侧图标"),
enterTransition: Optional[str] = Query(default=None, description="进场动画"),
leaveTransition: Optional[str] = Query(default=None, description="离场动画"),
fixedTag: Optional[bool] = Query(default=None, description="固定标签页"),
hiddenTag: Optional[bool] = Query(default=None, description="隐藏标签页"),
isAdmin: Optional[bool] = Query(default=None, description="是否为管理专属页面"),
current_user: dict = Depends(LoginController.get_current_user),
):
filterArgs = {
f'{k}__icontains': v for k, v in {
"id": id,
"name": name,
"parent_id": parentId,
"path": path,
"rank": rank,
"menu_type": menuType,
"show_link": showLink,
"show_parent": showParent,
"active_path": activePath,
"component": component,
"redirect": redirect,
"frame_src": frameSrc,
"frame_loading": frameLoading,
"keep_alive": keepAlive,
"auths": auths,
"icon": icon,
"extra_icon": extraIcon,
"enter_transition": enterTransition,
"leave_transition": leaveTransition,
"fixed_tag": fixedTag,
"hidden_tag": hiddenTag,
"is_admin": isAdmin
}.items() if v
}
department_id = current_user.get("department_id", "")
if not await hasAdmin(request, department_id):
filterArgs["is_admin"] = False
total = await Permission.filter(**filterArgs, del_flag=1).count()
result = await Permission.filter(**filterArgs, del_flag=1).offset((page - 1) * pageSize).limit(pageSize).order_by(
'rank').values(
id="id",
create_by="create_by",
create_time="create_time",
update_by="update_by",
update_time="update_time",
menu_type="menu_type",
parent_id="parent_id",
title="title",
name="name",
path="path",
component="component",
rank="rank",
redirect="redirect",
icon="icon",
extra_icon="extra_icon",
enter_transition="enter_transition",
leave_transition="leave_transition",
active_path="active_path",
auths="auths",
frame_src="frame_src",
frame_loading="frame_loading",
keep_alive="keep_alive",
hidden_tag="hidden_tag",
fixed_tag="fixed_tag",
show_link="show_link",
show_parent="show_parent",
is_admin="is_admin"
)
return Response.success(data={
"total": total,
"result": result,
"page": page,
})