242 lines
9.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# @Time : 2025/01/25 17:32
# @UpdateTime : 2025/01/25 17:32
# @Author : sonder
# @File : log.py
# @Software : PyCharm
# @Comment : 本程序日志装饰器定义
import hashlib
import ipaddress
import json
import time
import urllib
from functools import wraps
from typing import Optional, Literal
from async_lru import alru_cache
from fastapi import Request
from fastapi.responses import ORJSONResponse, UJSONResponse, JSONResponse
from httpx import AsyncClient
from user_agents import parse
from config.constant import BusinessType
from config.env import AppConfig, MapConfig
from controller.login import LoginController
from exceptions.exception import LoginException, ServiceWarning, ServiceException, PermissionException
from models import LoginLog, OperationLog, User
from utils.log import logger
from utils.response import Response
class Log:
    """
    Decorator that records a login log or an operation log for an async route handler.

    The decorated handler must accept the FastAPI ``Request`` as its first
    positional argument. The wrapper collects request metadata (client IP,
    user agent, parameters), runs the handler, converts the known project
    exceptions into response objects, persists one log row and finally
    returns the handler's (or the substituted) result.
    """

    def __init__(
        self,
        title: str,
        business_type: BusinessType,
        log_type: Optional[Literal['login', 'operation']] = 'operation',
    ):
        """
        Configure the decorator.

        :param title: title of the module/endpoint being decorated
        :param business_type: business type enum member; its ``.value`` is stored
        :param log_type: ``'login'`` writes a ``LoginLog`` row, any other value
            writes an ``OperationLog`` row
        """
        self.title = title
        self.business_type = business_type.value
        self.log_type = log_type

    @staticmethod
    def _client_host(request) -> str:
        """Return the originating client address, or '' when none is available."""
        # X-Forwarded-For may be a comma separated chain "client, proxy1, proxy2";
        # only the first entry is a single address that can be stored/geo-located.
        # NOTE(review): this header is client-controlled unless a trusted proxy
        # overwrites it, so the logged address can be spoofed.
        forwarded = request.headers.get('X-Forwarded-For') or ''
        first_hop = forwarded.split(',')[0].strip()
        if first_hop:
            return first_hop
        # request.client can be None (for example under some test clients).
        return request.client.host if request.client else ''

    @staticmethod
    def _describe_agent(user_agent: str):
        """Return ``(browser, os)`` display strings parsed from a User-Agent header."""
        info = parse(user_agent)
        browser = f'{info.browser.family}'
        system_os = f'{info.os.family}'
        # Append only the major version when the parser found one.
        if info.browser.version:
            browser += f' {info.browser.version[0]}'
        if info.os.version:
            system_os += f' {info.os.version[0]}'
        return browser, system_os

    @staticmethod
    async def _collect_params(request):
        """
        Read the request parameters for logging.

        :return: ``(request_params, submitted)`` where ``request_params`` is the
            value stored in the log and ``submitted`` is a mapping of the
            submitted fields (form data or JSON object; empty otherwise)
        """
        content_type = request.headers.get('Content-Type') or ''
        if 'application/x-www-form-urlencoded' in content_type:
            form = await request.form()
            text = '\n'.join(f'{key}: {value}' for key, value in form.items())
            return text, form
        if 'multipart/form-data' in content_type:
            # Multipart bodies (file uploads) are intentionally not read or logged.
            return {}, {}

        raw = await request.body()
        collected = {}
        submitted = {}
        if raw:
            try:
                parsed = json.loads(str(raw, 'utf-8'))
            except ValueError:
                # Covers both invalid UTF-8 and invalid JSON: log the raw text
                # instead of failing the request before the handler runs.
                parsed = str(raw, 'utf-8', errors='replace')
            if isinstance(parsed, dict):
                submitted = parsed
                collected.update(parsed)
            else:
                # Arrays, scalars and non-JSON text cannot be merged into a dict.
                collected['body'] = parsed
        if request.path_params:
            # Path parameters win over body fields with the same name.
            collected.update(request.path_params)
        # default=str keeps non-JSON-native path parameter values loggable.
        return json.dumps(collected, ensure_ascii=False, default=str), submitted

    @staticmethod
    def _result_to_dict(result, from_api_docs: bool) -> dict:
        """Build the dictionary that is stored as the logged response result."""
        if isinstance(result, (JSONResponse, ORJSONResponse, UJSONResponse)):
            return json.loads(str(result.body, 'utf-8'))
        if from_api_docs:
            return {}
        # Handlers may return objects without ``status_code``.
        # NOTE(review): 200 is assumed for such results - confirm against the routes.
        status_code = getattr(result, 'status_code', 200)
        message = '获取成功' if status_code == 200 else '获取失败'
        return {'code': status_code, 'message': message}

    def __call__(self, func):
        """
        Wrap ``func`` so that every call is logged.

        :param func: async route handler taking ``request`` as first argument
        :return: the wrapping coroutine function
        """

        @wraps(func)
        async def wrapper(request: Request, *args, **kwargs):
            start_time = time.time()
            token = request.headers.get('Authorization')
            request_method = request.method
            request_path = request.url.path
            host = self._client_host(request)
            user_agent = request.headers.get('User-Agent', '')
            browser, system_os = self._describe_agent(user_agent)

            # Resolve the geographic location only when enabled in the config.
            location = '内网IP'
            if AppConfig.app_ip_location_query:
                try:
                    location = await get_ip_location(host)
                except Exception as exc:
                    # A failing lookup service must not break the request itself.
                    logger.warning(f'IP location lookup failed for {host}: {exc}')
                    location = '未知地点'

            request_params, submitted = await self._collect_params(request)

            try:
                result = await func(request, *args, **kwargs)
                status = 1  # success
            except (LoginException, ServiceWarning) as e:
                logger.warning(e.message)
                result = Response.failure(data=e.data, msg=e.message)
                status = 0  # failure
            except ServiceException as e:
                logger.error(e.message)
                result = Response.error(data=e.data, msg=e.message)
                status = 0
            except PermissionException as e:
                logger.error(e.message)
                result = Response.forbidden(data=e.data, msg=e.message)
                status = 0
            except Exception as e:
                logger.exception(e)
                result = Response.error(msg=str(e))
                status = 0

            # NOTE(review): seconds * 100 is neither seconds nor milliseconds;
            # kept unchanged because existing rows use this scale - confirm the unit.
            cost_time = float(time.time() - start_time) * 100

            # Requests issued from the interactive API documentation pages.
            referer = request.headers.get('referer') or ''
            from_api_docs = referer.endswith(('docs', 'redoc'))

            result_dict = self._result_to_dict(result, from_api_docs)
            json_result = json.dumps(result_dict, ensure_ascii=False)

            if self.log_type == 'login':
                # Login attempts coming from the API docs pages are logged too.
                # NOTE(review): session_id/login_status live on app.state, which
                # is shared by all requests - concurrent logins may interleave.
                session_id = request.app.state.session_id
                status = 1 if request.app.state.login_status else 0
                username = submitted.get('username')
                current_user = (
                    await User.get_or_none(username=username, del_flag=1) if username else None
                )
                if current_user is None:
                    # Unknown or missing username: there is no user id to attach.
                    logger.warning(f'Login log skipped, no matching user for username: {username}')
                else:
                    await LoginLog.create(
                        user_id=current_user.id,
                        login_ip=host,
                        login_location=location,
                        browser=browser,
                        os=system_os,
                        status=status,
                        session_id=session_id,
                    )
            elif 'image' not in request.headers.get('Accept', ''):
                # Requests whose Accept header mentions images are not logged.
                current_user = await LoginController.get_current_user(request, token)
                await OperationLog.create(
                    operation_name=self.title,
                    operation_type=self.business_type,
                    request_method=request_method,
                    request_path=request_path,
                    operator_id=current_user.get("id"),
                    department_id=current_user.get("department_id"),
                    department_name=current_user.get("department_name"),
                    host=host,
                    location=location,
                    user_agent=user_agent,
                    browser=browser,
                    os=system_os,
                    request_params=request_params,
                    response_result=json_result,
                    status=status,
                    cost_time=cost_time,
                )

            # Hand back the handler's result (or the substituted error response).
            return result

        return wrapper
