86 lines
2.9 KiB
Python
86 lines
2.9 KiB
Python
|
# _*_ coding : UTF-8 _*_
|
|||
|
# @Time : 2025/01/18 02:41
|
|||
|
# @UpdateTime : 2025/01/18 02:41
|
|||
|
# @Author : sonder
|
|||
|
# @File : get_redis.py
|
|||
|
# @Software : PyCharm
|
|||
|
# @Comment : 本程序
|
|||
|
|
|||
|
from redis import asyncio as aioredis
|
|||
|
from redis.exceptions import AuthenticationError, TimeoutError, RedisError
|
|||
|
|
|||
|
from config.constant import RedisKeyConfig
|
|||
|
from config.env import RedisConfig
|
|||
|
from models import Config
|
|||
|
from utils.log import logger
|
|||
|
|
|||
|
|
|||
|
class Redis:
|
|||
|
"""
|
|||
|
Redis工具类
|
|||
|
提供与 Redis 交互的常用方法,包括连接管理和缓存初始化。
|
|||
|
"""
|
|||
|
|
|||
|
@classmethod
|
|||
|
async def create_redis_pool(cls) -> aioredis.Redis:
|
|||
|
"""
|
|||
|
应用启动时初始化 Redis 连接池。
|
|||
|
|
|||
|
该方法通过 `aioredis` 创建一个 Redis 连接池,并验证连接是否成功。
|
|||
|
如果连接失败,会捕获异常并记录错误日志。
|
|||
|
|
|||
|
:return: Redis 连接对象
|
|||
|
"""
|
|||
|
logger.info('开始连接 Redis...')
|
|||
|
redis = await aioredis.from_url(
|
|||
|
url=f'redis://{RedisConfig.redis_host}',
|
|||
|
port=RedisConfig.redis_port,
|
|||
|
username=RedisConfig.redis_username,
|
|||
|
password=RedisConfig.redis_password,
|
|||
|
db=RedisConfig.redis_database,
|
|||
|
encoding='utf-8', # 默认编码为 UTF-8
|
|||
|
decode_responses=True, # 自动解码 Redis 响应
|
|||
|
)
|
|||
|
try:
|
|||
|
# 测试 Redis 连接
|
|||
|
connection = await redis.ping()
|
|||
|
if connection:
|
|||
|
logger.info('Redis 连接成功')
|
|||
|
else:
|
|||
|
logger.error('Redis 连接失败')
|
|||
|
except AuthenticationError as e:
|
|||
|
logger.error(f'Redis 用户名或密码错误,详细错误信息:{e}')
|
|||
|
except TimeoutError as e:
|
|||
|
logger.error(f'Redis 连接超时,详细错误信息:{e}')
|
|||
|
except RedisError as e:
|
|||
|
logger.error(f'Redis 连接错误,详细错误信息:{e}')
|
|||
|
return redis
|
|||
|
|
|||
|
@classmethod
|
|||
|
async def close_redis_pool(cls, app):
|
|||
|
"""
|
|||
|
应用关闭时关闭 Redis 连接池。
|
|||
|
|
|||
|
在应用生命周期结束时,清理 Redis 连接池以释放资源。
|
|||
|
|
|||
|
:param app: FastAPI 应用对象,用于访问全局 Redis 连接池。
|
|||
|
:return: None
|
|||
|
"""
|
|||
|
await app.state.redis.close()
|
|||
|
logger.info('关闭 Redis 连接成功')
|
|||
|
|
|||
|
@classmethod
|
|||
|
async def init_system_config(cls, app):
|
|||
|
"""
|
|||
|
初始化系统配置
|
|||
|
"""
|
|||
|
# 获取以sys_config:开头的键列表
|
|||
|
keys = await app.state.redis.keys(f'{RedisKeyConfig.SYSTEM_CONFIG.key}:*')
|
|||
|
# 删除匹配的键
|
|||
|
if keys:
|
|||
|
await app.state.redis.delete(*keys)
|
|||
|
config = await Config.all().values()
|
|||
|
for item in config:
|
|||
|
await app.state.redis.set(f"{RedisKeyConfig.SYSTEM_CONFIG.key}:{item.get('key')}",
|
|||
|
item.get('value'), )
|