133 lines
4.6 KiB
Python
Raw Normal View History

2025-02-13 02:27:44 +08:00
# _*_ coding : UTF-8 _*_
# @Time : 2025/01/23 21:22
# @UpdateTime : 2025/01/23 21:22
# @Author : sonder
# @File : query.py
# @Software : PyCharm
# @Comment : Database query helpers for users, roles and permissions
from typing import Union
from tortoise.expressions import Q
from models import User, UserRole, RolePermission
from utils.common import filterKeyValues
class QueryController:
    """
    Database query controller.

    Groups the user / role / permission lookups used by the application as
    classmethods, so callers never need an instance.

    Every query filters on ``del_flag=1``.
    NOTE(review): presumably ``1`` marks a row as "not deleted" -- confirm
    against the model definitions.
    """

    @classmethod
    async def get_user_by_username(cls, username: str) -> Union[User, None]:
        """
        Look up a user by a login identifier.

        :param username: value matched against username, email or phone
        :return: the matching user, or None when nothing matches
        """
        # NOTE(review): get_or_none raises MultipleObjectsReturned if the same
        # value matches different users (e.g. one user's username equals
        # another user's phone) -- confirm uniqueness is enforced elsewhere.
        identifier_match = Q(username=username) | Q(email=username) | Q(phone=username)
        return await User.get_or_none(identifier_match, del_flag=1)

    @classmethod
    async def get_user_by_id(cls, user_id: str) -> Union[User, None]:
        """
        Look up a user by primary key.

        :param user_id: user ID
        :return: the matching user, or None when nothing matches
        """
        return await User.get_or_none(id=user_id, del_flag=1)

    @classmethod
    async def _fetch_user_roles(cls, user_id: str) -> list:
        """
        Fetch the role rows linked to a user.

        :param user_id: user ID
        :return: list of dicts with keys ``role_id``, ``role_name``, ``role_code``
        """
        return await UserRole.filter(user_id=user_id, del_flag=1).values(
            role_id="role__id",
            role_name="role__name",
            role_code="role__code",
        )

    @classmethod
    async def get_user_info(cls, user_id: str) -> Union[dict, None]:
        """
        Build the profile dict of a user, including roles and permissions.

        :param user_id: user ID
        :return: dict of user fields plus ``roles`` and ``permissions``,
                 or None when the user does not exist
        """
        user_info = await User.get_or_none(id=user_id, del_flag=1).values(
            id="id",
            username="username",
            phone="phone",
            email="email",
            avatar="avatar",
            nickname="nickname",
            gender="gender",
            status="status",
            create_time="create_time",
            update_time="update_time",
            department_id="department__id",
            department_name="department__name",
        )
        # Unknown user: bail out before the item assignments below, which
        # would otherwise fail on None, and skip the role/permission queries.
        if user_info is None:
            return None

        role_rows = await cls._fetch_user_roles(user_id)
        # filterKeyValues is a project helper (utils.common); presumably it
        # collects the value stored under the given key from each row --
        # TODO confirm.
        role_codes = await filterKeyValues(role_rows, "role_code")
        role_ids = await filterKeyValues(role_rows, "role_id")

        # One query per role, in role order.
        permission_rows = []
        for role_id in role_ids:
            rows = await RolePermission.filter(role_id=role_id, del_flag=1).values(
                permission_id="permission__id",
                permission_name="permission__name",
                permission_auths="permission__auths",
            )
            permission_rows.extend(rows)

        auths = await filterKeyValues(permission_rows, "permission_auths")

        user_info["roles"] = role_codes
        # Several roles may grant the same permission; de-duplicate.
        # The resulting order is unspecified.
        user_info["permissions"] = list(set(auths))
        return user_info

    @classmethod
    async def register_user_before(cls, username: str, phone: str, email: str) -> Union[User, None]:
        """
        Check, before registration, whether the username, phone or email is
        already taken.

        :param username: requested username
        :param phone: requested phone number
        :param email: requested email address
        :return: an existing user that conflicts with any of the three
                 values, or None when all of them are free
        """
        conflict = Q(username=username) | Q(email=email) | Q(phone=phone)
        # The three values may match *different* existing users; first()
        # returns one of them instead of raising MultipleObjectsReturned the
        # way get_or_none would.
        return await User.filter(conflict, del_flag=1).first()

    @classmethod
    async def get_user_permissions(cls, user_id: str) -> Union[list, None]:
        """
        Collect the permission records granted to a user through their roles.

        :param user_id: user ID
        :return: list of permission dicts (empty when the user has no roles);
                 a permission granted by several roles appears once per role
        """
        role_rows = await cls._fetch_user_roles(user_id)
        role_ids = await filterKeyValues(role_rows, "role_id")

        # One query per role, in role order.
        permissions = []
        for role_id in role_ids:
            rows = await RolePermission.filter(role_id=role_id, del_flag=1).values(
                id="permission__id",
                parentId="permission__parent_id",
                name="permission__name",
                title="permission__title",
                type="permission__menu_type",
                path="permission__path",
                component="permission__component",
                redirect="permission__redirect",
                rank="permission__rank",
                icon="permission__icon",
                extraIcon="permission__extra_icon",
                enterTransition="permission__enter_transition",
                leaveTransition="permission__leave_transition",
                activePath="permission__active_path",
                auths="permission__auths",
                keepAlive="permission__keep_alive",
                hiddenTag="permission__hidden_tag",
                showLink="permission__show_link",
                showParent="permission__show_parent",
            )
            permissions.extend(rows)
        return permissions