@alru_cache()
async def get_ip_location(ip: str) -> str:
    """
    Resolve an IP address to a human readable location via the Baidu Maps IP API.

    Every returned value, including the fallbacks, is memoised per ``ip`` by
    ``alru_cache``. Network errors raised by the HTTP client are not caught
    here, so they propagate to the caller.

    :param ip: IPv4/IPv6 address as text
    :return: ``"内网IP"`` for private addresses, the address reported by the
        API on success, otherwise ``"未知地点"`` (invalid IP, non-200 response,
        non-zero API status or undecodable body)
    """
    # Import the submodule explicitly: a bare ``import urllib`` does not
    # guarantee that ``urllib.parse`` / ``urllib.request`` are loaded.
    from urllib.parse import quote, quote_plus

    unknown = "未知地点"
    try:
        ip_obj = ipaddress.ip_address(ip)
    except ValueError:
        # Not a syntactically valid IP address.
        return unknown
    if ip_obj.is_private:
        return "内网IP"

    host = "https://api.map.baidu.com"
    uri = "/location/ip"
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 Edg/107.0.1418.42'}
    params = {
        "ip": ip,
        "coor": "bd09ll",
        "ak": MapConfig.ak,
    }
    # f-strings tolerate non-str config values; parameter order is part of the signature.
    query_str = uri + "?" + "&".join(f"{key}={value}" for key, value in params.items())

    # Request signature ("sn"): percent-encode the query while keeping the
    # reserved characters, append the secret key, quote_plus the result and
    # take its MD5 hex digest. MD5 is dictated by the remote API here.
    encoded_str = quote(query_str, safe="/:=&?#+!$,;'@()*[]")
    raw_str = encoded_str + MapConfig.sk
    sn = hashlib.md5(quote_plus(raw_str).encode("utf8")).hexdigest()
    url = f"{host}{query_str}&sn={sn}"

    async with AsyncClient(headers=headers, timeout=60) as client:
        response = await client.get(url)
    if response.status_code != 200:
        return unknown
    try:
        result = response.json()
    except ValueError:
        # Body was not valid JSON.
        return unknown
    if not isinstance(result, dict) or result.get("status") != 0:
        return unknown
    content = result.get("content") or {}
    return content.get("address", unknown)