feat: 给编码管理添加按钮级权限管理

This commit is contained in:
皓月归尘 2025-02-23 23:50:42 +08:00
parent 550ebabc16
commit afe1ae8353
2 changed files with 119 additions and 124 deletions

View File

@ -16,28 +16,29 @@ from fastapi import APIRouter, Depends, Path, Request, Query
from fastapi.encoders import jsonable_encoder from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse, FileResponse from fastapi.responses import JSONResponse, FileResponse
from annotation.auth import Auth
from annotation.log import Log from annotation.log import Log
from config.constant import BusinessType from config.constant import BusinessType
from config.env import ElasticSearchConfig from config.env import ElasticSearchConfig
from controller.login import LoginController from controller.login import LoginController
from exceptions.exception import ServiceException, PermissionException from exceptions.exception import ServiceException, PermissionException
from models import File, Code, QueryCode, QueryCodeLog from models import File, Code, QueryCode, QueryCodeLog
from schemas.code import GetCodeInfoResponse, GetCodeListResponse, GetQueryCodeParams, \ from schemas.code import GetCodeInfoResponse, GetCodeListResponse, GetQueryCodeParams, QueryCodeResponse, AddCodeParams, \
DeleteCodeListParams, QueryCodeResponse, AddCodeParams, GetQueryCodeLogResponse, GetQueryCodeLogDetailResponse, \ GetQueryCodeLogResponse, GetQueryCodeLogDetailResponse, \
GetCodeLogAllResponse GetCodeLogAllResponse
from schemas.common import BaseResponse from schemas.common import BaseResponse, DeleteListParams
from utils.log import logger from utils.log import logger
from utils.response import Response from utils.response import Response
codeAPI = APIRouter( codeAPI = APIRouter(
prefix="/code", prefix="/code"
dependencies=[Depends(LoginController.get_current_user)]
) )
@codeAPI.get("/template", summary="获取上传编码模板") @codeAPI.get("/template", summary="获取上传编码模板")
@Log(title="获取上传编码模板", business_type=BusinessType.SELECT) @Log(title="获取上传编码模板", business_type=BusinessType.SELECT)
async def get_upload_template(request: Request): @Auth(permission_list=["code:btn:uploadTemplate"])
async def get_upload_template(request: Request, current_user=Depends(LoginController.get_current_user)):
template_path = os.path.join(os.path.abspath(os.getcwd()), 'assets', 'templates', '上传模版.xlsx') template_path = os.path.join(os.path.abspath(os.getcwd()), 'assets', 'templates', '上传模版.xlsx')
if not os.path.exists(template_path): if not os.path.exists(template_path):
raise ServiceException(message="文件不存在!") raise ServiceException(message="文件不存在!")
@ -50,7 +51,8 @@ async def get_upload_template(request: Request):
@codeAPI.get("/queryTemplate", summary="获取查询编码模板") @codeAPI.get("/queryTemplate", summary="获取查询编码模板")
@Log(title="获取查询编码模板", business_type=BusinessType.SELECT) @Log(title="获取查询编码模板", business_type=BusinessType.SELECT)
async def get_query_template(request: Request): @Auth(permission_list=["code:btn:queryTemplate"])
async def get_query_template(request: Request, current_user=Depends(LoginController.get_current_user)):
template_path = os.path.join(os.path.abspath(os.getcwd()), 'assets', 'templates', '查询模版.xlsx') template_path = os.path.join(os.path.abspath(os.getcwd()), 'assets', 'templates', '查询模版.xlsx')
if not os.path.exists(template_path): if not os.path.exists(template_path):
raise ServiceException(message="文件不存在!") raise ServiceException(message="文件不存在!")
@ -63,11 +65,12 @@ async def get_query_template(request: Request):
@codeAPI.post("/add", response_class=JSONResponse, response_model=BaseResponse, summary="添加编码") @codeAPI.post("/add", response_class=JSONResponse, response_model=BaseResponse, summary="添加编码")
@Log(title="添加编码", business_type=BusinessType.INSERT) @Log(title="添加编码", business_type=BusinessType.INSERT)
async def add_code(request: Request, params: AddCodeParams): @Auth(permission_list=["code:btn:add"])
async def add_code(request: Request, params: AddCodeParams, current_user=Depends(LoginController.get_current_user)):
params.code = params.code.replace(".", "").replace("/", "").replace("_", "").replace("-", "").replace("?", params.code = params.code.replace(".", "").replace("/", "").replace("_", "").replace("-", "").replace("?",
"").replace( "").replace(
":", "").replace("", "").replace("", "").strip() ":", "").replace("", "").replace("", "").strip()
if await Code.get_or_none(code=params.code): if await Code.get_or_none(code=params.code, del_flag=1):
return Response.failure(msg="编码已存在") return Response.failure(msg="编码已存在")
else: else:
if await request.app.state.es.indices.exists(index=ElasticSearchConfig.ES_INDEX): if await request.app.state.es.indices.exists(index=ElasticSearchConfig.ES_INDEX):
@ -97,17 +100,17 @@ async def add_code(request: Request, params: AddCodeParams):
@codeAPI.get("/addCode/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="导入编码") @codeAPI.get("/addCode/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="导入编码")
@Log(title="导入编码", business_type=BusinessType.INSERT) @Log(title="导入编码", business_type=BusinessType.INSERT)
@Auth(permission_list=["code:btn:import"])
async def add_code_by_file(request: Request, id: str = Path(description="文件ID"), async def add_code_by_file(request: Request, id: str = Path(description="文件ID"),
current_user=Depends(LoginController.get_current_user)): current_user=Depends(LoginController.get_current_user)):
user_id = current_user.get("id") user_id = current_user.get("id")
if file := await File.get_or_none(id=id): if file := await File.get_or_none(id=id, del_flag=1):
uploader_id = await file.first().values(id="uploader__id") uploader_id = await file.first().values(id="uploader__id")
if str(uploader_id["id"]) == user_id: if str(uploader_id["id"]) == user_id:
try: try:
df = pd.read_excel(file.absolute_path, dtype={"code": str}) df = pd.read_excel(file.absolute_path, dtype={"code": str})
df["code"] = df["code"].astype(str).str.zfill(8) df["code"] = df["code"].astype(str).str.zfill(8)
for index, row in df.iterrows(): for index, row in df.iterrows():
print(row['code'],type(row['code']))
row["code"] = row["code"].replace(".", "").replace("/", "").replace("_", "").replace("-", row["code"] = row["code"].replace(".", "").replace("/", "").replace("_", "").replace("-",
"").replace( "").replace(
"?", "?",
@ -146,9 +149,12 @@ async def add_code_by_file(request: Request, id: str = Path(description="文件I
@codeAPI.delete("/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除编码") @codeAPI.delete("/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除编码")
@codeAPI.post("/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除编码") @codeAPI.post("/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除编码")
@Log(title="删除编码", business_type=BusinessType.DELETE) @Log(title="删除编码", business_type=BusinessType.DELETE)
async def delete_code_by_id(request: Request, id: str = Path(description="编码ID"), ): @Auth(permission_list=["code:btn:delete"])
if code := await Code.get_or_none(id=id): async def delete_code_by_id(request: Request, id: str = Path(description="编码ID"),
await code.delete() current_user=Depends(LoginController.get_current_user)):
if code := await Code.get_or_none(id=id, del_flag=1):
code.del_flag = 0
await code.save()
await request.app.state.es.delete(index=ElasticSearchConfig.ES_INDEX, id=code.code) await request.app.state.es.delete(index=ElasticSearchConfig.ES_INDEX, id=code.code)
return Response.success(msg="删除成功") return Response.success(msg="删除成功")
else: else:
@ -158,10 +164,13 @@ async def delete_code_by_id(request: Request, id: str = Path(description="编码
@codeAPI.delete("/deleteList", response_class=JSONResponse, response_model=BaseResponse, summary="批量删除编码") @codeAPI.delete("/deleteList", response_class=JSONResponse, response_model=BaseResponse, summary="批量删除编码")
@codeAPI.post("/deleteList", response_class=JSONResponse, response_model=BaseResponse, summary="批量删除编码") @codeAPI.post("/deleteList", response_class=JSONResponse, response_model=BaseResponse, summary="批量删除编码")
@Log(title="批量删除编码", business_type=BusinessType.DELETE) @Log(title="批量删除编码", business_type=BusinessType.DELETE)
async def delete_code_by_ids(request: Request, params: DeleteCodeListParams): @Auth(permission_list=["code:btn:delete"])
async def delete_code_by_ids(request: Request, params: DeleteListParams,
current_user=Depends(LoginController.get_current_user)):
for id in set(params.ids): for id in set(params.ids):
if code := await Code.get_or_none(id=id): if code := await Code.get_or_none(id=id, del_flag=1):
await code.delete() code.del_flag = 0
await code.save()
await request.app.state.es.delete(index=ElasticSearchConfig.ES_INDEX, id=code.code) await request.app.state.es.delete(index=ElasticSearchConfig.ES_INDEX, id=code.code)
return Response.success(msg="删除成功") return Response.success(msg="删除成功")
@ -169,10 +178,12 @@ async def delete_code_by_ids(request: Request, params: DeleteCodeListParams):
@codeAPI.put("/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="更新编码") @codeAPI.put("/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="更新编码")
@codeAPI.post("/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="更新编码") @codeAPI.post("/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="更新编码")
@Log(title="更新编码", business_type=BusinessType.UPDATE) @Log(title="更新编码", business_type=BusinessType.UPDATE)
@Auth(permission_list=["code:btn:update"])
async def update_code(request: Request, async def update_code(request: Request,
params: AddCodeParams, params: AddCodeParams,
id: str = Path(description="编码ID")): id: str = Path(description="编码ID"),
if code := await Code.get_or_none(id=id): current_user=Depends(LoginController.get_current_user)):
if code := await Code.get_or_none(id=id, del_flag=1):
code.code = params.code code.code = params.code
code.description = params.description code.description = params.description
await code.save() await code.save()
@ -184,8 +195,10 @@ async def update_code(request: Request,
@codeAPI.get("/info/{id}", response_class=JSONResponse, response_model=GetCodeInfoResponse, summary="获取编码信息") @codeAPI.get("/info/{id}", response_class=JSONResponse, response_model=GetCodeInfoResponse, summary="获取编码信息")
@Log(title="获取编码信息", business_type=BusinessType.SELECT) @Log(title="获取编码信息", business_type=BusinessType.SELECT)
async def get_code_info(request: Request, id: str = Path(description="编码ID")): @Auth(permission_list=["code:btn:info"])
if code := await Code.get_or_none(id=id).values( async def get_code_info(request: Request, id: str = Path(description="编码ID"),
current_user=Depends(LoginController.get_current_user)):
if code := await Code.get_or_none(id=id, del_flag=1).values(
id="id", id="id",
code="code", code="code",
description="description", description="description",
@ -200,19 +213,21 @@ async def get_code_info(request: Request, id: str = Path(description="编码ID")
@codeAPI.get("/list", response_class=JSONResponse, response_model=GetCodeListResponse, summary="获取编码列表") @codeAPI.get("/list", response_class=JSONResponse, response_model=GetCodeListResponse, summary="获取编码列表")
@Log(title="获取编码列表", business_type=BusinessType.SELECT) @Log(title="获取编码列表", business_type=BusinessType.SELECT)
@Auth(permission_list=["code:btn:list"])
async def get_code_list(request: Request, async def get_code_list(request: Request,
page: int = Query(default=1, description="页码"), page: int = Query(default=1, description="页码"),
pageSize: int = Query(default=10, description="每页数量"), pageSize: int = Query(default=10, description="每页数量"),
code: Optional[str] = Query(default=None, description="编码"), code: Optional[str] = Query(default=None, description="编码"),
description: Optional[str] = Query(default=None, description="商品描述")): description: Optional[str] = Query(default=None, description="商品描述"),
current_user=Depends(LoginController.get_current_user)):
filterArgs = { filterArgs = {
f'{k}__contains': v for k, v in { f'{k}__contains': v for k, v in {
'code': code, 'code': code,
'description': description 'description': description
}.items() if v }.items() if v
} }
total = await Code.filter(**filterArgs).count() total = await Code.filter(**filterArgs, del_flag=1).count()
data = await Code.filter(**filterArgs).offset((page - 1) * pageSize).limit(pageSize).values( data = await Code.filter(**filterArgs, del_flag=1).offset((page - 1) * pageSize).limit(pageSize).values(
id="id", id="id",
code="code", code="code",
description="description", description="description",
@ -231,6 +246,7 @@ async def get_code_list(request: Request,
@codeAPI.post("/query", response_class=JSONResponse, response_model=QueryCodeResponse, summary="查询编码") @codeAPI.post("/query", response_class=JSONResponse, response_model=QueryCodeResponse, summary="查询编码")
@Log(title="查询编码", business_type=BusinessType.SELECT) @Log(title="查询编码", business_type=BusinessType.SELECT)
@Auth(permission_list=["code:btn:query"])
async def get_code_list(request: Request, async def get_code_list(request: Request,
params: GetQueryCodeParams, params: GetQueryCodeParams,
current_user: dict = Depends(LoginController.get_current_user), current_user: dict = Depends(LoginController.get_current_user),
@ -278,7 +294,7 @@ async def get_code_list(request: Request,
max_score = data["hits"].get("max_score", 1) max_score = data["hits"].get("max_score", 1)
# 处理每一条匹配结果 # 处理每一条匹配结果
for hit in data["hits"]["hits"]: for hit in data["hits"]["hits"]:
code = await Code.get_or_none(code=hit["_source"]["code"]) code = await Code.get_or_none(code=hit["_source"]["code"], del_flag=1)
# 归一化匹配度,转换为百分比 # 归一化匹配度,转换为百分比
match_rate = round((hit["_score"] / max_score) * 100, 2) # 归一化后计算百分比 match_rate = round((hit["_score"] / max_score) * 100, 2) # 归一化后计算百分比
# 将匹配结果添加到列表中 # 将匹配结果添加到列表中
@ -329,13 +345,14 @@ async def get_code_list(request: Request,
@codeAPI.get("/query/{id}", response_class=JSONResponse, response_model=QueryCodeResponse, summary="查询编码") @codeAPI.get("/query/{id}", response_class=JSONResponse, response_model=QueryCodeResponse, summary="查询编码")
@Log(title="查询编码", business_type=BusinessType.SELECT) @Log(title="查询编码", business_type=BusinessType.SELECT)
@Auth(permission_list=["code:btn:importQuery"])
async def get_code_list(request: Request, async def get_code_list(request: Request,
id: str = Path(description="文件ID"), id: str = Path(description="文件ID"),
current_user: dict = Depends(LoginController.get_current_user), current_user: dict = Depends(LoginController.get_current_user),
): ):
start_time = time.time() start_time = time.time()
user_id = current_user.get("id") user_id = current_user.get("id")
if file := await File.get_or_none(id=id): if file := await File.get_or_none(id=id, del_flag=1):
uploader_id = await file.first().values(id="uploader__id") uploader_id = await file.first().values(id="uploader__id")
if str(uploader_id["id"]) == user_id: if str(uploader_id["id"]) == user_id:
if log := await QueryCodeLog.create( if log := await QueryCodeLog.create(
@ -379,7 +396,7 @@ async def get_code_list(request: Request,
max_score = data["hits"].get("max_score", 1) max_score = data["hits"].get("max_score", 1)
# 处理每一条匹配结果 # 处理每一条匹配结果
for hit in data["hits"]["hits"]: for hit in data["hits"]["hits"]:
code = await Code.get_or_none(code=hit["_source"]["code"]) code = await Code.get_or_none(code=hit["_source"]["code"], del_flag=1)
# 归一化匹配度,转换为百分比 # 归一化匹配度,转换为百分比
match_rate = round((hit["_score"] / max_score) * 100, 2) # 归一化后计算百分比 match_rate = round((hit["_score"] / max_score) * 100, 2) # 归一化后计算百分比
# 将匹配结果添加到列表中 # 将匹配结果添加到列表中
@ -435,43 +452,36 @@ async def get_code_list(request: Request,
@codeAPI.get("/logList", response_class=JSONResponse, response_model=GetQueryCodeLogResponse, @codeAPI.get("/logList", response_class=JSONResponse, response_model=GetQueryCodeLogResponse,
summary="查询编码日志列表") summary="查询编码日志列表")
@Log(title="查询编码日志列表", business_type=BusinessType.SELECT) @Log(title="查询编码日志列表", business_type=BusinessType.SELECT)
@Auth(permission_list=["code:btn:logList"])
async def get_code_log_list(request: Request, async def get_code_log_list(request: Request,
page: int = Query(default=1, description="当前页码"), page: int = Query(default=1, description="当前页码"),
pageSize: int = Query(default=10, description="每页数量"), pageSize: int = Query(default=10, description="每页数量"),
department_id: Optional[str] = Query(default=None, description="部门ID"),
username: Optional[str] = Query(default=None, description="用户账号"),
nickname: Optional[str] = Query(default=None, description="用户昵称"),
startTime: Optional[str] = Query(default=None, description="开始时间"), startTime: Optional[str] = Query(default=None, description="开始时间"),
endTime: Optional[str] = Query(default=None, description="结束时间"), endTime: Optional[str] = Query(default=None, description="结束时间"),
current_user: dict = Depends(LoginController.get_current_user), current_user: dict = Depends(LoginController.get_current_user),
): ):
user_id = current_user.get("id") sub_departments = current_user.get("sub_departments")
filterArgs = {
f'{k}__contains': v for k, v in {
'operator__username': username,
'operator__nickname': nickname,
}.items() if v
}
if startTime and endTime: if startTime and endTime:
startTime = float(startTime) / 1000 startTime = float(startTime) / 1000
endTime = float(endTime) / 1000 endTime = float(endTime) / 1000
startTime = datetime.fromtimestamp(startTime) startTime = datetime.fromtimestamp(startTime)
endTime = datetime.fromtimestamp(endTime) endTime = datetime.fromtimestamp(endTime)
count = await QueryCodeLog.filter(operator_id=user_id, del_flag=1, operation_time__gte=startTime, filterArgs['operation_time__range'] = [startTime, endTime]
operation_time__lte=endTime).count() if not department_id:
data = await QueryCodeLog.filter(operator_id=user_id, del_flag=1, operation_time__gte=startTime, filterArgs['operator__department__id__in'] = sub_departments
operation_time__lte=endTime).order_by("-operation_time").offset(
(page - 1) * pageSize).limit(pageSize).values(
id="id",
query_count="query_count",
result_count="result_count",
cost_time="cost_time",
operation_time="operation_time",
status="status",
request_params="request_params",
response_result="response_result",
create_time="create_time",
update_time="update_time",
operator_id="operator__id",
operator_name="operator__username",
operator_nickname="operator__nickname",
department_id="operator__department__id",
department_name="operator__department__name",
)
else: else:
count = await QueryCodeLog.filter(operator_id=user_id, del_flag=1).count() filterArgs['operator__department__id'] = department_id
data = await QueryCodeLog.filter(operator_id=user_id, del_flag=1).order_by("-operation_time").offset( count = await QueryCodeLog.filter(**filterArgs, operator__del_flag=1, del_flag=1).count()
data = await QueryCodeLog.filter(**filterArgs, operator__del_flag=1, del_flag=1).order_by("-operation_time").offset(
(page - 1) * pageSize).limit(pageSize).values( (page - 1) * pageSize).limit(pageSize).values(
id="id", id="id",
query_count="query_count", query_count="query_count",
@ -489,7 +499,6 @@ async def get_code_log_list(request: Request,
department_id="operator__department__id", department_id="operator__department__id",
department_name="operator__department__name", department_name="operator__department__name",
) )
return Response.success(data={ return Response.success(data={
"page": page, "page": page,
"pageSize": pageSize, "pageSize": pageSize,
@ -500,41 +509,35 @@ async def get_code_log_list(request: Request,
@codeAPI.get("/logList/all", response_class=JSONResponse, response_model=GetCodeLogAllResponse, @codeAPI.get("/logList/all", response_class=JSONResponse, response_model=GetCodeLogAllResponse,
summary="查询所有编码日志列表") summary="查询所有编码日志列表")
@Log(title="查询所有编码日志列表", business_type=BusinessType.SELECT) @Log(title="查询所有编码日志列表", business_type=BusinessType.EXPORT)
@Auth(permission_list=["code:btn:export"])
async def get_code_log_list(request: Request, async def get_code_log_list(request: Request,
department_id: Optional[str] = Query(default=None, description="部门ID"),
username: Optional[str] = Query(default=None, description="用户账号"),
nickname: Optional[str] = Query(default=None, description="用户昵称"),
startTime: Optional[str] = Query(default=None, description="开始时间"), startTime: Optional[str] = Query(default=None, description="开始时间"),
endTime: Optional[str] = Query(default=None, description="结束时间"), endTime: Optional[str] = Query(default=None, description="结束时间"),
current_user: dict = Depends(LoginController.get_current_user), current_user: dict = Depends(LoginController.get_current_user),
): ):
user_id = current_user.get("id") sub_departments = current_user.get("sub_departments")
filterArgs = {
f'{k}__contains': v for k, v in {
'operator__username': username,
'operator__nickname': nickname,
}.items() if v
}
if startTime and endTime: if startTime and endTime:
startTime = float(startTime) / 1000 startTime = float(startTime) / 1000
endTime = float(endTime) / 1000 endTime = float(endTime) / 1000
startTime = datetime.fromtimestamp(startTime) startTime = datetime.fromtimestamp(startTime)
endTime = datetime.fromtimestamp(endTime) endTime = datetime.fromtimestamp(endTime)
count = await QueryCodeLog.filter(operator_id=user_id, del_flag=1, operation_time__gte=startTime, filterArgs['operation_time__range'] = [startTime, endTime]
operation_time__lte=endTime).count() if not department_id:
data = await QueryCodeLog.filter(operator_id=user_id, del_flag=1, operation_time__gte=startTime, filterArgs['operator__department__id__in'] = sub_departments
operation_time__lte=endTime).order_by("-operation_time").values(
id="id",
query_count="query_count",
result_count="result_count",
cost_time="cost_time",
operation_time="operation_time",
status="status",
request_params="request_params",
response_result="response_result",
create_time="create_time",
update_time="update_time",
operator_id="operator__id",
operator_name="operator__username",
operator_nickname="operator__nickname",
department_id="operator__department__id",
department_name="operator__department__name",
)
else: else:
count = await QueryCodeLog.filter(operator_id=user_id, del_flag=1).count() filterArgs['operator__department__id'] = department_id
data = await QueryCodeLog.filter(operator_id=user_id, del_flag=1).order_by("-operation_time").values( count = await QueryCodeLog.filter(**filterArgs, operator__del_flag=1, del_flag=1).count()
data = await QueryCodeLog.filter(**filterArgs, operator__del_flag=1, del_flag=1).order_by("-operation_time").values(
id="id", id="id",
query_count="query_count", query_count="query_count",
result_count="result_count", result_count="result_count",
@ -561,10 +564,13 @@ async def get_code_log_list(request: Request,
@codeAPI.get("/logInfo/{id}", response_class=JSONResponse, response_model=GetQueryCodeLogDetailResponse, @codeAPI.get("/logInfo/{id}", response_class=JSONResponse, response_model=GetQueryCodeLogDetailResponse,
summary="查询编码日志详情") summary="查询编码日志详情")
@Log(title="查询编码日志详情", business_type=BusinessType.SELECT) @Log(title="查询编码日志详情", business_type=BusinessType.SELECT)
@Auth(permission_list=["code:btn:logInfo"])
async def get_code_log_detail(request: Request, async def get_code_log_detail(request: Request,
id: str = Path(..., description="日志ID"), id: str = Path(..., description="日志ID"),
current_user: dict = Depends(LoginController.get_current_user),
): ):
if log := await QueryCodeLog.get_or_none(id=id): sub_departments = current_user.get("sub_departments")
if log := await QueryCodeLog.get_or_none(id=id, operator__department__id__in=sub_departments, del_flag=1):
data = await log.first().values( data = await log.first().values(
id="id", id="id",
query_count="query_count", query_count="query_count",

View File

@ -6,12 +6,12 @@
# @Software : PyCharm # @Software : PyCharm
# @Comment : 本程序 # @Comment : 本程序
from datetime import datetime from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel, ConfigDict, Field
from pydantic.alias_generators import to_camel from pydantic.alias_generators import to_camel
from schemas.common import BaseResponse, ListQueryResult from schemas.common import BaseResponse, ListQueryResult
from pydantic import BaseModel, ConfigDict, Field
from typing import List, Optional
class CodeInfo(BaseModel): class CodeInfo(BaseModel):
@ -42,17 +42,6 @@ class AddCodeParams(BaseModel):
) )
class DeleteCodeListParams(BaseModel):
"""
删除编码参数
"""
ids: List[str] = Field(..., description="删除ID列表")
model_config = ConfigDict(
alias_generator=to_camel
)
class GetCodeInfoResponse(BaseResponse): class GetCodeInfoResponse(BaseResponse):
""" """
获取编码信息响应 获取编码信息响应