feat: 添加数据反馈,数据导入功能

This commit is contained in:
皓月归尘 2025-02-27 23:22:19 +08:00
parent 5c55f4acde
commit d6fcd38cd5
5 changed files with 609 additions and 55 deletions

View File

@ -22,10 +22,11 @@ from config.constant import BusinessType
from config.env import ElasticSearchConfig
from controller.login import LoginController
from exceptions.exception import ServiceException, PermissionException
from models import File, Code, QueryCode, QueryCodeLog
from models import File, Code, QueryCode, QueryCodeLog, CodeFeedback, CodeImport
from schemas.code import GetCodeInfoResponse, GetCodeListResponse, GetQueryCodeParams, QueryCodeResponse, AddCodeParams, \
GetQueryCodeLogResponse, GetQueryCodeLogDetailResponse, \
GetCodeLogAllResponse
GetCodeLogAllResponse, AddCodeFeedbackParams, GetCodeFeedbackResponse, GetCodeFeedbackListResponse, \
UpdateCodeFeedbackStatusParams, GetCodeImportListResponse, UpdateCodeImportStatusParams
from schemas.common import BaseResponse, DeleteListParams
from utils.log import logger
from utils.response import Response
@ -70,29 +71,17 @@ async def add_code(request: Request, params: AddCodeParams, current_user=Depends
params.code = params.code.replace(".", "").replace("/", "").replace("_", "").replace("-", "").replace("?",
"").replace(
":", "").replace("", "").replace("", "").strip()
if await Code.get_or_none(code=params.code, del_flag=1):
if await CodeImport.get_or_none(code=params.code, del_flag=1):
return Response.failure(msg="编码已存在")
else:
if await request.app.state.es.indices.exists(index=ElasticSearchConfig.ES_INDEX):
await request.app.state.es.indices.create(index=ElasticSearchConfig.ES_INDEX, ignore=400)
code = await Code.create(
user_id = current_user.get("id")
code_import = await CodeImport.create(
code=params.code,
description=params.description
description=params.description,
status=3,
user_id=user_id
)
if code:
# 构造 Bulk 导入数据
actions = [
{
"_index": ElasticSearchConfig.ES_INDEX,
"_id": code.code, # 以 code 作为 ID
"_source": {
"code": code.code,
"description": code.description
}
}
]
success, failed = await async_bulk(request.app.state.es, actions)
logger.info(f"成功导入 {success} 条数据,失败 {failed}")
if code_import:
return Response.success(msg="添加成功")
else:
return Response.failure(msg="添加失败")
@ -116,26 +105,12 @@ async def add_code_by_file(request: Request, id: str = Path(description="文件I
"?",
"").replace(
":", "").replace("", "").replace("", "").strip()
await Code.create(
await CodeImport.create(
code=row["code"],
description=row["description"]
description=row["description"],
status=3,
user_id=user_id
)
if not await request.app.state.es.indices.exists(index=ElasticSearchConfig.ES_INDEX):
await request.app.state.es.indices.create(index=ElasticSearchConfig.ES_INDEX, ignore=400)
# 构造 Bulk 导入数据
actions = [
{
"_index": ElasticSearchConfig.ES_INDEX,
"_id": row["code"], # 以 code 作为 ID
"_source": {
"code": row["code"],
"description": row["description"]
}
}
for _, row in df.iterrows()
]
success, failed = await async_bulk(request.app.state.es, actions)
logger.info(f"成功导入 {success} 条数据,失败 {failed}")
except ServiceException as e:
logger.error(e.message)
raise ServiceException(message="文件读取失败")
@ -155,7 +130,7 @@ async def delete_code_by_id(request: Request, id: str = Path(description="编码
if code := await Code.get_or_none(id=id, del_flag=1):
code.del_flag = 0
await code.save()
await request.app.state.es.delete(index=ElasticSearchConfig.ES_INDEX, id=code.code)
await request.app.state.es.delete(index=ElasticSearchConfig.ES_INDEX, id=code.id)
return Response.success(msg="删除成功")
else:
return Response.failure(msg="编码不存在")
@ -171,7 +146,7 @@ async def delete_code_by_ids(request: Request, params: DeleteListParams,
if code := await Code.get_or_none(id=id, del_flag=1):
code.del_flag = 0
await code.save()
await request.app.state.es.delete(index=ElasticSearchConfig.ES_INDEX, id=code.code)
await request.app.state.es.delete(index=ElasticSearchConfig.ES_INDEX, id=code.id)
return Response.success(msg="删除成功")
@ -187,8 +162,9 @@ async def update_code(request: Request,
code.code = params.code
code.description = params.description
await code.save()
await request.app.state.es.update(index=ElasticSearchConfig.ES_INDEX, id=code.code,
body={"doc": {"description": params.description}})
await request.app.state.es.update(index=ElasticSearchConfig.ES_INDEX, id=code.id,
body={"doc": {"id": code.id, "code": code.code,
"description": params.description}})
return Response.success(msg="更新成功")
return Response.failure(msg="编码不存在")
@ -205,7 +181,12 @@ async def get_code_info(request: Request, id: str = Path(description="编码ID")
create_time="create_time",
create_by="create_by",
update_time="update_time",
update_by="update_by"
update_by="update_by",
user_id="user__id",
username="user__username",
nickname="user__nickname",
department_id="user__department__id",
department_name="user__department__name",
):
return Response.success(data=code)
return Response.failure(msg="编码不存在")
@ -219,13 +200,28 @@ async def get_code_list(request: Request,
pageSize: int = Query(default=10, description="每页数量"),
code: Optional[str] = Query(default=None, description="编码"),
description: Optional[str] = Query(default=None, description="商品描述"),
department_id: Optional[str] = Query(default=None, description="部门ID"),
username: Optional[str] = Query(default=None, description="用户账号"),
nickname: Optional[str] = Query(default=None, description="用户昵称"),
startTime: Optional[str] = Query(default=None, description="开始时间"),
endTime: Optional[str] = Query(default=None, description="结束时间"),
current_user=Depends(LoginController.get_current_user)):
filterArgs = {
f'{k}__contains': v for k, v in {
'user__username': username,
'user__nickname': nickname,
'code': code,
'description': description
'description': description,
}.items() if v
}
if startTime and endTime:
startTime = float(startTime) / 1000
endTime = float(endTime) / 1000
startTime = datetime.fromtimestamp(startTime)
endTime = datetime.fromtimestamp(endTime)
filterArgs['create_time__range'] = [startTime, endTime]
if department_id:
filterArgs['user__department__id'] = department_id
total = await Code.filter(**filterArgs, del_flag=1).count()
data = await Code.filter(**filterArgs, del_flag=1).offset((page - 1) * pageSize).limit(pageSize).values(
id="id",
@ -234,7 +230,12 @@ async def get_code_list(request: Request,
create_time="create_time",
create_by="create_by",
update_time="update_time",
update_by="update_by"
update_by="update_by",
user_id="user__id",
username="user__username",
nickname="user__nickname",
department_id="user__department__id",
department_name="user__department__name",
)
return Response.success(data={
"page": page,
@ -294,7 +295,7 @@ async def get_code_list(request: Request,
max_score = data["hits"].get("max_score", 1)
# 处理每一条匹配结果
for hit in data["hits"]["hits"]:
code = await Code.get_or_none(code=hit["_source"]["code"], del_flag=1)
code = await Code.get_or_none(id=hit["_source"]["id"], del_flag=1)
# 归一化匹配度,转换为百分比
match_rate = round((hit["_score"] / max_score) * 100, 2) # 归一化后计算百分比
# 将匹配结果添加到列表中
@ -396,7 +397,7 @@ async def get_code_list(request: Request,
max_score = data["hits"].get("max_score", 1)
# 处理每一条匹配结果
for hit in data["hits"]["hits"]:
code = await Code.get_or_none(code=hit["_source"]["code"], del_flag=1)
code = await Code.get_or_none(id=hit["_source"]["id"], del_flag=1)
# 归一化匹配度,转换为百分比
match_rate = round((hit["_score"] / max_score) * 100, 2) # 归一化后计算百分比
# 将匹配结果添加到列表中
@ -598,3 +599,367 @@ async def get_code_log_detail(request: Request,
)
return Response.success(data=data)
return Response.failure(msg="日志不存在!")
@codeAPI.delete("/deleteFeedback/{id}", response_class=JSONResponse, response_model=BaseResponse,
summary="删除编码反馈")
@codeAPI.post("/deleteFeedback/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除编码反馈")
@Log(title="删除编码反馈", business_type=BusinessType.DELETE)
@Auth(permission_list=["code:btn:deleteFeedback"])
async def delete_feedback(request: Request,
id: str = Path(..., description="编码反馈ID"),
current_user: dict = Depends(LoginController.get_current_user),
):
sub_departments = current_user.get("sub_departments")
if feedback := await CodeFeedback.get_or_none(id=id, user__department__id__in=sub_departments, del_flag=1):
feedback.del_flag = 0
await feedback.save()
return Response.success(msg="删除成功!")
return Response.failure(msg="删除失败!")
@codeAPI.post("/addFeedback", response_class=JSONResponse, response_model=BaseResponse, summary="添加编码反馈")
@Log(title="添加编码反馈", business_type=BusinessType.INSERT)
@Auth(permission_list=["code:btn:addFeedback"])
async def add_feedback(request: Request,
params: AddCodeFeedbackParams,
current_user: dict = Depends(LoginController.get_current_user),
):
user_id = current_user.get("id")
if code := await Code.get_or_none(id=params.code_id, del_flag=1):
feedback = await CodeFeedback.create(
code_id=code.id,
feedback_code=params.feedback_code,
feedback_description=params.feedback_description,
user_id=str(user_id),
)
if feedback:
return Response.success(msg="添加成功!")
return Response.failure(msg="添加失败!")
return Response.failure(msg="编码不存在!")
@codeAPI.delete("/deleteFeedbackList", response_class=JSONResponse, response_model=BaseResponse,
summary="批量删除编码反馈")
@codeAPI.post("/deleteFeedbackList", response_class=JSONResponse, response_model=BaseResponse,
summary="批量删除编码反馈")
@Log(title="批量删除编码反馈", business_type=BusinessType.DELETE)
@Auth(permission_list=["code:btn:deleteFeedback"])
async def delete_feedback_list(request: Request,
params: DeleteListParams,
current_user: dict = Depends(LoginController.get_current_user),
):
sub_departments = current_user.get("sub_departments")
for id in set(params.ids):
if feedback := await CodeFeedback.get_or_none(id=id, user__department__id__in=sub_departments, del_flag=1):
feedback.del_flag = 0
await feedback.save()
return Response.success(msg="删除成功!")
@codeAPI.put("/updateFeedback/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="修改编码反馈")
@codeAPI.post("/updateFeedback/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="修改编码反馈")
@Log(title="修改编码反馈", business_type=BusinessType.UPDATE)
@Auth(permission_list=["code:btn:updateFeedback"])
async def update_feedback(request: Request,
params: AddCodeFeedbackParams,
id: str = Path(..., description="编码反馈ID"),
current_user: dict = Depends(LoginController.get_current_user),
):
sub_departments = current_user.get("sub_departments")
if feedback := await CodeFeedback.get_or_none(id=id, user__department__id__in=sub_departments, del_flag=1):
code = await Code.get_or_none(id=params.code_id, del_flag=1)
if code:
feedback.code = code
else:
return Response.failure(msg="编码不存在!")
feedback.feedback_description = params.feedback_description
feedback.feedback_code = params.feedback_code
await feedback.save()
return Response.success(msg="修改成功!")
return Response.failure(msg="编码反馈不存在!")
@codeAPI.get("/feedbackInfo/{id}", response_class=JSONResponse, response_model=GetCodeFeedbackResponse,
summary="编码反馈详情")
@Log(title="编码反馈详情", business_type=BusinessType.SELECT)
@Auth(permission_list=["code:btn:feedbackInfo"])
async def feedback_info(request: Request,
id: str = Path(..., description="编码反馈ID"),
current_user: dict = Depends(LoginController.get_current_user),
):
sub_departments = current_user.get("sub_departments")
if feedback := await CodeFeedback.get_or_none(id=id, user__department__id__in=sub_departments, del_flag=1):
data = await feedback.first().values(
id="id",
code_id="code__id",
code="code__code",
description="code__description",
feedback_code="feedback_code",
feedback_description="feedback_description",
create_time="create_time",
update_time="update_time",
user_id="user__id",
username="user__username",
nickname="user__nickname",
department_id="user__department__id",
department_name="user__department__name",
)
return Response.success(data=data)
return Response.failure(msg="编码反馈不存在!")
@codeAPI.get("/feedbackList", response_class=JSONResponse, response_model=GetCodeFeedbackListResponse,
summary="编码反馈列表")
@Log(title="编码反馈列表", business_type=BusinessType.SELECT)
@Auth(permission_list=["code:btn:feedbackList"])
async def feedback_list(request: Request,
page: int = Query(default=1, description="当前页码"),
pageSize: int = Query(default=10, description="每页数量"),
code: Optional[str] = Query(default=None, description="编码"),
feedback_code: Optional[str] = Query(default=None, description="反馈编码"),
feedback_description: Optional[str] = Query(default=None, description="反馈文本"),
username: Optional[str] = Query(default=None, description="用户名"),
status: Optional[str] = Query(default=None, description="状态"),
nickname: Optional[str] = Query(default=None, description="用户昵称"),
department_id: Optional[str] = Query(default=None, description="部门ID"),
startTime: Optional[str] = Query(default=None, description="开始时间"),
endTime: Optional[str] = Query(default=None, description="结束时间"),
current_user: dict = Depends(LoginController.get_current_user),
):
sub_departments = current_user.get("sub_departments")
filterArgs = {
f'{k}__contains': v for k, v in {
'user__username': username,
'user__nickname': nickname,
'code__code': code,
'feedback_code': feedback_code,
'feedback_description': feedback_description,
}.items() if v
}
if startTime and endTime:
startTime = float(startTime) / 1000
endTime = float(endTime) / 1000
startTime = datetime.fromtimestamp(startTime)
endTime = datetime.fromtimestamp(endTime)
filterArgs['create_time__range'] = [startTime, endTime]
if await hasAuth(request, "code:btn:feedbackAdmin"):
if department_id:
filterArgs['user__department__id'] = department_id
else:
filterArgs['user__department__id__in'] = sub_departments
else:
if department_id:
filterArgs['user__department__id'] = department_id
if status is not None:
filterArgs['status'] = int(status)
total = await CodeFeedback.filter(**filterArgs, del_flag=1).count()
data = await CodeFeedback.filter(**filterArgs, del_flag=1).order_by('-create_time').offset(
(page - 1) * pageSize).limit(pageSize).values(
id="id",
code_id="code__id",
code="code__code",
description="code__description",
feedback_code="feedback_code",
feedback_description="feedback_description",
create_time="create_time",
update_time="update_time",
user_id="user__id",
username="user__username",
nickname="user__nickname",
department_id="user__department__id",
department_name="user__department__name",
status="status",
)
return Response.success(data={
"page": page,
"pageSize": pageSize,
"result": data,
"total": total,
})
@codeAPI.put("/feedbackAudit", response_class=JSONResponse, response_model=BaseResponse, summary="编码反馈审核")
@codeAPI.post("/feedbackAudit", response_class=JSONResponse, response_model=BaseResponse, summary="编码反馈审核")
@Log(title="编码反馈审核", business_type=BusinessType.UPDATE)
@Auth(permission_list=["code:btn:feedbackAudit"])
async def feedback_audit(request: Request,
params: UpdateCodeFeedbackStatusParams,
current_user: dict = Depends(LoginController.get_current_user),
):
sub_departments = current_user.get("sub_departments")
for id in set(params.ids):
if feedback := await CodeFeedback.get_or_none(id=id, user__department__id__in=sub_departments, del_flag=1):
feedback.status = params.status
if params.status == 1:
await Code.filter(id=feedback.code_id, del_flag=1).update(code=feedback.feedback_code,
description=feedback.feedback_description)
await request.app.state.es.update(index=ElasticSearchConfig.ES_INDEX, id=feedback.code_id,
body={"doc": {"id": feedback.code_id, "code": feedback.feedback_code,
"description": feedback.feedback_description}})
await feedback.save()
return Response.failure(msg="编码反馈不存在!")
@codeAPI.delete("/deleteCodeImport/{id}", response_class=JSONResponse, response_model=BaseResponse,
summary="删除编码导入")
@codeAPI.post("/deleteCodeImport/{id}", response_class=JSONResponse, response_model=BaseResponse,
summary="删除编码导入")
@Log(title="删除编码导入", business_type=BusinessType.DELETE)
@Auth(permission_list=["code:btn:deleteCodeImport"])
async def delete_code_import(request: Request, id: str = Path(description="编码导入ID"),
current_user: dict = Depends(LoginController.get_current_user)):
sub_departments = current_user.get("sub_departments")
if code_import := await CodeImport.get_or_none(id=id, user__department__id__in=sub_departments, del_flag=1):
code_import.del_flag = 0
await code_import.save()
return Response.success()
return Response.failure(msg="编码导入不存在!")
@codeAPI.delete("/deleteCodeImportList", response_class=JSONResponse, response_model=BaseResponse,
summary="批量删除编码导入")
@codeAPI.post("/deleteCodeImportList", response_class=JSONResponse, response_model=BaseResponse,
summary="批量删除编码导入")
@Log(title="批量删除编码导入", business_type=BusinessType.DELETE)
@Auth(permission_list=["code:btn:deleteCodeImport"])
async def delete_code_import_list(request: Request, params: DeleteListParams,
current_user: dict = Depends(LoginController.get_current_user)):
sub_departments = current_user.get("sub_departments")
for id in set(params.ids):
if code_import := await CodeImport.get_or_none(id=id, user__department__id__in=sub_departments, del_flag=1):
code_import.del_flag = 0
await code_import.save()
return Response.success()
@codeAPI.put("/updateCodeImport/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="修改编码导入")
@codeAPI.post("/updateCodeImport/{id}", response_class=JSONResponse, response_model=BaseResponse,
summary="修改编码导入")
@Log(title="修改编码导入", business_type=BusinessType.UPDATE)
@Auth(permission_list=["code:btn:updateCodeImport"])
async def update_code_import(request: Request,
params: AddCodeParams,
id: str = Path(description="编码导入ID"),
current_user: dict = Depends(LoginController.get_current_user)):
sub_departments = current_user.get("sub_departments")
if code_import := await CodeImport.get_or_none(id=id, user__department__id__in=sub_departments, del_flag=1):
code_import.code = params.code
code_import.description = params.description
code_import.status = 3
await code_import.save()
return Response.success()
return Response.failure(msg="编码导入不存在!")
@codeAPI.get("/codeImportList", response_class=JSONResponse, response_model=GetCodeImportListResponse,
summary="查询编码导入列表")
@Auth(permission_list=["code:btn:codeImportList"])
async def get_code_import_list(
request: Request,
page: int = Query(default=1, description="页码"),
pageSize: int = Query(default=10, description="每页数量"),
code: Optional[str] = Query(default=None, description="编码"),
description: Optional[str] = Query(default=None, description="商品描述"),
username: Optional[str] = Query(default=None, description="用户名"),
status: Optional[str] = Query(default=None, description="状态"),
nickname: Optional[str] = Query(default=None, description="用户昵称"),
department_id: Optional[str] = Query(default=None, description="部门ID"),
startTime: Optional[str] = Query(default=None, description="开始时间"),
endTime: Optional[str] = Query(default=None, description="结束时间"),
current_user: dict = Depends(LoginController.get_current_user),
):
sub_departments = current_user.get("sub_departments")
filterArgs = {
f'{k}__contains': v for k, v in {
'user__username': username,
'user__nickname': nickname,
'code': code,
'description': description,
}.items() if v
}
if startTime and endTime:
startTime = float(startTime) / 1000
endTime = float(endTime) / 1000
startTime = datetime.fromtimestamp(startTime)
endTime = datetime.fromtimestamp(endTime)
filterArgs['create_time__range'] = [startTime, endTime]
if await hasAuth(request, "code:btn:codeImportAdmin"):
if department_id:
filterArgs['user__department__id'] = department_id
else:
filterArgs['user__department__id__in'] = sub_departments
else:
if department_id:
filterArgs['user__department__id'] = department_id
if status is not None:
filterArgs['status'] = int(status)
total = await CodeImport.filter(**filterArgs, del_flag=1).count()
data = await CodeImport.filter(**filterArgs, del_flag=1).order_by('-create_time').offset(
(page - 1) * pageSize).limit(pageSize).values(
id="id",
code="code",
description="description",
create_time="create_time",
update_time="update_time",
user_id="user__id",
username="user__username",
nickname="user__nickname",
department_id="user__department__id",
department_name="user__department__name",
status="status",
)
return Response.success(data={
"page": page,
"pageSize": pageSize,
"result": data,
"total": total,
})
@codeAPI.put("/codeImportAudit", response_class=JSONResponse, response_model=BaseResponse, summary="编码导入审核")
@codeAPI.post("/codeImportAudit", response_class=JSONResponse, response_model=BaseResponse, summary="编码导入审核")
@Log(title="编码导入审核", business_type=BusinessType.INSERT)
@Auth(permission_list=["code:btn:codeImportAudit"])
async def code_import_audit(request: Request, params: UpdateCodeImportStatusParams,
current_user: dict = Depends(LoginController.get_current_user)):
sub_departments = current_user.get("sub_departments")
actions = []
for id in set(params.ids):
if codeImport := await CodeImport.get_or_none(id=id, user__department__id__in=sub_departments, del_flag=1):
codeImport.status = params.status
if params.status == 1:
codeImport.status = 1
code = codeImport.code.replace(".", "").replace("/", "").replace("_", "").replace("-",
"").replace(
"?",
"").replace(
":", "").replace("", "").replace("", "").strip()
user_id = current_user.get("id")
codeInfo = await Code.create(
code=code,
description=codeImport.description,
user_id=user_id,
)
if codeInfo:
# 构造 Bulk 导入数据
actions.append(
{
"_index": ElasticSearchConfig.ES_INDEX,
"_id": codeInfo.id, # 以 code 作为 ID
"_source": {
"id": codeInfo.id,
"code": codeInfo.code,
"description": codeInfo.description
}
}
)
await codeImport.save()
if await request.app.state.es.indices.exists(index=ElasticSearchConfig.ES_INDEX):
await request.app.state.es.indices.create(index=ElasticSearchConfig.ES_INDEX, ignore=400)
success, failed = await async_bulk(request.app.state.es, actions)
logger.info(f"成功导入 {success} 条数据,失败 {failed}")
return Response.success()

View File

@ -6,7 +6,7 @@
# @Software : PyCharm
# @Comment : 本程序
from models.code import Code, QueryCode, QueryCodeLog
from models.code import Code, QueryCode, QueryCodeLog, CodeFeedback, CodeImport
from models.config import Config
from models.department import Department
from models.file import File
@ -30,6 +30,8 @@ __all__ = [
'Locale',
'Config',
'Code',
'CodeFeedback',
'CodeImport',
'QueryCode',
'QueryCodeLog'
]

View File

@ -24,6 +24,12 @@ class Code(BaseModel):
description="描述",
source_field="description"
)
user = fields.ForeignKeyField(
"models.User",
related_name="codes",
description="用户",
source_field="user_id"
)
class Meta:
table = "code"
@ -160,3 +166,65 @@ class QueryCode(BaseModel):
class Meta:
table = "query_code"
table_description = "查询编码表"
class CodeFeedback(BaseModel):
"""
编码反馈模型
"""
code = fields.ForeignKeyField(
"models.Code",
related_name="feedbacks",
description="编码",
source_field="code_id"
)
user = fields.ForeignKeyField(
"models.User",
related_name="feedbacks",
description="用户",
source_field="user_id"
)
feedback_code = fields.CharField(
max_length=255,
description="反馈编码",
source_field="feedback_code"
)
feedback_description = fields.TextField(
description="反馈内容",
source_field="feedback_description"
)
status = fields.SmallIntField(
default=3,
description="状态0删除1审核通过2审核不通过3待审核",
source_field="status"
)
class Meta:
table = "code_feedback"
table_description = "编码反馈表"
class CodeImport(BaseModel):
"""
编码导入模型
"""
code = fields.CharField(
max_length=255,
description="编码",
source_field="code"
)
description = fields.TextField(
description="描述",
source_field="description"
)
user = fields.ForeignKeyField(
"models.User",
related_name="imports",
description="用户",
source_field="user_id"
)
status = fields.SmallIntField(
default=1,
description="状态0删除1审核通过2审核不通过3待审核",
source_field="status"
)

View File

@ -28,6 +28,11 @@ class CodeInfo(BaseModel):
update_time: Optional[datetime] = Field(default=None, description="更新时间")
code: str = Field(..., description="编码")
description: str = Field(..., description="描述")
user_id: str = Field(default="", description="用户ID")
username: str = Field(default="", description="用户名称")
nickname: str = Field(default="", description="用户昵称")
department_id: str = Field(default="", description="用户部门ID")
department_name: str = Field(default="", description="用户部门名称")
class AddCodeParams(BaseModel):
@ -176,3 +181,112 @@ class GetCodeLogAllResponse(BaseResponse):
查询所有编码日志响应
"""
data: GetCodeLogAllResult = Field(..., description="查询编码日志结果")
class CodeFeedbackInfo(BaseModel):
"""
编码反馈信息
"""
model_config = ConfigDict(
alias_generator=to_camel
)
id: str = Field(default="", description="主键")
code_id: str = Field(default="", description="编码ID")
code: str = Field(default="", description="编码")
description: str = Field(default="", description="描述")
feedback_code: str = Field(default="", description="反馈编码")
feedback_description: str = Field(default="", description="反馈文本")
status: int = Field(default=3, description="反馈状态")
user_id: str = Field(default="", description="用户ID")
username: str = Field(default="", description="用户名称")
nickname: str = Field(default="", description="用户昵称")
department_id: str = Field(default="", description="用户部门ID")
department_name: str = Field(default="", description="用户部门名称")
create_time: str = Field(default="", description="创建时间")
update_time: str = Field(default="", description="更新时间")
class GetCodeFeedbackResponse(BaseResponse):
"""
获取编码结果
"""
data: CodeFeedbackInfo = Field(default={}, description="编码反馈结果")
class AddCodeFeedbackParams(BaseModel):
"""
添加编码反馈参数
"""
code_id: str = Field(..., description="编码ID")
feedback_code: str = Field(..., description="反馈编码")
feedback_description: str = Field(..., description="反馈文本")
class CodeFeedbackResult(ListQueryResult):
"""
编码反馈结果
"""
result: List[CodeFeedbackInfo] = Field(..., description="编码反馈列表")
class GetCodeFeedbackListResponse(BaseResponse):
"""
获取编码反馈响应
"""
data: CodeFeedbackResult = Field(..., description="编码反馈结果")
class UpdateCodeFeedbackStatusParams(BaseModel):
"""
更新编码反馈参数
"""
status: int = Field(..., description="状态")
ids: List[str] = Field(default=[], description="编码反馈ID")
class CodeImportInfo(BaseModel):
"""
编码导入信息
"""
model_config = ConfigDict(
alias_generator=to_camel
)
code: str = Field(default="", description="编码")
description: str = Field(default="", description="描述")
status: int = Field(default=3, description="反馈状态")
user_id: str = Field(default="", description="用户ID")
username: str = Field(default="", description="用户名称")
nickname: str = Field(default="", description="用户昵称")
department_id: str = Field(default="", description="用户部门ID")
department_name: str = Field(default="", description="用户部门名称")
create_time: str = Field(default="", description="创建时间")
update_time: str = Field(default="", description="更新时间")
class GetCodeImportResponse(BaseResponse):
"""
获取编码导入响应
"""
data: CodeImportInfo = Field(default={}, description="编码导入结果")
class CodeImportResult(ListQueryResult):
"""
编码导入结果
"""
result: List[CodeImportInfo] = Field(..., description="编码导入列表")
class GetCodeImportListResponse(BaseResponse):
"""
获取编码导入响应
"""
data: CodeImportResult = Field(..., description="编码导入结果")
class UpdateCodeImportStatusParams(BaseModel):
"""
更新编码反馈参数
"""
status: int = Field(..., description="状态")
ids: List[str] = Field(default=[], description="编码反馈ID")

View File

@ -5,6 +5,8 @@
# @File : common.py
# @Software : PyCharm
# @Comment : 本程序
from typing import List, Any, Optional, Type
def bytes2human(n, format_str='%(value).1f%(symbol)s'):
"""Used by various scripts. See:
@ -26,11 +28,14 @@ def bytes2human(n, format_str='%(value).1f%(symbol)s'):
return format_str % dict(symbol=symbols[0], value=n)
async def filterKeyValues(dataList: list, key: str) -> list:
async def filterKeyValues(dataList: List[dict], key: str, default: Any = None, convert_type: Optional[Type] = None) -> List[Any]:
"""
获取列表字段数据
:param dataList: 数据列表
:param key: 关键字
:return:
获取列表字段数据并可选择进行类型转换
:param dataList: 数据列表列表中的元素是字典
:param key: 要提取的字段
:param default: 如果字段不存在返回的默认值
:param convert_type: 需要转换的类型 intstrfloat 默认为 None 不转换
:return: 提取并转换后的值列表
"""
return [item[key] for item in dataList]
return [convert_type(item.get(key, default)) if convert_type else item.get(key, default) for item in dataList]