45 lines
1.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# _*_ coding : UTF-8 _*_
# @Time : 2025/02/13 17:28
# @UpdateTime : 2025/02/13 17:28
# @Author : sonder
# @File : get_ElasticSearch.py
# @Software : PyCharm
# @Comment : Helper for creating and health-checking the async Elasticsearch client
from elasticsearch import AsyncElasticsearch
from config.env import ElasticSearchConfig
from utils.log import logger
class ElasticSearch:
    """
    Elasticsearch utility class.

    Groups the helper used to create the application's asynchronous
    Elasticsearch client.
    """

    @classmethod
    async def init_elasticsearch(cls):
        """
        Create an asynchronous Elasticsearch client and verify it with a ping.

        Host, port, user and password are read from ``ElasticSearchConfig``;
        the scheme is fixed to ``http``.

        Returns:
            The ``AsyncElasticsearch`` client when the ping succeeds,
            otherwise ``None``. Errors derived from ``Exception`` are logged
            and never propagated to the caller.
        """
        client = None
        connected = False
        try:
            # Build the async Elasticsearch client from the configured endpoint.
            client = AsyncElasticsearch(
                hosts=[{
                    "scheme": "http",
                    "host": ElasticSearchConfig.ES_HOST,
                    "port": ElasticSearchConfig.ES_PORT,
                }],
                http_auth=(ElasticSearchConfig.ES_USER, ElasticSearchConfig.ES_PASSWORD),
            )
            # Check that the server is actually reachable before handing the client out.
            if await client.ping():
                logger.success(f"ElasticSearch初始化成功: {ElasticSearchConfig.ES_HOST}:{ElasticSearchConfig.ES_PORT}")
                connected = True
                return client
            logger.warning("ElasticSearch初始化失败无法ping通服务器")
            return None
        except Exception as e:
            logger.error(f"ElasticSearch初始化失败: {e}")
            return None
        finally:
            # A client that is not returned to the caller would otherwise keep
            # its underlying HTTP session/connections open, so release it here.
            # Using ``finally`` also covers cancellation of the running task.
            if client is not None and not connected:
                try:
                    await client.close()
                except Exception as close_error:
                    # Closing is best-effort; the caller still just gets None.
                    logger.warning(f"ElasticSearch客户端关闭失败: {close_error}")