613 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# _*_ coding : UTF-8 _*_
# @Time : 2025/01/20 00:31
# @UpdateTime : 2025/01/20 00:31
# @Author : sonder
# @File : user.py
# @Software : PyCharm
# @Comment : 本程序
import calendar
import os
from datetime import datetime, timedelta
from typing import Optional
from fastapi import APIRouter, Depends, Path, Query, UploadFile, File, Request, Form
from fastapi.responses import JSONResponse
from annotation.auth import Auth
from annotation.log import Log
from config.constant import BusinessType, RedisKeyConfig
from config.env import UploadConfig
from controller.login import LoginController
from controller.query import QueryController
from exceptions.exception import ModelValidatorException
from models import File as FileModel
from models import Role, Department, OperationLog, LoginLog
from models.code import QueryCode
from models.user import User, UserRole
from schemas.common import BaseResponse, DeleteListParams
from schemas.department import GetDepartmentListResponse
from schemas.file import UploadFileResponse
from schemas.user import AddUserParams, GetUserListResponse, GetUserInfoResponse, UpdateUserParams, \
AddUserRoleParams, GetUserRoleInfoResponse, UpdateUserRoleParams, GetUserPermissionListResponse, \
ResetPasswordParams, GetUserStatisticsResponse, UpdateBaseUserInfoParams
from utils.common import filterKeyValues
from utils.password import Password
from utils.response import Response
# Router that collects every user-management endpoint defined below; all routes share the "/user" prefix.
userAPI = APIRouter(prefix="/user")
@userAPI.post("/add", response_class=JSONResponse, response_model=BaseResponse, summary="新增用户")
@Log(title="新增用户", business_type=BusinessType.INSERT)
@Auth(["user:btn:addUser"])
async def add_user(
        request: Request,
        params: AddUserParams,
        current_user: dict = Depends(LoginController.get_current_user)
):
    """Create a new user from the submitted form.

    The request is rejected when ``QueryController.register_user_before``
    reports a match for the username, phone or email. Otherwise the password
    is replaced by its hash and a ``User`` row is created, attached to the
    department looked up by ``params.department_id`` (``None`` when no live
    department matches).

    :param request: incoming request (not read directly in this handler).
    :param params: payload describing the user to create.
    :param current_user: authenticated caller resolved by the login dependency.
    :return: success or error response envelope.
    """
    already_registered = await QueryController.register_user_before(
        username=params.username,
        phone=params.phone,
        email=params.email,
    )
    if already_registered:
        return Response.error(msg="添加失败,用户已存在!")

    # The plain-text password is overwritten in place by its hash before it is stored.
    params.password = await Password.get_password_hash(input_password=params.password)
    target_department = await Department.get_or_none(id=params.department_id, del_flag=1)

    created = await User.create(
        username=params.username,
        password=params.password,
        nickname=params.nickname,
        phone=params.phone,
        email=params.email,
        gender=params.gender,
        department=target_department,
        status=params.status,
    )
    if not created:
        return Response.error(msg="添加失败!")
    return Response.success(msg="添加成功!")
@userAPI.delete("/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除用户")
@userAPI.post("/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除用户")
@Log(title="删除用户", business_type=BusinessType.DELETE)
@Auth(["user:btn:deleteUser"])
async def delete_user(
        request: Request,
        id: str = Path(..., description="用户ID"),
        current_user: dict = Depends(LoginController.get_current_user)
):
    """Flag one user as deleted together with its roles and logs.

    The user must have ``del_flag=1`` and belong to one of the caller's
    ``sub_departments``. The user row and its role, login-log and
    operation-log rows are updated to ``del_flag=0``, and the cached user
    info and routes stored in Redis for that id are dropped.

    :param request: incoming request; gives access to ``app.state.redis``.
    :param id: identifier of the user to delete.
    :param current_user: authenticated caller resolved by the login dependency.
    :return: success response, or an error response when no user matches.
    """
    visible_departments = current_user.get("sub_departments")
    target = await User.get_or_none(id=id, department__id__in=visible_departments, del_flag=1)
    if not target:
        return Response.error(msg="删除失败,用户不存在!")

    target.del_flag = 0
    await target.save()

    # Flag the user's roles, login logs and operation logs, in that order.
    for related_model in (UserRole, LoginLog, OperationLog):
        await related_model.filter(user_id=target.id, del_flag=1).update(del_flag=0)

    # Drop the cached user info first, then the cached routes.
    for key_prefix in (RedisKeyConfig.USER_INFO.key, RedisKeyConfig.USER_ROUTES.key):
        cache_key = f'{key_prefix}:{id}'
        if await request.app.state.redis.get(cache_key):
            await request.app.state.redis.delete(cache_key)

    return Response.success(msg="删除成功!")
