# _*_ coding : UTF-8 _*_
# @Time : 2025/02/04 16:05
# @UpdateTime : 2025/02/04 16:05
# @Author : sonder
# @File : i18n.py
# @Software : PyCharm
# @Comment : Routes for managing locales and their i18n translation entries
from datetime import timedelta
from typing import Optional

from fastapi import APIRouter, Depends, Path, Request, Query
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse

from annotation.auth import Auth
from annotation.log import Log
from config.constant import BusinessType, RedisKeyConfig
from controller.login import LoginController
from models import I18n, Locale
from schemas.common import BaseResponse, DeleteListParams
from schemas.i18n import (
    AddLocaleParams,
    GetLocaleInfoResponse,
    AddI18nParams,
    GetI18nInfoResponse,
    GetI18nInfoListResponse,
    GetI18nListResponse,
)
from utils.response import Response

# Every route on this router depends on LoginController.get_current_user.
i18nAPI = APIRouter(
    prefix="/i18n",
    dependencies=[Depends(LoginController.get_current_user)]
)


@i18nAPI.post("/addLocale", response_class=JSONResponse, response_model=BaseResponse, summary="添加国际化类型")
@Log(title="添加国际化类型", business_type=BusinessType.INSERT)
@Auth(["locale:btn:add"])
async def add_locale(request: Request, params: AddLocaleParams):
    """Create a locale unless a live (del_flag=1) locale with the same code exists."""
    if await Locale.get_or_none(code=params.code, del_flag=1):
        return Response.error(msg="该语言代码已存在!")
    locale = await Locale.create(
        code=params.code,
        name=params.name
    )
    if locale:
        return Response.success(msg="添加成功!")
    return Response.error(msg="添加失败!")


@i18nAPI.delete("/deleteLocale/{id}", response_class=JSONResponse, response_model=BaseResponse,
                summary="删除国际化类型")
@i18nAPI.post("/deleteLocale/{id}", response_class=JSONResponse, response_model=BaseResponse,
              summary="删除国际化类型")
@Log(title="删除国际化类型", business_type=BusinessType.DELETE)
@Auth(["locale:btn:delete"])
async def delete_locale(request: Request, id: str = Path(description="国际化类型ID")):
    """Soft-delete a locale (del_flag -> 0) together with its translation entries."""
    locale = await Locale.get_or_none(id=id, del_flag=1)
    if locale is None:
        return Response.error(msg="该国际化类型不存在!")
    # Soft-delete the translations that belong to this locale first.
    await I18n.filter(locale_id=locale.id, del_flag=1).update(del_flag=0)
    locale.del_flag = 0
    await locale.save()
    return Response.success(msg="删除成功!")


@i18nAPI.delete("/deleteLocaleList", response_class=JSONResponse, response_model=BaseResponse,
                summary="批量删除国际化类型")
@i18nAPI.post("/deleteLocaleList", response_class=JSONResponse, response_model=BaseResponse,
              summary="批量删除国际化类型")
@Log(title="批量删除国际化类型", business_type=BusinessType.DELETE)
@Auth(["locale:btn:delete"])
async def delete_locale_list(request: Request, params: DeleteListParams):
    """Soft-delete every existing locale in ``params.ids`` (duplicates ignored).

    IDs that do not match a live locale are skipped silently; the response is
    always a success.
    """
    for locale_id in set(params.ids):
        locale = await Locale.get_or_none(id=locale_id, del_flag=1)
        if locale is None:
            continue
        # Soft-delete the translations that belong to this locale first.
        await I18n.filter(locale_id=locale.id, del_flag=1).update(del_flag=0)
        locale.del_flag = 0
        await locale.save()
    return Response.success(msg="删除成功!")


@i18nAPI.put("/updateLocale/{id}", response_class=JSONResponse, response_model=BaseResponse,
             summary="修改国际化类型")
@i18nAPI.post("/updateLocale/{id}", response_class=JSONResponse, response_model=BaseResponse,
              summary="修改国际化类型")
@Log(title="修改国际化类型", business_type=BusinessType.UPDATE)
@Auth(["locale:btn:update"])
async def update_locale(request: Request, params: AddLocaleParams, id: str = Path(description="国际化类型ID")):
    """Update the code and name of an existing locale.

    The update is rejected when *another* live locale already uses the
    requested code, mirroring the uniqueness rule applied by ``add_locale``.
    """
    locale = await Locale.get_or_none(id=id, del_flag=1)
    if locale is None:
        return Response.error(msg="该国际化类型不存在!")
    # The record being edited must be excluded from the duplicate check,
    # otherwise re-saving a locale with unchanged values is always rejected
    # and real clashes with other locales are never detected.
    if await Locale.filter(code=params.code, del_flag=1).exclude(id=id).exists():
        return Response.error(msg="该国际化类型已存在!")
    locale.code = params.code
    locale.name = params.name
    await locale.save()
    return Response.success(msg="修改成功!")


