# 70 lines
# 2.1 KiB
# Python
# Raw Permalink Normal View History

# 2025-02-13 02:27:44 +08:00
# _*_ coding : UTF-8 _*_
# @Time : 2025/01/26 16:01
# @UpdateTime : 2025/01/26 16:01
# @Author : sonder
# @File : auth.py
# @Software : PyCharm
# @Comment : 本程序为权限装饰器定义
from functools import wraps
from fastapi import Request
from config.constant import RedisKeyConfig
# 2025-02-13 02:27:44 +08:00
from controller.login import LoginController
from exceptions.exception import PermissionException
class Auth:
    """
    Permission-checking decorator for async endpoints.

    An instance is built with the permission codes that grant access and is
    then applied to an async endpoint whose first parameter is the request.
    The endpoint body runs only when the current user holds at least one of
    those codes; otherwise ``PermissionException`` is raised.
    """

    def __init__(self, permission_list: list):
        """
        Store the permission codes that grant access.

        :param permission_list: permission codes; holding any one of them is enough
        """
        self.permission_list = permission_list

    def has_permission(self, permissions) -> bool:
        """
        Tell whether any of the given permissions grants access.

        :param permissions: permissions held by the user; ``None`` or an empty
            collection means the user holds none
        :return: True when at least one held permission is in ``permission_list``
        """
        # A user record without a 'permissions' entry yields None; deny access
        # instead of failing with a TypeError while iterating it.
        if not permissions:
            return False
        return any(permission in self.permission_list for permission in permissions)

    def __call__(self, func):
        """
        Wrap ``func`` so that the permission check runs before it.

        :param func: async endpoint that takes the request as its first argument
        :return: the wrapping coroutine function
        :raises PermissionException: (from the wrapper) when the current user
            holds none of the required permissions
        """
        @wraps(func)
        async def wrapper(request: Request, *args, **kwargs):
            # The raw Authorization header value is handed to LoginController,
            # which resolves it to the current user.
            token = request.headers.get('Authorization')
            current_user = await LoginController.get_current_user(request, token)
            if not self.has_permission(current_user.get('permissions')):
                # The user lacks every required permission: reject the call.
                raise PermissionException(message="该用户无此接口权限!")
            # The user is authorised: run the actual endpoint logic.
            return await func(request, *args, **kwargs)
        return wrapper
async def hasAuth(request: Request, permission: str) -> bool:
    """
    Check whether the current user holds a given permission.

    :param request: incoming request; its Authorization header is passed to
        ``LoginController.get_current_user`` to resolve the current user
    :param permission: permission code to look for
    :return: True when ``permission`` is among the user's permissions; False
        otherwise, including when the user record carries no permissions
    """
    token = request.headers.get('Authorization')
    current_user = await LoginController.get_current_user(request, token)
    # A missing 'permissions' entry yields None; treat it as "no permissions"
    # rather than letting the membership test raise TypeError.
    permissions = current_user.get('permissions') or ()
    return permission in permissions
async def hasAdmin(request: Request, department_id: str) -> bool:
    """
    Check whether a department is listed among the admin-permission departments.

    The list is read from the Redis client at ``request.app.state.redis`` under
    the key ``<SYSTEM_CONFIG key>:permission_departments``. When nothing is
    cached the list is treated as empty.

    :param request: incoming request, used to reach the application's Redis client
    :param department_id: department identifier to look up
    :return: True when ``department_id`` is contained in the cached value
    """
    redis_client = request.app.state.redis
    cached = await redis_client.get(f'{RedisKeyConfig.SYSTEM_CONFIG.key}:permission_departments')
    # NOTE(review): eval() executes whatever expression is stored under this
    # key. Anyone able to write to Redis can run arbitrary code here; consider
    # storing JSON and parsing it with json.loads (or ast.literal_eval) instead.
    admin_departments = eval(cached) if cached else []
    return department_id in admin_departments