@userAPI.delete("/deleteUserList", response_class=JSONResponse, response_model=BaseResponse, summary="批量删除用户")
@userAPI.post("/deleteUserList", response_class=JSONResponse, response_model=BaseResponse, summary="批量删除用户")
@Log(title="批量删除用户", business_type=BusinessType.DELETE)
@Auth(["user:btn:deleteUser"])
async def delete_user_list(
        request: Request,
        params: DeleteListParams,
        current_user: dict = Depends(LoginController.get_current_user)
):
    """Flag several users as deleted in one call.

    Each id in ``params.ids`` is handled the same way as the single-user
    delete endpoint: the user must have ``del_flag=1`` and belong to one of
    the caller's ``sub_departments``; the user row and its role, login-log
    and operation-log rows are set to ``del_flag=0``; the Redis entries
    holding the user's cached info and routes are removed. Ids that match no
    such user are skipped silently.

    :param request: incoming request; gives access to ``app.state.redis``.
    :param params: payload carrying the list of user ids.
    :param current_user: authenticated caller resolved by the login dependency.
    :return: success response (returned even when no id matched).
    """
    sub_departments = current_user.get("sub_departments")
    redis = request.app.state.redis
    for user_id in params.ids:
        user = await User.get_or_none(id=user_id, department__id__in=sub_departments, del_flag=1)
        if not user:
            continue
        user.del_flag = 0
        await user.save()
        # Mirror the single delete: flag the user's roles and logs as well.
        for related_model in (UserRole, LoginLog, OperationLog):
            await related_model.filter(user_id=user.id, del_flag=1).update(del_flag=0)
        # Drop cached info/routes so the deleted account is not served from cache.
        for key_prefix in (RedisKeyConfig.USER_INFO.key, RedisKeyConfig.USER_ROUTES.key):
            cache_key = f'{key_prefix}:{user.id}'
            if await redis.get(cache_key):
                await redis.delete(cache_key)
    return Response.success(msg="删除成功!")
