154 lines
5.7 KiB
Python
Raw Normal View History

2025-02-13 02:27:44 +08:00
# _*_ coding : UTF-8 _*_
# @Time : 2025/01/23 21:22
# @UpdateTime : 2025/01/23 21:22
# @Author : sonder
# @File : query.py
# @Software : PyCharm
# @Comment : Database query controller for user, role, permission and department lookups
from typing import Union
from tortoise.expressions import Q
from models import User, UserRole, RolePermission, Department
2025-02-13 02:27:44 +08:00
from utils.common import filterKeyValues
class QueryController:
    """
    Database query controller.

    Groups the read-only lookups for users, roles, permissions and departments.
    Every query filters on ``del_flag=1`` (presumably the "not deleted" marker --
    confirm against the model definitions).
    """

    @classmethod
    async def get_user_by_username(cls, username: str) -> Union[User, None]:
        """
        Fetch a user whose username, email or phone equals the given value.

        :param username: username, phone number or email
        :return: the matching user, or ``None`` when nothing matches
        """
        identity = Q(username=username) | Q(email=username) | Q(phone=username)
        return await User.get_or_none(identity, del_flag=1)

    @classmethod
    async def get_user_by_id(cls, user_id: str) -> Union[User, None]:
        """
        Fetch a user by primary key.

        :param user_id: user ID
        :return: the matching user, or ``None`` when nothing matches
        """
        return await User.get_or_none(id=user_id, del_flag=1)

    @classmethod
    async def get_user_info(cls, user_id: str) -> Union[dict, None]:
        """
        Build a user's profile together with roles, permissions and departments.

        :param user_id: user ID
        :return: dict of the selected user columns plus the ``roles``,
            ``permissions`` and ``sub_departments`` keys, or ``None`` when no
            user matches
        """
        # Load the user's own columns and department.
        user_info = await User.get_or_none(id=user_id, del_flag=1).values(
            id="id",
            username="username",
            phone="phone",
            email="email",
            avatar="avatar",
            nickname="nickname",
            gender="gender",
            status="status",
            create_time="create_time",
            update_time="update_time",
            department_id="department__id",
            department_name="department__name",
        )
        # No such user: honour the declared return type instead of failing on
        # the ``user_info['department_id']`` lookup below.
        if user_info is None:
            return None

        # Load the roles assigned to the user.
        user_roles = await UserRole.filter(user_id=user_id, del_flag=1).values(
            role_id="role__id",
            role_name="role__name",
            role_code="role__code",
        )
        # filterKeyValues presumably collects the values stored under the given
        # key -- confirm against utils.common.
        role_codes = await filterKeyValues(user_roles, "role_code")
        role_ids = await filterKeyValues(user_roles, "role_id")

        # Departments at or below the user's own department.
        # NOTE(review): department_id may be None for a user without a
        # department -- confirm the intended result in that case.
        sub_departments = await cls.get_sub_department_ids(
            department_id=user_info["department_id"]
        )

        # Collect the permissions granted by each role (one query per role).
        granted = []
        for role_id in role_ids:
            rows = await RolePermission.filter(role_id=role_id, del_flag=1).values(
                permission_id="permission__id",
                permission_name="permission__name",
                permission_auths="permission__auths",
            )
            granted.extend(rows)
        auths = await filterKeyValues(granted, "permission_auths")

        user_info["roles"] = role_codes
        # Several roles can grant the same permission; keep each one once.
        user_info["permissions"] = list(set(auths))
        user_info["sub_departments"] = sub_departments
        return user_info

    @classmethod
    async def register_user_before(cls, username: str, phone: str, email: str) -> Union[User, None]:
        """
        Check before registration whether username, phone or email is taken.

        :param username: username to check
        :param phone: phone number to check
        :param email: email to check
        :return: a user matching any of the three values, or ``None``
        """
        taken = Q(username=username) | Q(email=email) | Q(phone=phone)
        return await User.get_or_none(taken, del_flag=1)

    @classmethod
    async def get_user_permissions(
        cls, user_id: str, sub_departments: Union[list, None] = None
    ) -> Union[list, None]:
        """
        Fetch the permission records granted to a user through their roles.

        :param user_id: user ID
        :param sub_departments: department IDs the user's department must be in;
            omitting it is the same as passing an empty list
        :return: list of permission dicts, one per role/permission row
        """
        # ``None`` replaces the former shared ``[]`` default; behaviour for
        # callers is unchanged.
        if sub_departments is None:
            sub_departments = []

        # Load the user's roles, restricted to the given departments.
        user_roles = await UserRole.filter(
            user_id=user_id,
            del_flag=1,
            user__department__id__in=sub_departments,
        ).values(
            role_id="role__id",
            role_name="role__name",
            role_code="role__code",
        )
        # filterKeyValues presumably collects the values stored under the given
        # key -- confirm against utils.common.
        role_ids = await filterKeyValues(user_roles, "role_id")

        # Collect the permissions of each role, keeping per-role query order.
        permissions = []
        for role_id in role_ids:
            rows = await RolePermission.filter(role_id=role_id, del_flag=1).values(
                id="permission__id",
                parentId="permission__parent_id",
                name="permission__name",
                title="permission__title",
                type="permission__menu_type",
                path="permission__path",
                component="permission__component",
                redirect="permission__redirect",
                rank="permission__rank",
                icon="permission__icon",
                extraIcon="permission__extra_icon",
                enterTransition="permission__enter_transition",
                leaveTransition="permission__leave_transition",
                activePath="permission__active_path",
                auths="permission__auths",
                frameSrc="permission__frame_src",
                frameLoading="permission__frame_loading",
                fixedTag="permission__fixed_tag",
                keepAlive="permission__keep_alive",
                hiddenTag="permission__hidden_tag",
                showLink="permission__show_link",
                showParent="permission__show_parent",
                isAdmin="permission__is_admin",
            )
            permissions.extend(rows)
        return permissions

    @classmethod
    async def get_sub_department_ids(cls, department_id: str) -> list:
        """
        Collect the IDs of a department and all of its descendants.

        :param department_id: ID of the department to start from
        :return: de-duplicated list (in no particular order) holding
            ``department_id`` and the ID of every department below it
        """
        # Walk the tree iteratively and remember visited IDs, so a cyclic
        # parent chain cannot make the traversal run forever.
        seen = {department_id}
        pending = [department_id]
        while pending:
            parent_id = pending.pop()
            children = await Department.filter(parent_id=parent_id, del_flag=1).all()
            for child in children:
                if child.id not in seen:
                    seen.add(child.id)
                    pending.append(child.id)
        return list(seen)