# _*_ coding : UTF-8 _*_ # @Time : 2025/01/27 21:40 # @UpdateTime : 2025/01/27 21:40 # @Author : sonder # @File : log.py # @Software : PyCharm # @Comment : 本程序 from fastapi import APIRouter, Depends, Path, Query, Request from fastapi.encoders import jsonable_encoder from fastapi.responses import JSONResponse from annotation.auth import Auth from annotation.log import Log from config.constant import BusinessType, RedisKeyConfig from controller.login import LoginController from models import LoginLog, OperationLog from schemas.common import BaseResponse, DeleteListParams from schemas.log import GetLoginLogResponse, GetOperationLogResponse from utils.response import Response logAPI = APIRouter( prefix="/log", dependencies=[Depends(LoginController.get_current_user)] ) @logAPI.get("/login", response_class=JSONResponse, response_model=GetLoginLogResponse, summary="用户获取登录日志") async def get_login_log(request: Request, page: int = Query(default=1, description="页码"), pageSize: int = Query(default=10, description="每页数量"), current_user: dict = Depends(LoginController.get_current_user), ): online_user_list = await LoginController.get_online_user(request) online_user_list = list( filter(lambda x: x["user_id"] == current_user.get("id"), jsonable_encoder(online_user_list, ))) user_id = current_user.get("id") result = await LoginLog.filter(user_id=user_id, del_flag=1).offset((page - 1) * pageSize).limit(pageSize).values( id="id", user_id="user__id", username="user__username", user_nickname="user__nickname", department_id="user__department__id", department_name="user__department__name", login_ip="login_ip", login_location="login_location", browser="browser", os="os", status="status", login_time="login_time", session_id="session_id", create_time="create_time", update_time="update_time" ) for log in result: log["online"] = False for item in online_user_list: if item["session_id"] == log["session_id"]: log["online"] = True return Response.success(data={ "total": await LoginLog.filter(user_id=user_id).count(), "result": result, "page": page, }) @logAPI.delete("/logout/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="用户强退") @logAPI.post("/logout/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="用户强退") @Log(title="用户强退", business_type=BusinessType.DELETE) # @Auth(permission_list=["user:btn:logout"]) async def logout_user(request: Request, id: str = Path(description="会话ID"), current_user: dict = Depends(LoginController.get_current_user)): if await LoginLog.get_or_none(user_id=current_user.get("id"), session_id=id): await request.app.state.redis.delete(f"{RedisKeyConfig.ACCESS_TOKEN.key}:{id}") return Response.success(msg="强退成功!") return Response.failure(msg="会话不存在!") @logAPI.delete("/delete/login/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="用户删除登录日志") @logAPI.post("/delete/login/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="用户删除登录日志") @Log(title="用户删除登录日志", business_type=BusinessType.DELETE) @Auth(permission_list=["login:btn:delete"]) async def delete_login_log(id: str = Path(..., description="登录日志ID"), current_user: dict = Depends(LoginController.get_current_user)): if log := await LoginLog.get_or_none(id=id): if log.user == current_user.get("id"): log.del_flag = 0 await log.save() return Response.success(msg="删除成功") else: return Response.failure(msg="无权限删除") else: return Response.failure(msg="删除失败,登录日志不存在!") @logAPI.delete("/deleteList/login", response_model=BaseResponse, response_class=JSONResponse, summary="用户删除登录日志") @logAPI.post("/deleteList/login", response_model=BaseResponse, response_class=JSONResponse, summary="用户删除登录日志") @Log(title="用户批量删除登录日志", business_type=BusinessType.DELETE) @Auth(permission_list=["login:btn:delete"]) async def delete_login_log(params: DeleteListParams, current_user: dict = Depends(LoginController.get_current_user)): for id in set(params.ids): if log := await LoginLog.get_or_none(id=id): if log.user == current_user.get("id"): log.del_flag = 0 await log.save() return Response.success(msg="删除成功") @logAPI.get("/operation", response_class=JSONResponse, response_model=GetOperationLogResponse, summary="用户获取操作日志") async def get_operation_log(request: Request, page: int = Query(default=1, description="页码"), pageSize: int = Query(default=10, description="每页数量"), current_user: dict = Depends(LoginController.get_current_user), ): user_id = current_user.get("id") result = await OperationLog.filter(operator_id=user_id, del_flag=1).offset((page - 1) * pageSize).limit( pageSize).values( id="id", operation_name="operation_name", operation_type="operation_type", request_path="request_path", request_method="request_method", request_params="request_params", response_result="response_result", host="host", location="location", browser="browser", os="os", user_agent="user_agent", operator_id="operator__id", operator_name="operator__username", operator_nickname="operator__nickname", department_id="department__id", department_name="department__name", status="status", operation_time="operation_time", cost_time="cost_time" ) return Response.success(data={ "total": await OperationLog.filter(operator_id=user_id).count(), "result": result, "page": page, }) @logAPI.delete("/delete/operation/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="用户删除操作日志") @logAPI.post("/delete/operation/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="用户删除操作日志") @Log(title="用户删除操作日志", business_type=BusinessType.DELETE) @Auth(permission_list=["operation:btn:delete"]) async def delete_operation_log(id: str = Path(..., description="操作日志id"), current_user: dict = Depends(LoginController.get_current_user)): if log := await OperationLog.get_or_none(id=id): if log.operator == current_user.get("id"): log.del_flag = 0 await log.save() return Response.success(msg="删除成功") else: return Response.failure(msg="无权限删除") else: return Response.failure(msg="删除失败,操作日志不存在!") @logAPI.delete("/deleteList/operation", response_model=BaseResponse, response_class=JSONResponse, summary="用户删除操作日志") @logAPI.post("/deleteList/operation", response_model=BaseResponse, response_class=JSONResponse, summary="用户删除操作日志") @Log(title="用户批量删除操作日志", business_type=BusinessType.DELETE) @Auth(permission_list=["operation:btn:delete"]) async def delete_operation_log(params: DeleteListParams, current_user: dict = Depends(LoginController.get_current_user)): for id in set(params.ids): if log := await OperationLog.get_or_none(id=id): if log.operator == current_user.get("id"): log.del_flag = 0 await log.save() return Response.success(msg="删除成功")