@i18nAPI.get("/locale/info/{id}", response_class=JSONResponse, response_model=GetLocaleInfoResponse,
             summary="获取国际化类型信息")
@Log(title="获取国际化类型信息", business_type=BusinessType.SELECT)
@Auth(["locale:btn:info"])
async def get_locale_info(request: Request, id: str = Path(description="国际化类型ID")):
    """Return the fields of a single live locale, or an error if it does not exist."""
    locale = await Locale.get_or_none(id=id, del_flag=1)
    if locale is None:
        return Response.error(msg="该国际化类型不存在!")
    data = {
        "id": locale.id,
        "code": locale.code,
        "name": locale.name,
        "create_time": locale.create_time,
        "update_time": locale.update_time,
        "create_by": locale.create_by,
        "update_by": locale.update_by,
    }
    return Response.success(data=data)


@i18nAPI.get("/locale/list", response_class=JSONResponse, response_model=BaseResponse,
             summary="获取国际化类型列表")
@Log(title="获取国际化类型列表", business_type=BusinessType.SELECT)
@Auth(["locale:btn:list"])
async def get_locale_list(request: Request,
                          page: int = Query(default=1, ge=1, description="当前页码"),
                          pageSize: int = Query(default=50, ge=1, description="每页条数"),
                          name: Optional[str] = Query(default=None, description="国际化类型名称"),
                          code: Optional[str] = Query(default=None, description="国际化类型代码"),
                          ):
    """Return one page of live locales, optionally filtered by name/code substring.

    ``page`` and ``pageSize`` must be >= 1 so the computed offset and limit
    can never be negative.
    """
    # Only non-empty filters are applied; each is a substring match.
    filters = {
        f'{field}__contains': value
        for field, value in {'name': name, 'code': code}.items()
        if value
    }
    query = Locale.filter(**filters, del_flag=1)
    total = await query.count()
    data = await query.offset((page - 1) * pageSize).limit(pageSize).distinct().values(
        id="id",
        code="code",
        name="name",
        create_time="create_time",
        update_time="update_time",
        create_by="create_by",
        update_by="update_by"
    )
    return Response.success(data={
        "total": total,
        "result": data,
        "page": page
    })


@i18nAPI.post("/addI18n", response_class=JSONResponse, response_model=BaseResponse, summary="添加国际化内容")
@Log(title="添加国际化内容", business_type=BusinessType.INSERT)
@Auth(["i18n:btn:add"])
async def add_i18n(request: Request, params: AddI18nParams):
    """Create a translation entry for a (key, locale) pair.

    Fails when the pair already exists or when the target locale does not
    exist (previously a missing locale was silently stored as ``None``).
    """
    if await I18n.get_or_none(key=params.key, locale_id=params.locale_id, del_flag=1):
        return Response.error(msg="该国际化内容已存在!")
    locale = await Locale.get_or_none(id=params.locale_id, del_flag=1)
    if locale is None:
        return Response.error(msg="该国际化类型不存在!")
    i18n = await I18n.create(
        key=params.key,
        translation=params.translation,
        locale=locale
    )
    if i18n:
        return Response.success(msg="添加成功!")
    return Response.error(msg="添加失败!")


@i18nAPI.delete("/deleteI18n/{id}", response_class=JSONResponse, response_model=BaseResponse,
                summary="删除国际化内容")
@i18nAPI.post("/deleteI18n/{id}", response_class=JSONResponse, response_model=BaseResponse,
              summary="删除国际化内容")
@Log(title="删除国际化内容", business_type=BusinessType.DELETE)
@Auth(["i18n:btn:delete"])
async def delete_i18n(request: Request, id: str = Path(description="国际化内容ID")):
    """Soft-delete a single translation entry (del_flag -> 0)."""
    i18n = await I18n.get_or_none(id=id, del_flag=1)
    if i18n is None:
        return Response.error(msg="该国际化内容不存在!")
    i18n.del_flag = 0
    await i18n.save()
    return Response.success(msg="删除成功!")


@i18nAPI.delete("/deleteI18nList", response_class=JSONResponse, response_model=BaseResponse,
                summary="批量删除国际化内容")
@i18nAPI.post("/deleteI18nList", response_class=JSONResponse, response_model=BaseResponse,
              summary="批量删除国际化内容")
@Log(title="批量删除国际化内容", business_type=BusinessType.DELETE)
@Auth(["i18n:btn:delete"])
async def delete_i18n_list(request: Request, params: DeleteListParams):
    """Soft-delete every existing translation entry in ``params.ids``.

    Unknown IDs are skipped silently; the response is always a success.
    """
    for i18n_id in set(params.ids):
        i18n = await I18n.get_or_none(id=i18n_id, del_flag=1)
        if i18n is None:
            continue
        i18n.del_flag = 0
        await i18n.save()
    return Response.success(msg="删除成功!")


@i18nAPI.put("/updateI18n/{id}", response_class=JSONResponse, response_model=BaseResponse,
             summary="修改国际化内容")
@i18nAPI.post("/updateI18n/{id}", response_class=JSONResponse, response_model=BaseResponse,
              summary="修改国际化内容")
