45 lines
1.4 KiB
Python
Raw Permalink Normal View History

2025-02-13 18:04:42 +08:00
# _*_ coding : UTF-8 _*_
# @Time : 2025/02/13 17:28
# @UpdateTime : 2025/02/13 17:28
# @Author : sonder
# @File : get_ElasticSearch.py
# @Software : PyCharm
# @Comment : 本程序
from elasticsearch import AsyncElasticsearch
from config.env import ElasticSearchConfig
from utils.log import logger
class ElasticSearch:
"""
ElasticSearch工具类
"""
@classmethod
async def init_elasticsearch(cls):
"""
初始化elasticsearch
"""
try:
# 创建异步Elasticsearch客户端
es = AsyncElasticsearch(
hosts=[{
"scheme": "http", # 传递scheme
"host": ElasticSearchConfig.ES_HOST, # 传递主机名
"port": ElasticSearchConfig.ES_PORT, # 传递端口
}],
http_auth=(ElasticSearchConfig.ES_USER, ElasticSearchConfig.ES_PASSWORD), # 传递http_auth
)
# 检查连接是否成功
if await es.ping():
logger.success(f"ElasticSearch初始化成功: {ElasticSearchConfig.ES_HOST}:{ElasticSearchConfig.ES_PORT}")
else:
logger.warning("ElasticSearch初始化失败无法ping通服务器")
es = None
return es
except Exception as e:
logger.error(f"ElasticSearch初始化失败: {e}")
return None