117 lines
5.0 KiB
Python

# _*_ coding : UTF-8 _*_
# @Time : 2025/02/04 15:18
# @UpdateTime : 2025/02/04 15:18
# @Author : sonder
# @File : cache.py
# @Software : PyCharm
# @Comment : 本程序
from fastapi import APIRouter, Depends, Path, Request
from fastapi.responses import JSONResponse
from annotation.log import Log
from config.constant import BusinessType, RedisKeyConfig
from controller.login import LoginController
from schemas.cache import CacheMonitor, GetCacheInfoResponse, CacheInfo, GetCacheKeysListResponse, \
GetCacheMonitorResponse
from schemas.common import BaseResponse
from utils.response import Response
cacheAPI = APIRouter(
prefix="/cache",
dependencies=[Depends(LoginController.get_current_user)],
)
@cacheAPI.get("/monitor", response_class=JSONResponse, response_model=GetCacheMonitorResponse,
summary="获取缓存监控信息")
@Log(title="获取缓存监控信息", business_type=BusinessType.SELECT)
async def get_cache_info(request: Request):
info = await request.app.state.redis.info()
db_size = await request.app.state.redis.dbsize()
command_stats_dict = await request.app.state.redis.info('commandstats')
command_stats = [
dict(name=key.split('_')[1], value=str(value.get('calls'))) for key, value in command_stats_dict.items()
]
cache_info = CacheMonitor(
info=info,
dbSize=db_size,
commandStats=command_stats
)
return Response.success(data=cache_info)
@cacheAPI.get("/names", response_class=JSONResponse, response_model=GetCacheInfoResponse,
summary="获取缓存名称列表")
@Log(title="获取缓存名称列表", business_type=BusinessType.SELECT)
async def get_cache_names(request: Request):
name_list = []
for key_config in RedisKeyConfig:
name_list.append(
CacheInfo(
cacheKey='',
cacheName=key_config.key,
cacheValue='',
remark=key_config.remark,
)
)
return Response.success(data=name_list)
@cacheAPI.get("/keys/{cacheName}", response_class=JSONResponse, response_model=GetCacheKeysListResponse,
summary="获取缓存键名列表")
@Log(title="获取缓存键名列表", business_type=BusinessType.SELECT)
async def get_cache_keys(request: Request, cacheName: str = Path(description="缓存名称")):
cache_keys = await request.app.state.redis.keys(f'{cacheName}*')
cache_key_list = [key.split(':', 1)[1] for key in cache_keys if key.startswith(f'{cacheName}:')]
return Response.success(data=cache_key_list)
@cacheAPI.get("/info/{cacheName}/{cacheKey}", response_class=JSONResponse, response_model=GetCacheInfoResponse,
summary="获取缓存信息")
@Log(title="获取缓存信息", business_type=BusinessType.SELECT)
async def get_cache_info(request: Request, cacheName: str = Path(description="缓存名称"),
cacheKey: str = Path(description="缓存键名")):
cache_value = await request.app.state.redis.get(f'{cacheName}:{cacheKey}')
cache_info = CacheInfo(
cacheKey=cacheKey,
cacheName=cacheName,
cacheValue=cache_value,
remark="",
)
return Response.success(data=cache_info)
@cacheAPI.delete("/cacheName/{name}", response_class=JSONResponse, response_model=BaseResponse,
summary="通过键名删除缓存")
@cacheAPI.post("/cacheName/{name}", response_class=JSONResponse, response_model=BaseResponse,
summary="通过键名删除缓存")
@Log(title="通过键名删除缓存", business_type=BusinessType.DELETE)
async def delete_cache(request: Request, name: str = Path(description="缓存名称")):
cache_keys = await request.app.state.redis.keys(f'{name}*')
if cache_keys:
await request.app.state.redis.delete(*cache_keys)
return Response.success(msg=f"删除{name}缓存成功!")
@cacheAPI.delete("/cacheKey/{key}", response_class=JSONResponse, response_model=BaseResponse,
summary="通过键值删除缓存")
@cacheAPI.post("/cacheKey/{key}", response_class=JSONResponse, response_model=BaseResponse, summary="通过键值删除缓存")
@Log(title="通过键值删除缓存", business_type=BusinessType.DELETE)
async def delete_cache_key(request: Request, key: str = Path(description="缓存键名")):
cache_keys = await request.app.state.redis.keys(f'*{key}')
if cache_keys:
await request.app.state.redis.delete(*cache_keys)
return Response.success(msg=f"删除{key}缓存成功!")
@cacheAPI.delete("/clearAll", response_class=JSONResponse, response_model=BaseResponse, summary="删除所有缓存")
@cacheAPI.post("/clearAll", response_class=JSONResponse, response_model=BaseResponse, summary="删除所有缓存")
@Log(title="删除所有缓存", business_type=BusinessType.DELETE)
async def delete_all_cache(request: Request):
cache_keys = await request.app.state.redis.keys()
if cache_keys:
await request.app.state.redis.delete(*cache_keys)
return Response.success(msg="删除所有缓存成功!")