# _*_ coding : UTF-8 _*_
# @Time : 2025/01/25 17:32
# @UpdateTime : 2025/01/25 17:32
# @Author : sonder
# @File : log.py
# @Software : PyCharm
# @Comment : Logging decorator definitions for this application
import hashlib
import ipaddress
import json
import time
import urllib
import urllib.parse
from functools import wraps
from typing import Optional, Literal

from async_lru import alru_cache
from fastapi import Request
from fastapi.responses import ORJSONResponse, UJSONResponse, JSONResponse
from httpx import AsyncClient, HTTPError
from user_agents import parse

from config.constant import BusinessType
from config.env import AppConfig, MapConfig
from controller.login import LoginController
from exceptions.exception import LoginException, ServiceWarning, ServiceException, PermissionException
from models import LoginLog, OperationLog, User
from utils.log import logger
from utils.response import Response


def _client_host(request: Request) -> str:
    """Return the client address, preferring the first ``X-Forwarded-For`` entry."""
    forwarded = request.headers.get('X-Forwarded-For')
    if forwarded:
        # The header may carry a comma-separated proxy chain; the first entry
        # is the originating client.
        first_hop = forwarded.split(',')[0].strip()
        if first_hop:
            return first_hop
    # ``request.client`` can be None, so guard the attribute access.
    return request.client.host if request.client else ''


def _describe_user_agent(user_agent: str) -> tuple:
    """Return ``(browser, os)`` display strings parsed from a User-Agent header."""
    info = parse(user_agent)
    browser = f'{info.browser.family}'
    system_os = f'{info.os.family}'
    # Append only the major version component when one was detected.
    if info.browser.version != ():
        browser += f' {info.browser.version[0]}'
    if info.os.version != ():
        system_os += f' {info.os.version[0]}'
    return browser, system_os


async def _collect_request_params(request: Request) -> tuple:
    """
    Read the request parameters for logging.

    :param request: the incoming request
    :return: ``(serialized, values)`` where ``serialized`` is the string stored in the
        log and ``values`` is a plain dict of the parsed parameters
    """
    content_type = request.headers.get('Content-Type') or ''
    if 'application/x-www-form-urlencoded' in content_type:
        form = await request.form()
        serialized = '\n'.join(f'{key}: {value}' for key, value in form.items())
        return serialized, dict(form.items())
    if 'multipart/form-data' in content_type:
        # Multipart bodies are not read here; log an empty JSON object so the
        # value is a string like in every other branch.
        return json.dumps({}), {}

    values = {}
    body = await request.body()
    if body:
        try:
            parsed = json.loads(str(body, 'utf-8'))
        except ValueError:
            # Not valid UTF-8 JSON: logging must not abort the request itself.
            parsed = None
        if isinstance(parsed, dict):
            values.update(parsed)
        elif parsed is not None:
            # Non-object JSON (list, number, ...) cannot be merged into a dict.
            values['body'] = parsed
    if request.path_params:
        values.update(request.path_params)
    return json.dumps(values, ensure_ascii=False), values


