2025-02-13 02:27:44 +08:00
|
|
|
# _*_ coding : UTF-8 _*_
|
|
|
|
# @Time : 2025/01/26 16:01
|
|
|
|
# @UpdateTime : 2025/01/26 16:01
|
|
|
|
# @Author : sonder
|
|
|
|
# @File : auth.py
|
|
|
|
# @Software : PyCharm
|
|
|
|
# @Comment : 本程序为权限装饰器定义
|
|
|
|
from functools import wraps
|
|
|
|
|
|
|
|
from fastapi import Request
|
|
|
|
|
2025-02-26 23:00:24 +08:00
|
|
|
from config.constant import RedisKeyConfig
|
2025-02-13 02:27:44 +08:00
|
|
|
from controller.login import LoginController
|
|
|
|
from exceptions.exception import PermissionException
|
|
|
|
|
|
|
|
|
|
|
|
class Auth:
    """
    Permission decorator for async endpoint functions.

    An instance is created with a list of permission identifiers and then
    applied as a decorator. The wrapped function runs only when the current
    user holds at least one of those permissions.
    """

    def __init__(self, permission_list: list):
        """
        Store the permissions that grant access.

        :param permission_list: permission identifiers; holding any one of them is sufficient
        """
        self.permission_list = permission_list

    def __call__(self, func):
        """
        Wrap ``func`` with the permission check.

        :param func: async callable invoked as ``func(request, *args, **kwargs)``
        :return: async wrapper carrying the metadata of ``func`` (via ``functools.wraps``)
        """

        @wraps(func)
        async def wrapper(request: Request, *args, **kwargs):
            # Resolve the caller from the request's Authorization header.
            token = request.headers.get('Authorization')
            current_user = await LoginController.get_current_user(request, token)
            # A user record with a missing/None 'permissions' entry is treated as
            # having no permissions, so the caller gets PermissionException below
            # rather than a TypeError from iterating None.
            permissions = current_user.get('permissions') or []
            if any(permission in self.permission_list for permission in permissions):
                # At least one required permission is held: run the endpoint.
                return await func(request, *args, **kwargs)
            # No required permission is held: reject the call.
            raise PermissionException(message="该用户无此接口权限!")

        return wrapper
|
2025-02-24 18:33:22 +08:00
|
|
|
|
|
|
|
|
|
|
|
async def hasAuth(request: Request, permission: str) -> bool:
    """
    Check whether the current user holds a given permission.

    :param request: incoming request; its Authorization header is used to resolve the user
    :param permission: permission identifier to look for
    :return: True if ``permission`` is among the user's permissions, otherwise False
    """
    token = request.headers.get('Authorization')
    current_user = await LoginController.get_current_user(request, token)
    # Treat a missing/None 'permissions' entry as "no permissions" so the check
    # answers False instead of raising TypeError on ``in None``.
    permissions = current_user.get('permissions') or []
    return permission in permissions
|
2025-02-26 23:00:24 +08:00
|
|
|
|
|
|
|
|
|
|
|
async def hasAdmin(request: Request, department_id: str) -> bool:
    """
    Check whether a department is listed in the cached permission departments.

    The collection is read from Redis under
    ``<RedisKeyConfig.SYSTEM_CONFIG.key>:permission_departments``.

    :param request: incoming request; ``request.app.state.redis`` is used as the Redis client
    :param department_id: department identifier to look for
    :return: True if ``department_id`` is contained in the cached collection;
             False if it is not, or if the cache entry is missing/empty
    :raises ValueError: if the cached value is not a valid Python literal
    :raises SyntaxError: if the cached value cannot be parsed
    """
    # Local import keeps this function self-contained.
    import ast

    cache_key = f'{RedisKeyConfig.SYSTEM_CONFIG.key}:permission_departments'
    cached = await request.app.state.redis.get(cache_key)
    if not cached:
        # Nothing cached: no department is granted.
        return False

    # The Redis client may hand back bytes; literal_eval only parses str.
    if isinstance(cached, (bytes, bytearray)):
        cached = cached.decode('utf-8')

    # SECURITY: this used to be ``eval(cached)``, which would execute arbitrary
    # code if the cache entry were ever tampered with. ``ast.literal_eval`` only
    # accepts plain literals (list/tuple/set/dict/str/numbers).
    # NOTE(review): assumes the value is written as a literal such as
    # "['id1', 'id2']" -- confirm against the code that populates this key.
    permissions = ast.literal_eval(cached)
    return department_id in permissions
|