# _*_ coding : UTF-8 _*_
# @Time : 2025/02/13 17:28
# @UpdateTime : 2025/02/13 17:28
# @Author : sonder
# @File : get_ElasticSearch.py
# @Software : PyCharm
# @Comment : Async ElasticSearch client initialisation helper
from elasticsearch import AsyncElasticsearch

from config.env import ElasticSearchConfig
from utils.log import logger


class ElasticSearch:
    """ElasticSearch utility class."""

    @classmethod
    async def init_elasticsearch(cls):
        """
        Create an async ElasticSearch client and verify it with a ping.

        The host, port and credentials are read from ``ElasticSearchConfig``.

        Returns:
            The ``AsyncElasticsearch`` client when the ping succeeds,
            otherwise ``None`` (ping failed or any exception was raised).
            Failures are logged, never propagated to the caller.
        """
        es = None
        try:
            # Build the async client; the scheme must be passed explicitly
            # alongside host and port.
            es = AsyncElasticsearch(
                hosts=[{
                    "scheme": "http",
                    "host": ElasticSearchConfig.ES_HOST,
                    "port": ElasticSearchConfig.ES_PORT,
                }],
                http_auth=(ElasticSearchConfig.ES_USER, ElasticSearchConfig.ES_PASSWORD),
            )
            # Verify that the server is actually reachable.
            if await es.ping():
                logger.success(f"ElasticSearch初始化成功: {ElasticSearchConfig.ES_HOST}:{ElasticSearchConfig.ES_PORT}")
                return es
            logger.warning("ElasticSearch初始化失败:无法ping通服务器")
        except Exception as e:
            logger.error(f"ElasticSearch初始化失败: {e}")

        # Failure path: release the client's connections instead of just
        # dropping the reference, otherwise the HTTP session is leaked.
        await cls._close_quietly(es)
        return None

    @staticmethod
    async def _close_quietly(es):
        """Close *es* if it was created, logging (not raising) any close error."""
        if es is None:
            return
        try:
            await es.close()
        except Exception as e:
            logger.warning(f"ElasticSearch客户端关闭失败: {e}")