class Log:
    """
    Logging decorator for request handlers.

    Wraps an async handler whose first positional argument is the ``Request``,
    runs it, converts known exceptions into failure responses and writes a
    login-log or operation-log record describing the call.
    """

    def __init__(
            self,
            title: str,
            business_type: BusinessType,
            log_type: Optional[Literal['login', 'operation']] = 'operation',
    ):
        """
        Configure the decorator.

        :param title: title of the module the decorated handler belongs to
        :param business_type: business type; its ``value`` is stored in the log
        :param log_type: ``'login'`` writes a login log, anything else writes an operation log
        :return:
        """
        self.title = title
        self.business_type = business_type.value
        self.log_type = log_type

    def __call__(self, func):
        @wraps(func)
        async def wrapper(request: Request, *args, **kwargs):
            """
            Run ``func`` and record a log entry for the request.

            :param request: the incoming request; forwarded to ``func`` as first argument
            :return: the handler's result, or a failure response built from a caught exception
            """
            start_time = time.time()

            # Request context.
            token = request.headers.get('Authorization')
            request_method = request.method
            request_path = request.url.path
            host = _client_host(request)
            user_agent = request.headers.get('User-Agent', '')
            browser, system_os = _describe_user_agent(user_agent)

            # Resolve the geographic location of the client IP when enabled.
            location = '内网IP'
            if AppConfig.app_ip_location_query:
                try:
                    location = await get_ip_location(host)
                except HTTPError as exc:
                    # Handled here rather than inside the cached function so a
                    # transient network failure is not cached and does not
                    # break the request being logged.
                    logger.warning(f'IP location lookup failed: {exc}')
                    location = '未知地点'

            request_params, param_values = await _collect_request_params(request)

            try:
                # Call the wrapped handler.
                result = await func(request, *args, **kwargs)
                status = 1  # success
            except (LoginException, ServiceWarning) as e:
                logger.warning(e.message)
                result = Response.failure(data=e.data, msg=e.message)
                status = 0  # failure
            except ServiceException as e:
                logger.error(e.message)
                result = Response.error(data=e.data, msg=e.message)
                status = 0  # failure
            except PermissionException as e:
                logger.error(e.message)
                result = Response.forbidden(data=e.data, msg=e.message)
                status = 0  # failure
            except Exception as e:
                logger.exception(e)
                result = Response.error(msg=str(e))
                status = 0  # failure

            # NOTE(review): elapsed seconds are multiplied by 100; confirm the
            # unit expected by OperationLog.cost_time (milliseconds would be * 1000).
            cost_time = float(time.time() - start_time) * 100

            # Detect requests issued from the interactive API documentation pages.
            referer = request.headers.get('referer') or ''
            request_from_api_docs = referer.endswith(('docs', 'redoc'))

            # Extract the response payload according to the response type.
            if isinstance(result, (JSONResponse, ORJSONResponse, UJSONResponse)):
                result_dict = json.loads(str(result.body, 'utf-8'))
            elif request_from_api_docs:
                result_dict = {}
            elif result.status_code == 200:
                result_dict = {'code': result.status_code, 'message': '获取成功'}
            else:
                result_dict = {'code': result.status_code, 'message': '获取失败'}
            json_result = json.dumps(result_dict, ensure_ascii=False)

            # Persist the record in the table matching the log type.
            if self.log_type == 'login':
                session_id = request.app.state.session_id
                status = 1 if request.app.state.login_status else 0
                # Read the username from the parsed parameters so both form and
                # JSON logins work.
                username = param_values.get('username')
                current_user = await User.get_or_none(username=username) if username else None
                if current_user is None:
                    # No user row to attach the record to (unknown or missing
                    # username); skip instead of failing the login response.
                    logger.warning('Login log skipped: no matching user for the submitted username')
                else:
                    await LoginLog.create(
                        user_id=current_user.id,
                        login_ip=host,
                        login_location=location,
                        browser=browser,
                        os=system_os,
                        status=status,
                        session_id=session_id,
                    )
            elif 'image' not in request.headers.get('Accept', ''):
                # Requests whose Accept header mentions "image" are not logged.
                current_user = await LoginController.get_current_user(request, token)
                await OperationLog.create(
                    operation_name=self.title,
                    operation_type=self.business_type,
                    request_method=request_method,
                    request_path=request_path,
                    operator_id=current_user.get("id"),
                    department_id=current_user.get("department_id"),
                    department_name=current_user.get("department_name"),
                    host=host,
                    location=location,
                    user_agent=user_agent,
                    browser=browser,
                    os=system_os,
                    request_params=request_params,
                    response_result=json_result,
                    status=status,
                    cost_time=cost_time,
                )

            # Hand back the handler's result (or the failure response).
            return result

        return wrapper


@alru_cache()
async def get_ip_location(ip: str) -> str:
    """
    Look up the geographic location of an IP address.

    Private addresses are answered locally; other addresses are sent to the
    Baidu Maps ``/location/ip`` endpoint with a signed query string.

    :param ip: IP address in text form
    :return: the address reported by the service, ``"内网IP"`` for private addresses,
        or ``"未知地点"`` when the IP is malformed or the lookup yields no address
    """
    try:
        ip_obj = ipaddress.ip_address(ip)
        if ip_obj.is_private:
            return "内网IP"

        # Service address.
        host = "https://api.map.baidu.com"
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 Edg/107.0.1418.42'}
        # Endpoint path and query parameters.
        uri = "/location/ip"
        params = {
            "ip": ip,
            "coor": "bd09ll",
            "ak": MapConfig.ak,
        }
        query_str = uri + "?" + "&".join(f"{key}={value}" for key, value in params.items())

        # Percent-encode the query, leaving the characters in ``safe`` untouched.
        encoded_str = urllib.parse.quote(query_str, safe="/:=&?#+!$,;'@()*[]")
        # The secret key is appended before hashing.
        raw_str = encoded_str + MapConfig.sk
        # Signature: MD5 of the quote_plus-encoded string.
        sn = hashlib.md5(urllib.parse.quote_plus(raw_str).encode("utf8")).hexdigest()
        url = host + query_str + "&sn=" + sn

        async with AsyncClient(headers=headers, timeout=60) as client:
            response = await client.get(url)

        if response.status_code != 200:
            return "未知地点"
        result = response.json()
        if result.get("status") == 0:
            return result.get("content", {}).get("address", "未知地点")
        return "未知地点"
    except ValueError:
        # Raised for a malformed IP address.
        return "未知地点"