@Log(title="修改国际化内容", business_type=BusinessType.UPDATE)
@Auth(["i18n:btn:update"])
async def update_i18n(request: Request, params: AddI18nParams, id: str = Path(description="国际化内容ID")):
    """Update key, translation and locale of an existing translation entry.

    Rejects the update when the target locale does not exist or when another
    live entry already holds the same (key, locale) pair, matching the rules
    enforced by ``add_i18n``.
    """
    i18n = await I18n.get_or_none(id=id, del_flag=1)
    if i18n is None:
        return Response.error(msg="该国际化内容不存在!")
    locale = await Locale.get_or_none(id=params.locale_id, del_flag=1)
    if locale is None:
        return Response.error(msg="该国际化类型不存在!")
    # Exclude the row being edited so re-saving unchanged values is allowed.
    duplicate = I18n.filter(key=params.key, locale_id=params.locale_id, del_flag=1).exclude(id=id)
    if await duplicate.exists():
        return Response.error(msg="该国际化内容已存在!")
    i18n.key = params.key
    i18n.translation = params.translation
    i18n.locale = locale
    await i18n.save()
    return Response.success(msg="修改成功!")


@i18nAPI.get("/info/{id}", response_class=JSONResponse, response_model=GetI18nInfoResponse,
             summary="获取国际化内容信息")
@Log(title="获取国际化内容信息", business_type=BusinessType.SELECT)
@Auth(["i18n:btn:info"])
async def get_i18n_info(request: Request, id: str = Path(description="国际化内容ID")):
    """Return a single live translation entry together with its locale id and name."""
    i18n = await I18n.get_or_none(id=id, del_flag=1)
    if i18n is None:
        return Response.error(msg="该国际化内容不存在!")
    # The related locale is not loaded by get_or_none; fetch it explicitly
    # before reading its attributes.
    await i18n.fetch_related("locale")
    locale = i18n.locale
    data = {
        "id": i18n.id,
        "key": i18n.key,
        "translation": i18n.translation,
        "locale_id": locale.id if locale else None,
        "locale_name": locale.name if locale else None,
        "create_time": i18n.create_time,
        "update_time": i18n.update_time,
        "create_by": i18n.create_by,
        "update_by": i18n.update_by,
    }
    return Response.success(data=data)


@i18nAPI.get("/list", response_class=JSONResponse, response_model=GetI18nListResponse,
             summary="获取国际化内容列表")
@Log(title="获取国际化内容列表", business_type=BusinessType.SELECT)
@Auth(["i18n:btn:list"])
async def get_i18n_list(request: Request,
                        page: int = Query(default=1, ge=1, description="当前页码"),
                        pageSize: int = Query(default=50, ge=1, description="每页条数"),
                        key: Optional[str] = Query(default=None, description="国际化内容key"),
                        locale_id: Optional[str] = Query(default=None, description="国际化内容语言ID"),
                        translation: Optional[str] = Query(default=None, description="国际化内容翻译内容"),
                        ):
    """Return one page of live translation entries.

    ``key`` and ``translation`` are substring filters; ``locale_id`` is an
    exact match because it identifies a single locale. ``page`` and
    ``pageSize`` must be >= 1.
    """
    filters = {
        f'{field}__contains': value
        for field, value in {'key': key, 'translation': translation}.items()
        if value
    }
    if locale_id:
        # An identifier must match exactly; a substring match could pull in
        # entries from unrelated locales.
        filters['locale_id'] = locale_id
    query = I18n.filter(**filters, del_flag=1)
    total = await query.count()
    data = await query.offset((page - 1) * pageSize).limit(pageSize).values(
        id="id",
        key="key",
        translation="translation",
        locale_id="locale__id",
        locale_code="locale__code",
        locale_name="locale__name",
        create_time="create_time",
        update_time="update_time",
        create_by="create_by",
        update_by="update_by"
    )
    return Response.success(data={
        "total": total,
        "result": data,
        "page": page
    })


@i18nAPI.get("/infoList/{id}", response_class=JSONResponse, response_model=GetI18nInfoListResponse,
             summary="获取国际化列表")
@Log(title="获取国际化列表", business_type=BusinessType.SELECT)
@Auth(["i18n:btn:infoList"])
async def get_i18n_info_list(request: Request, id: str = Path(description="国际化内容语言ID")):
    """Return all live translations of one locale as a ``{key: translation}`` mapping.

    The response also carries the locale's code and name.
    """
    locale = await Locale.get_or_none(id=id, del_flag=1)
    if locale is None:
        return Response.error(msg="该国际化内容语言不存在!")
    # Only key and translation are used, so only those columns are selected.
    rows = await I18n.filter(locale_id=locale.id, del_flag=1).order_by("key").values(
        key="key",
        translation="translation"
    )
    result = {f"{row['key']}": row["translation"] for row in rows}
    return Response.success(data={
        "data": result,
        "locale": locale.code,
        "name": locale.name,
    })