@userAPI.put("/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="更新用户")
@userAPI.post("/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="更新用户")
@Log(title="更新用户", business_type=BusinessType.UPDATE)
@Auth(["user:btn:updateUser"])
async def update_user(
        request: Request,
        params: UpdateUserParams,
        id: str = Path(..., description="用户ID"),
        current_user: dict = Depends(LoginController.get_current_user)
):
    """Overwrite the editable fields of one user.

    The user must have ``del_flag=1`` and belong to one of the caller's
    ``sub_departments``. Username, nickname, phone, email, gender and status
    are copied from ``params``; the department is set to the live department
    matching ``params.department_id`` or cleared when none matches. The
    cached user info for this id is removed from Redis afterwards.

    :param request: incoming request; gives access to ``app.state.redis``.
    :param params: new field values.
    :param id: identifier of the user to update.
    :param current_user: authenticated caller resolved by the login dependency.
    :return: success response, or an error response when no user matches.
    """
    visible_departments = current_user.get("sub_departments")
    target = await User.get_or_none(id=id, department__id__in=visible_departments, del_flag=1)
    if not target:
        return Response.error(msg="更新失败,用户不存在!")

    for field_name in ("username", "nickname", "phone", "email", "gender", "status"):
        setattr(target, field_name, getattr(params, field_name))

    # get_or_none yields None when no live department matches, which clears the link.
    target.department = await Department.get_or_none(id=params.department_id, del_flag=1)
    await target.save()

    info_key = f'{RedisKeyConfig.USER_INFO.key}:{id}'
    if await request.app.state.redis.get(info_key):
        await request.app.state.redis.delete(info_key)
    return Response.success(msg="更新成功!")
@userAPI.get("/info/{id}", response_class=JSONResponse, response_model=GetUserInfoResponse, summary="获取用户信息")
@Log(title="获取用户信息", business_type=BusinessType.SELECT)
@Auth(["user:btn:Userinfo"])
async def get_user_info(request: Request, id: str = Path(..., description="用户ID"),
                        current_user: dict = Depends(LoginController.get_current_user)):
    """Return the profile fields of one user.

    :param request: incoming request (not read directly in this handler).
    :param id: identifier of the user to look up.
    :param current_user: authenticated caller resolved by the login dependency.
    :return: success response carrying the user's fields, or an error
        response when no user with ``del_flag=1`` has this id.
    """
    # Query by id explicitly. Calling ``.first()`` on a fetched instance would
    # invoke the model-level classmethod and return the first row of the whole
    # table instead of the requested user.
    user = await User.filter(id=id, del_flag=1).first().values(
        id="id",
        create_time="create_time",
        update_time="update_time",
        username="username",
        email="email",
        phone="phone",
        nickname="nickname",
        gender="gender",
        status="status",
        avatar="avatar",
        department_id="department__id",
    )
    if user is None:
        return Response.error(msg="用户不存在!")
    return Response.success(data=user)
@userAPI.get("/list", response_class=JSONResponse, response_model=GetUserListResponse, summary="获取用户列表")
@Log(title="获取用户列表", business_type=BusinessType.SELECT)
@Auth(["user:btn:queryUser"])
async def get_user_list(
        request: Request,
        page: int = Query(default=1, description="当前页码"),
        pageSize: int = Query(default=10, description="每页数量"),
        username: Optional[str] = Query(default=None, description="用户名"),
        nickname: Optional[str] = Query(default=None, description="昵称"),
        phone: Optional[str] = Query(default=None, description="手机号"),
        email: Optional[str] = Query(default=None, description="邮箱"),
        gender: Optional[str] = Query(default=None, description="性别"),
        status: Optional[str] = Query(default=None, description="状态"),
        department_id: Optional[str] = Query(default=None, description="部门ID"),
        current_user: dict = Depends(LoginController.get_current_user)
):
    """Return one page of users matching the optional filters.

    Every supplied filter is applied as a ``__contains`` match. The result is
    always limited to users whose department is in the caller's
    ``sub_departments``, including when ``department_id`` is supplied, so a
    caller cannot list users of a department outside their own scope.

    :param request: incoming request (not read directly in this handler).
    :param page: 1-based page number.
    :param pageSize: number of rows per page.
    :param username: optional username fragment.
    :param nickname: optional nickname fragment.
    :param phone: optional phone fragment.
    :param email: optional email fragment.
    :param gender: optional gender filter.
    :param status: optional status filter.
    :param department_id: optional department id filter.
    :param current_user: authenticated caller resolved by the login dependency.
    :return: success response with ``result``, ``total``, ``page`` and ``pageSize``.
    """
    sub_departments = current_user.get("sub_departments")
    filterArgs = {
        f'{k}__contains': v for k, v in {
            'username': username,
            'nickname': nickname,
            'phone': phone,
            'email': email,
            'gender': gender,
            'status': status,
            'department_id': department_id
        }.items() if v
    }
    # The department scope is applied unconditionally; an explicit
    # department_id only narrows the result within that scope.
    filterArgs['department_id__in'] = sub_departments

    matching_users = User.filter(**filterArgs, del_flag=1)
    total = await matching_users.count()
    result = await matching_users.offset((page - 1) * pageSize).limit(pageSize).values(
        id="id",
        create_time="create_time",
        update_time="update_time",
        username="username",
        email="email",
        phone="phone",
        nickname="nickname",
        avatar="avatar",
        gender="gender",
        status="status",
        department_id="department__id",
    )
    return Response.success(data={
        "result": result,
        "total": total,
        "page": page,
        "pageSize": pageSize
    })
@userAPI.post("/addRole", response_model=BaseResponse, response_class=JSONResponse, summary="添加用户角色")
@Log(title="添加用户角色", business_type=BusinessType.INSERT)
@Auth(["user:btn:addRole"])
async def add_user_role(request: Request, params: AddUserRoleParams,
                        current_user: dict = Depends(LoginController.get_current_user)):
    """Attach one role to one user.

    The request is rejected when the user already holds the role, or when
    the user or the role is not a live record within the caller's
    ``sub_departments``. On success the cached user info for that user is
    removed from Redis.

    :param request: incoming request; gives access to ``app.state.redis``.
    :param params: payload carrying ``user_id`` and ``role_id``.
    :param current_user: authenticated caller resolved by the login dependency.
    :return: success or error response envelope.
    """
    scope = current_user.get("sub_departments")

    existing_link = await UserRole.get_or_none(
        user_id=params.user_id,
        role_id=params.role_id,
        del_flag=1,
        user__department__id__in=scope,
    )
    if existing_link:
        return Response.error(msg="该用户已存在该角色!")

    target_user = await User.get_or_none(id=params.user_id, del_flag=1, department__id__in=scope)
    if not target_user:
        return Response.error(msg="添加失败,用户不存在!")

    target_role = await Role.get_or_none(id=params.role_id, del_flag=1, department__id__in=scope)
    if not target_role:
        return Response.error(msg="添加失败,角色不存在!")

    new_link = await UserRole.create(user_id=target_user.id, role_id=target_role.id)
    if not new_link:
        return Response.error(msg="添加失败!")

    info_key = f'{RedisKeyConfig.USER_INFO.key}:{target_user.id}'
    if await request.app.state.redis.get(info_key):
        await request.app.state.redis.delete(info_key)
    return Response.success(msg="添加成功!")
@userAPI.delete("/deleteRole/{id}", response_model=BaseResponse, response_class=JSONResponse,
                summary="删除用户角色")
@userAPI.post("/deleteRole/{id}", response_model=BaseResponse, response_class=JSONResponse,
              summary="删除用户角色")
@Log(title="删除用户角色", business_type=BusinessType.DELETE)
@Auth(["user:btn:deleteRole"])
async def delete_user_role(request: Request, id: str = Path(description="用户角色ID"),
                           current_user: dict = Depends(LoginController.get_current_user)):
    """Flag one user-role link as deleted.

    The link must have ``del_flag=1`` and belong to a user within the
    caller's ``sub_departments``. After the link is set to ``del_flag=0``,
    the cached info of the user who owned the link is removed from Redis.

    :param request: incoming request; gives access to ``app.state.redis``.
    :param id: identifier of the user-role link.
    :param current_user: authenticated caller resolved by the login dependency.
    :return: success response, or an error response when no link matches.
    """
    sub_departments = current_user.get("sub_departments")
    userRole = await UserRole.get_or_none(id=id, del_flag=1, user__department__id__in=sub_departments)
    if not userRole:
        return Response.error(msg="删除失败,用户角色不存在!")

    userRole.del_flag = 0
    await userRole.save()

    # Invalidate the cache of the user who lost the role, not the caller's,
    # so that user's cached info does not keep listing the removed role.
    cache_key = f'{RedisKeyConfig.USER_INFO.key}:{userRole.user_id}'
    if await request.app.state.redis.get(cache_key):
        await request.app.state.redis.delete(cache_key)
    return Response.success(msg="删除成功!")
@userAPI.put("/updateRole", response_model=BaseResponse, response_class=JSONResponse, summary="修改用户角色")
@userAPI.post("/updateRole", response_model=BaseResponse, response_class=JSONResponse,
              summary="修改用户角色")
@Log(title="修改用户角色", business_type=BusinessType.UPDATE)
@Auth(["user:btn:updateRole"])
async def update_user_role(request: Request, params: UpdateUserRoleParams,
                           current_user: dict = Depends(LoginController.get_current_user)):
    """Replace the set of roles held by one user.

    The roles currently linked to the user are compared with
    ``params.role_ids``: missing ones are created (only for live roles within
    the caller's ``sub_departments``) and surplus ones are set to
    ``del_flag=0``. The cached user info for that user is then removed from
    Redis.

    :param request: incoming request; gives access to ``app.state.redis``.
    :param params: payload carrying ``user_id`` and the desired ``role_ids``.
    :param current_user: authenticated caller resolved by the login dependency.
    :return: success response, or an error response when the target user is
        not a live user within the caller's departments.
    """
    sub_departments = current_user.get("sub_departments")

    # Check the target user is inside the caller's scope before touching roles.
    # Without this, an out-of-scope user looks like a user with no roles and
    # would be given every requested role.
    if not await User.get_or_none(id=params.user_id, del_flag=1, department__id__in=sub_departments):
        return Response.error(msg="修改失败,用户不存在!")

    # Roles the user currently holds.
    userRoles = await UserRole.filter(user_id=params.user_id, del_flag=1,
                                      user__department__id__in=sub_departments).values("role_id")
    userRoles = await filterKeyValues(userRoles, "role_id")

    requested = set(params.role_ids)
    current = set(userRoles)
    addRoles = requested - current
    deleteRoles = current - requested

    # Create the missing links, skipping role ids that match no live, in-scope role.
    for role_id in addRoles:
        if role := await Role.get_or_none(id=role_id, del_flag=1, department__id__in=sub_departments):
            await UserRole.create(user_id=params.user_id, role_id=role.id)

    # Flag the links that are no longer wanted.
    for role_id in deleteRoles:
        if userRole := await UserRole.get_or_none(user_id=params.user_id, role_id=role_id, del_flag=1,
                                                  user__department__id__in=sub_departments):
            userRole.del_flag = 0
            await userRole.save()

    cache_key = f'{RedisKeyConfig.USER_INFO.key}:{params.user_id}'
    if await request.app.state.redis.get(cache_key):
        await request.app.state.redis.delete(cache_key)
    return Response.success(msg="修改成功!")
@userAPI.get("/roleInfo/{id}", response_model=GetUserRoleInfoResponse, response_class=JSONResponse,
             summary="获取用户角色信息")
@Log(title="获取用户角色信息", business_type=BusinessType.SELECT)
@Auth(["user:btn:roleInfo"])
async def get_user_role_info(request: Request, id: str = Path(description="用户角色ID"),
                             current_user: dict = Depends(LoginController.get_current_user)):
    """Return the details of one user-role link.

    :param request: incoming request (not read directly in this handler).
    :param id: identifier of the user-role link.
    :param current_user: authenticated caller resolved by the login dependency.
    :return: success response with the link's user and role fields, or an
        error response when no live link within the caller's
        ``sub_departments`` has this id.
    """
    sub_departments = current_user.get("sub_departments")
    # Project the fields from a query filtered by id. Calling ``.first()`` on a
    # fetched instance would invoke the model-level classmethod and return the
    # first link in the table, not the requested one.
    data = await UserRole.filter(id=id, del_flag=1, user__department__id__in=sub_departments).first().values(
        id="id",
        user_id="user__id",
        user_name="user__username",
        role_name="role__name",
        role_code="role__code",
        role_id="role__id",
        create_time="create_time",
        update_time="update_time"
    )
    if data is None:
        return Response.error(msg="获取失败,用户角色不存在!")
    return Response.success(data=data)
@userAPI.get("/roleList/{id}", response_model=GetDepartmentListResponse, response_class=JSONResponse,
             summary="获取用户角色列表")
@Log(title="获取用户角色列表", business_type=BusinessType.SELECT)
@Auth(["user:btn:roleList"])
async def get_user_role_list(
        request: Request,
        id: str = Path(description="用户ID"),
        current_user: dict = Depends(LoginController.get_current_user)
):
    """List every live role link of one user.

    Only links whose user belongs to one of the caller's ``sub_departments``
    are returned. The payload is not paginated: ``page`` and ``pageSize`` are
    fixed values and ``total`` is the number of rows returned.

    :param request: incoming request (not read directly in this handler).
    :param id: identifier of the user whose roles are listed.
    :param current_user: authenticated caller resolved by the login dependency.
    :return: success response with ``result``, ``total``, ``page`` and ``pageSize``.
    """
    scope = current_user.get("sub_departments")
    role_links = UserRole.filter(user_id=id, del_flag=1, user__department__id__in=scope)
    rows = await role_links.values(
        id="id",
        department_id="user__department__id",
        department_name="user__department__name",
        department_phone="user__department__phone",
        department_principal="user__department__principal",
        department_email="user__department__email",
        role_name="role__name",
        role_code="role__code",
        role_id="role__id",
        create_time="create_time",
        update_time="update_time"
    )
    payload = {
        "result": rows,
        "total": len(rows),
        "page": 1,
        "pageSize": 10,
    }
    return Response.success(data=payload)
@userAPI.get("/permissionList/{id}", response_class=JSONResponse, response_model=GetUserPermissionListResponse,
             summary="获取用户权限列表")
@Log(title="获取用户权限列表", business_type=BusinessType.SELECT)
@Auth(["user:btn:permissionList"])
async def get_user_permission_list(request: Request, id: str = Path(description="用户ID"),
                                   current_user: dict = Depends(LoginController.get_current_user)):
    """Return the distinct permission ids of one user.

    Permissions are fetched through ``QueryController.get_user_permissions``
    with the caller's ``sub_departments``, reduced to their ``id`` values and
    de-duplicated through a set, so the order of the returned list is not
    defined.

    :param request: incoming request (not read directly in this handler).
    :param id: identifier of the user whose permissions are listed.
    :param current_user: authenticated caller resolved by the login dependency.
    :return: success response carrying the list of permission ids.
    """
    scope = current_user.get("sub_departments")
    permission_rows = await QueryController.get_user_permissions(user_id=id, sub_departments=scope)
    permission_ids = await filterKeyValues(permission_rows, "id")
    unique_ids = set(permission_ids)
    return Response.success(data=list(unique_ids))
@userAPI.post("/avatar/{id}", response_model=UploadFileResponse, response_class=JSONResponse, summary="上传用户头像")
@Log(title="上传用户头像", business_type=BusinessType.UPDATE)
@Auth(["user:btn:uploadAvatar"])
async def upload_user_avatar(
        request: Request,
        id: str = Path(description="用户ID"),
        file: UploadFile = File(...), current_user: dict = Depends(LoginController.get_current_user)):
    """Store an uploaded image and set it as the user's avatar.

    The user must be a live user within the caller's ``sub_departments``.
    The file is written under ``UploadConfig.UPLOAD_PATH`` with a name built
    from the user id and the current timestamp, recorded in the file table,
    and referenced from ``user.avatar``. The cached user info for that user
    is removed from Redis afterwards.

    :param request: incoming request; gives access to ``app.state.redis``.
    :param id: identifier of the user receiving the avatar.
    :param file: uploaded image.
    :param current_user: authenticated caller resolved by the login dependency.
    :return: success response with the stored file's fields, or a failure
        response when no user matches.
    :raises ModelValidatorException: when the upload's content type is not
        one of the accepted image MIME types.
    """
    sub_departments = current_user.get("sub_departments")
    user = await User.get_or_none(id=id, del_flag=1, department__id__in=sub_departments)
    if not user:
        return Response.failure(msg="用户不存在!")

    # 1. Accept only image uploads.
    image_mimetypes = [
        'image/jpeg',
        'image/png',
        'image/gif',
        'image/svg+xml',
        'image/bmp',
        'image/webp',
        'image/tiff'
    ]
    if file.content_type not in image_mimetypes:
        raise ModelValidatorException(message="文件类型不支持")

    # 2. Build the stored file name from the user id and a second-resolution timestamp.
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    unique_filename = f"{id}_{timestamp}"

    # 3. Write the upload to disk.
    file_path = os.path.join(UploadConfig.UPLOAD_PATH, unique_filename)
    with open(file_path, "wb") as buffer:
        buffer.write(await file.read())

    # 4. Derive the relative and absolute paths kept in the database.
    relative_path = os.path.join(UploadConfig.UPLOAD_PREFIX, unique_filename)
    absolute_path = os.path.abspath(file_path)

    # 5. Record the file and point the user's avatar at it.
    file_record = await FileModel.create(
        name=file.filename,
        size=os.path.getsize(file_path),
        file_type=file.content_type,
        absolute_path=absolute_path,
        relative_path=relative_path,
        uploader_id=id,
    )
    user.avatar = f"/file/{file_record.id}"
    await user.save()

    # Project the fields of the record just created. Calling ``.first()`` on
    # the instance would invoke the model-level classmethod and return the
    # first file row in the table instead.
    result = await FileModel.filter(id=file_record.id).first().values(
        id="id",
        name="name",
        size="size",
        file_type="file_type",
        relative_path="relative_path",
        absolute_path="absolute_path",
        uploader_id="uploader__id",
        uploader_username="uploader__username",
        uploader_nickname="uploader__nickname",
        uploader_department_id="uploader__department__id",
        uploader_department_name="uploader__department__name",
        create_time="create_time",
        update_time="update_time",
    )

    cache_key = f'{RedisKeyConfig.USER_INFO.key}:{user.id}'
    if await request.app.state.redis.get(cache_key):
        await request.app.state.redis.delete(cache_key)
    return Response.success(data=result)
@userAPI.put("/resetPassword/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="重置用户密码")
@userAPI.post("/resetPassword/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="重置用户密码")
@Log(title="重置用户密码", business_type=BusinessType.UPDATE)
@Auth(permission_list=["user:btn:reset_password"])
async def reset_user_password(request: Request, params: ResetPasswordParams, id: str = Path(description="用户ID"),
                              current_user: dict = Depends(LoginController.get_current_user)):
    """Set a new password for one user.

    The user must be a live user within the caller's ``sub_departments``.
    The new password is stored as a hash, and the cached user info for that
    user is removed from Redis, as the other handlers that modify a single
    user do.

    :param request: incoming request; gives access to ``app.state.redis``.
    :param params: payload carrying the new password.
    :param id: identifier of the user whose password is reset.
    :param current_user: authenticated caller resolved by the login dependency.
    :return: success response, or a failure response when no user matches.
    """
    user = await User.get_or_none(id=id, del_flag=1, department__id__in=current_user.get("sub_departments"))
    if not user:
        return Response.failure(msg="用户不存在!")

    user.password = await Password.get_password_hash(params.password)
    await user.save()

    # Drop the cached user info so it cannot outlive the password change.
    cache_key = f'{RedisKeyConfig.USER_INFO.key}:{user.id}'
    if await request.app.state.redis.get(cache_key):
        await request.app.state.redis.delete(cache_key)
    return Response.success(msg="重置密码成功!")
@userAPI.put("/updateBaseUserInfo", response_model=BaseResponse, response_class=JSONResponse,
             summary="更新基础个人信息")
@userAPI.post("/updateBaseUserInfo", response_model=BaseResponse, response_class=JSONResponse,
              summary="更新基础个人信息")
@Log(title="更新基础个人信息", business_type=BusinessType.UPDATE)
async def update_base_userinfo(params: UpdateBaseUserInfoParams, request: Request,
                               current_user: dict = Depends(LoginController.get_current_user)):
    """Let the caller update their own nickname and gender.

    The caller's cached user info is removed from Redis after the change.

    :param params: payload carrying the new ``name`` and ``gender``.
    :param request: incoming request; gives access to ``app.state.redis``.
    :param current_user: authenticated caller resolved by the login dependency.
    :return: success response, or an error response when the caller's own
        live user record cannot be found.
    """
    me = await User.get_or_none(id=current_user.get("id"), del_flag=1)
    if not me:
        return Response.error(msg="更新失败!")

    # ``params.name`` is stored in the nickname column.
    me.nickname = params.name
    me.gender = params.gender
    await me.save()

    info_key = f'{RedisKeyConfig.USER_INFO.key}:{me.id}'
    if await request.app.state.redis.get(info_key):
        await request.app.state.redis.delete(info_key)
    return Response.success(msg="更新成功!")
@userAPI.put("/updatePassword", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新密码")
@userAPI.post("/updatePassword", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新密码")
@Log(title="用户更新密码", business_type=BusinessType.UPDATE)
async def update_user_password(request: Request, oldPassword: str = Form(description="用户旧密码"),
                               newPassword: str = Form(description="用户新密码"),
                               current_user: dict = Depends(LoginController.get_current_user)):
    """Let the caller change their own password.

    The old password is hashed and compared with the stored hash; on a match
    the hash of the new password is stored and the caller's cached user info
    is removed from Redis.

    :param request: incoming request; gives access to ``app.state.redis``.
    :param oldPassword: the caller's current password (form field).
    :param newPassword: the password to switch to (form field).
    :param current_user: authenticated caller resolved by the login dependency.
    :return: success response, or an error response when the caller's record
        is missing or the old password does not match.
    """
    me = await User.get_or_none(id=current_user.get("id"), del_flag=1)
    if not me:
        return Response.error(msg="更新失败!")

    # NOTE(review): comparing a freshly computed hash with the stored one only
    # works if Password.get_password_hash is deterministic (unsalted) — confirm.
    old_hash = await Password.get_password_hash(oldPassword)
    if me.password != old_hash:
        return Response.error(msg="旧密码错误!")

    me.password = await Password.get_password_hash(newPassword)
    await me.save()

    info_key = f'{RedisKeyConfig.USER_INFO.key}:{me.id}'
    if await request.app.state.redis.get(info_key):
        await request.app.state.redis.delete(info_key)
    return Response.success(msg="更新成功!")
@userAPI.put("/updatePhone", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新手机号")
@userAPI.post("/updatePhone", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新手机号")
@Log(title="用户更新手机号", business_type=BusinessType.UPDATE)
async def update_user_phone(request: Request, password: str = Form(description="用户密码"),
                            phone: str = Form(description="用户手机号"),
                            current_user: dict = Depends(LoginController.get_current_user)):
    """Let the caller change their own phone number.

    The supplied password is hashed and compared with the stored hash. The
    change is refused when any live user already has the requested phone
    number. On success the caller's cached user info is removed from Redis.

    :param request: incoming request; gives access to ``app.state.redis``.
    :param password: the caller's password (form field).
    :param phone: the new phone number (form field).
    :param current_user: authenticated caller resolved by the login dependency.
    :return: success or error response envelope.
    """
    me = await User.get_or_none(id=current_user.get("id"), del_flag=1)
    if not me:
        return Response.error(msg="更新失败!")

    # NOTE(review): comparing a freshly computed hash with the stored one only
    # works if Password.get_password_hash is deterministic (unsalted) — confirm.
    supplied_hash = await Password.get_password_hash(password)
    if me.password != supplied_hash:
        return Response.error("更改失败,请正确输入旧密码")

    holders = await User.filter(phone=phone, del_flag=1).count()
    if holders:
        return Response.error(f"更改失败,手机号:{phone}已绑定其他账号!")

    me.phone = phone
    await me.save()

    info_key = f'{RedisKeyConfig.USER_INFO.key}:{me.id}'
    if await request.app.state.redis.get(info_key):
        await request.app.state.redis.delete(info_key)
    return Response.success(msg="更新成功!")
@userAPI.put("/updateEmail", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新邮箱")
@userAPI.post("/updateEmail", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新邮箱")
@Log(title="用户更新邮箱", business_type=BusinessType.UPDATE)
async def update_user_email(request: Request, password: str = Form(description="用户密码"),
                            email: str = Form(description="用户邮箱"),
                            current_user: dict = Depends(LoginController.get_current_user)):
    """Let the caller change their own email address.

    The supplied password is hashed and compared with the stored hash. The
    change is refused when any live user already has the requested email.
    On success the caller's cached user info is removed from Redis.

    :param request: incoming request; gives access to ``app.state.redis``.
    :param password: the caller's password (form field).
    :param email: the new email address (form field).
    :param current_user: authenticated caller resolved by the login dependency.
    :return: success or error response envelope.
    """
    me = await User.get_or_none(id=current_user.get("id"), del_flag=1)
    if not me:
        return Response.error(msg="更新失败!")

    # NOTE(review): comparing a freshly computed hash with the stored one only
    # works if Password.get_password_hash is deterministic (unsalted) — confirm.
    supplied_hash = await Password.get_password_hash(password)
    if me.password != supplied_hash:
        return Response.error("更改失败,请正确输入旧密码")

    holders = await User.filter(email=email, del_flag=1).count()
    if holders:
        return Response.error(f"更改失败,邮箱:{email}已绑定其他账号!")

    me.email = email
    await me.save()

    info_key = f'{RedisKeyConfig.USER_INFO.key}:{me.id}'
    if await request.app.state.redis.get(info_key):
        await request.app.state.redis.delete(info_key)
    return Response.success(msg="更新成功!")
@userAPI.get("/statistics", response_model=GetUserStatisticsResponse, response_class=JSONResponse,
             summary="获取用户查询统计")
@Log(title="获取用户查询统计", business_type=BusinessType.SELECT)
async def get_user_statistics(request: Request, current_user: dict = Depends(LoginController.get_current_user)):
    """Return query-count statistics for the calling user.

    Counts ``QueryCode`` rows whose session operator is the caller: today,
    the current calendar month, the previous calendar month, and a per-day
    series for the last 14 days (today included), computed once for
    ``status=1`` and once for ``status=0``. All boundaries come from the
    naive local ``datetime.now()``.

    :param request: incoming request (not read directly in this handler).
    :param current_user: authenticated caller resolved by the login dependency.
    :return: success response with ``today_count``, ``this_month_count``,
        ``last_month_count``, ``before_14day_count_success`` and
        ``before_14day_count_fail``.
    """
    user_id = current_user.get("id")
    now = datetime.now()

    # Today: 00:00:00.000000 .. 23:59:59.999999.
    today_start_time = now.replace(hour=0, minute=0, second=0, microsecond=0)
    today_end_time = now.replace(hour=23, minute=59, second=59, microsecond=999999)

    # Current month: first day 00:00 .. last day 23:59:59.999999.
    this_month_start_time = today_start_time.replace(day=1)
    this_month_last_day = calendar.monthrange(now.year, now.month)[1]
    this_month_end_time = today_end_time.replace(day=this_month_last_day)

    # Previous month: it ends one microsecond before the current month starts,
    # which also covers the January -> December-of-last-year case.
    last_month_end_time = this_month_start_time - timedelta(microseconds=1)
    last_month_start_time = last_month_end_time.replace(day=1, hour=0, minute=0, second=0, microsecond=0)

    async def count_between(start: datetime, end: datetime) -> int:
        """Count the caller's queries created within [start, end], both inclusive."""
        return await QueryCode.filter(create_time__gte=start, create_time__lte=end,
                                      session__operator__id=user_id).count()

    today_count = await count_between(today_start_time, today_end_time)
    this_month_count = await count_between(this_month_start_time, this_month_end_time)
    last_month_count = await count_between(last_month_start_time, last_month_end_time)

    async def get_last_14_days_count(status: int = 1):
        """Return per-day counts for the last 14 days (today included), oldest first."""
        result = []
        for days_ago in range(13, -1, -1):
            day_start = today_start_time - timedelta(days=days_ago)
            next_day_start = day_start + timedelta(days=1)
            # Half-open window [day_start, next_day_start): a record created at
            # exactly midnight belongs to one day only.
            count = await QueryCode.filter(
                create_time__gte=day_start,
                create_time__lt=next_day_start,
                session__operator__id=user_id,
                status=status
            ).count()
            # Dates are formatted as YYYY-MM-DD.
            result.append({"date": day_start.strftime("%Y-%m-%d"), "count": count})
        return result

    # Per-day series for the last 14 days, for status 1 and status 0.
    before_14day_count_success = await get_last_14_days_count(1)
    before_14day_count_fail = await get_last_14_days_count(0)
    return Response.success(data={
        "today_count": today_count,
        "this_month_count": this_month_count,
        "last_month_count": last_month_count,
        "before_14day_count_success": before_14day_count_success,
        "before_14day_count_fail": before_14day_count_fail,
    })