Compare commits

...

10 Commits

31 changed files with 3295 additions and 415 deletions

171
.env.dev Normal file
View File

@ -0,0 +1,171 @@
# -------- 应用配置 --------
# 应用运行环境
APP_ENV = 'dev'
# 应用名称
APP_NAME = 'Cutoms-System'
# 应用代理路径
APP_ROOT_PATH = ''
# 应用主机
APP_HOST = '0.0.0.0'
# 应用端口
APP_PORT = 8082
# 应用版本
APP_VERSION= '1.0.0'
# 应用是否开启热重载
APP_RELOAD = true
# 应用是否开启IP归属区域查询
APP_IP_LOCATION_QUERY = false
# 应用是否允许账号同时登录
APP_SAME_TIME_LOGIN = true
# -------JWT配置------------
# JWT 签名密钥
JWT_SECRET_KEY=b01c66dc2c58dc6a0aabfe2144256be36226de378bf87f72c0c795dda67f4d55
# JWT 签名算法
JWT_ALGORITHM=HS256
# JWT 盐值
JWT_SALT=jwt_salt
# JWT 令牌有效期(分钟)
JWT_EXPIRE_MINUTES=1440
# JWT 令牌在 Redis 中的缓存有效期(分钟)
JWT_REDIS_EXPIRE_MINUTES=30
# -------- 数据库配置 --------
# 数据库类型,默认为'mysql'
DB_TYPE = 'mysql'
# 数据库主机
DB_HOST = '127.0.0.1'
# 数据库端口
DB_PORT = 3306
# 数据库用户名
DB_USERNAME = 'root'
# 数据库密码
DB_PASSWORD = 'mysqlroot'
# 数据库名称
DB_DATABASE = 'cutoms_system'
# 是否开启日志
DB_ECHO = true
# 数据库日志级别,默认为 10DEBUG
DB_LOG_LEVEL = 10
# 允许溢出连接池大小的最大连接数
DB_MAX_OVERFLOW = 10
# 连接池大小0表示连接数无限制
DB_POOL_SIZE = 50
# 连接回收时间(单位:秒)
DB_POOL_RECYCLE = 3600
# 连接池中没有线程可用时,最多等待的时间(单位:秒)
DB_POOL_TIMEOUT = 30
# -------- Redis配置 --------
# Redis主机
REDIS_HOST = '127.0.0.1'
# Redis端口
REDIS_PORT = 6379
# Redis用户名
REDIS_USERNAME = ''
# Redis密码
REDIS_PASSWORD = ''
# Redis数据库
REDIS_DATABASE = 2
# ======================
# 上传配置
# ======================
# 文件上传的 URL 前缀,默认为 '/profile'。
# 例如:`/profile/example.jpg`。
UPLOAD_PREFIX=/profile
# 文件上传的存储路径,默认为 'data/upload_path'。
# 上传的文件将存储在此目录中,如果目录不存在,会自动创建。
UPLOAD_PATH=data/upload_path
# 上传机器的标识,默认为 'A'。
# 用于区分不同的上传机器或节点,在多机部署时可以使用此字段。
UPLOAD_MACHINE=A
# 默认允许上传的文件扩展名列表,使用逗号分隔。
# 包含常见的图片、文档、压缩文件、视频和 PDF 格式。
# 可以根据需求扩展或修改此列表。
DEFAULT_ALLOWED_EXTENSION=bmp,gif,jpg,jpeg,png,doc,docx,xls,xlsx,ppt,pptx,html,htm,txt,rar,zip,gz,bz2,mp4,avi,rmvb,pdf
# 文件下载的存储路径,默认为 'data/download_path'。
# 下载的文件将存储在此目录中,如果目录不存在,会自动创建。
DOWNLOAD_PATH=data/download_path
# ======================
# 邮件配置
# ======================
# 邮件发送者的用户名,默认为空。
EMAIL_USERNAME=
# 邮件发送者的密码,默认为空。
EMAIL_PASSWORD=
# 邮件服务器地址,默认为 "smtp.qq.com"。
# 如果是其他邮件服务商,请修改为对应的 SMTP 服务器地址。
EMIAL_HOST=smtp.qq.com
# 邮件服务器端口,默认为 465。
# 如果是其他邮件服务商,请根据其要求修改端口号。
EMAIL_PORT=587
# ======================
# 地图配置
# ======================
# 百度地图的 AK 密钥,用于获取地图数据。
# 请在百度地图官网申请获取 AK 密钥。
AK=
# 百度地图sk密钥用于获取地图数据。
# 请在百度地图官网申请获取 SK 密钥。
SK=
# ======================
# ElasticSearch配置
# ======================
# ElasticSearch的URL默认为 "http://localhost:9200"。
ES_HOST=localhost
# ElasticSearch的端口默认为 9200。
ES_PORT=9200
# ElasticSearch的索引名称默认为 "cutoms_system"。
ES_INDEX=product_codes
# ElasticSearch的用户名。
ES_USER=elastic
# ElasticSearch的密码。
ES_PASSWORD=changeme

View File

@ -9,6 +9,7 @@ from functools import wraps
from fastapi import Request
from config.constant import RedisKeyConfig
from controller.login import LoginController
from exceptions.exception import PermissionException
@ -40,3 +41,29 @@ class Auth:
raise PermissionException(message="该用户无此接口权限!")
return wrapper
async def hasAuth(request: Request, permission: str) -> bool:
"""
判断是有拥有某项权限
"""
token = request.headers.get('Authorization') # 直接使用 request 对象
current_user = await LoginController.get_current_user(request, token)
permissions = current_user.get('permissions')
if permission in permissions:
return True
else:
return False
async def hasAdmin(request: Request, department_id: str) -> bool:
"""
判断是否有管理员权限
"""
permissions = []
if ids := await request.app.state.redis.get(f'{RedisKeyConfig.SYSTEM_CONFIG.key}:permission_departments'):
permissions = eval(ids)
if department_id in permissions:
return True
else:
return False

View File

@ -149,7 +149,7 @@ class Log:
# else:
session_id = request.app.state.session_id
status = 1 if request.app.state.login_status else 0
current_user = await User.get_or_none(username=payload.get("username"),del_flag=1)
current_user = await User.get_or_none(username=payload.get("username"), del_flag=1)
await LoginLog.create(
user_id=current_user.id,
login_ip=host,

File diff suppressed because it is too large Load Diff

View File

@ -125,7 +125,7 @@ async def get_config_list(request: Request,
type: Optional[str] = Query(default=None, description="系统内置"),
):
filterArgs = {
f'{k}__contains': v for k, v in {
f'{k}__icontains': v for k, v in {
'name': name,
'key': key,
'type': type,

View File

@ -79,8 +79,9 @@ async def delete_department_list(request: Request, params: DeleteListParams,
if department := await Department.get_or_none(id=item, del_flag=1):
if item in sub_departments:
await delete_department_recursive(department_id=department.id)
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_INFO.key}:*'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_INFO.key}:*')
userInfos = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_INFO.key}*')
if userInfos:
await request.app.state.redis.delete(*userInfos)
return Response.success(msg="删除成功!")
@ -116,6 +117,9 @@ async def update_department(request: Request, params: AddDepartmentParams, id: s
department.sort = params.sort
department.status = params.status
await department.save()
userInfos = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_INFO.key}*')
if userInfos:
await request.app.state.redis.delete(*userInfos)
return Response.success(msg="修改成功!")
else:
return Response.error(msg="修改失败,部门不存在!")
@ -167,7 +171,7 @@ async def get_department_list(
current_user: dict = Depends(LoginController.get_current_user)
):
filterArgs = {
f'{k}__contains': v for k, v in {
f'{k}__icontains': v for k, v in {
'name': name,
'principal': principal,
'phone': phone,

View File

@ -7,7 +7,6 @@
# @Comment : 本程序
import os
from datetime import datetime
from fastapi import APIRouter, UploadFile, File, Path, Depends, Request, Query
from fastapi.responses import FileResponse, JSONResponse
@ -40,7 +39,7 @@ async def upload_file(
raise ModelValidatorException(message="文件类型不支持")
# 2. 生成唯一的文件名
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
unique_filename = f"{current_user.get('id')}_{timestamp}.{file_extension}"
# 3. 保存文件到服务器
@ -61,7 +60,7 @@ async def upload_file(
relative_path=relative_path,
uploader_id=current_user.get("id"),
)
result = await file_record.first().values(
result = await FileModel.get_or_none(id=file_record.id).values(
id="id",
name="name",
size="size",
@ -86,7 +85,7 @@ async def download_file(
id: str = Path(..., description="文件ID"),
):
# 1. 查询文件记录
file_record = await FileModel.get_or_none(id=id)
file_record = await FileModel.get_or_none(id=id, del_flag=1)
if not file_record:
raise ServiceException(message="文件不存在!")
@ -110,10 +109,10 @@ async def get_file_info(
current_user: dict = Depends(LoginController.get_current_user),
):
# 1. 查询文件记录
file_record = await FileModel.get_or_none(id=id)
file_record = await FileModel.get_or_none(id=id, del_flag=1)
if not file_record:
raise ServiceException(message="文件不存在!")
result = await file_record.first().values(
result = await FileModel.get_or_none(id=id, del_flag=1).values(
id="id",
name="name",
size="size",
@ -139,11 +138,11 @@ async def delete_file(
id: str = Path(..., description="文件ID"),
current_user: dict = Depends(LoginController.get_current_user), ):
# 1. 查询文件记录
file_record = await FileModel.get_or_none(id=id)
file_record = await FileModel.get_or_none(id=id, del_flag=1)
if not file_record:
raise ServiceException(message="文件不存在!")
if await Upload.check_file_exists(file_record.absolute_path):
await Upload.delete_file(file_record.absolute_path)
if Upload.check_file_exists(file_record.absolute_path):
Upload.delete_file(file_record.absolute_path)
await file_record.delete()
return Response.success()
@ -164,7 +163,7 @@ async def get_file_list(
current_user: dict = Depends(LoginController.get_current_user), ):
# 1. 查询文件记录
filterArgs = {
f'{k}__contains': v for k, v in {
f'{k}__icontains': v for k, v in {
'name': name,
'file_type': file_type,
'uploader__id': uploader_id,

741
api/hts.py Normal file
View File

@ -0,0 +1,741 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/03/24 02:29:03
# @UpdateTime : 2025/03/24 02:29:03
# @Author : sonder
# @File : version.py
# @Comment : 本程序用于生成版本信息增删改查接口
import json
import uuid
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, Path, Request, Query
from fastapi.responses import JSONResponse
from annotation.auth import Auth
from annotation.log import Log
from config.constant import BusinessType
from controller.login import LoginController
from models import Version, HtsClass, HtsItem, File
from schemas.common import BaseResponse, DeleteListParams
from schemas.hts import AddHTSClassParams, UpdateHTSClassParams, GetHTSClassInfoResponse, GetHTSClassListResponse, \
ImportHtsItemParams
from schemas.hts import AddHtsItemParams, UpdateHtsItemParams, GetHtsItemInfoResponse, GetHtsItemListResponse
from schemas.hts import AddVersionParams, UpdateVersionParams, GetVersionInfoResponse, GetVersionListResponse
from utils.response import Response
htsAPI = APIRouter(
prefix="/hts",
dependencies=[Depends(LoginController.get_current_user)],
)
@htsAPI.post("/version/add", response_class=JSONResponse, response_model=BaseResponse, summary="新增税率编码版本")
@Log(title="新增税率编码版本", business_type=BusinessType.INSERT)
@Auth(permission_list=["version:btn:add"])
async def add_version(request: Request, params: AddVersionParams):
if await Version.get_or_none(
version=params.version,
date=params.date,
url=params.url,
del_flag=1
):
return Response.error(msg="税率编码版本已存在!")
version = await Version.create(
version=params.version,
date=params.date,
url=params.url,
)
if version:
return Response.success(msg="新增成功!")
else:
return Response.error(msg="新增失败")
@htsAPI.delete("/version/delete/{id}", response_class=JSONResponse, response_model=BaseResponse,
summary="删除税率编码版本")
@htsAPI.post("/version/delete/{id}", response_class=JSONResponse, response_model=BaseResponse,
summary="删除税率编码版本")
@Log(title="删除税率编码版本", business_type=BusinessType.DELETE)
@Auth(permission_list=["version:btn:delete"])
async def delete_version(request: Request, id: str = Path(description="税率编码版本ID")):
if version := await Version.get_or_none(id=id, del_flag=1):
version.del_flag = 0
await version.save()
return Response.success(msg="删除成功")
else:
return Response.error(msg="税率编码版本不存在!")
@htsAPI.delete("/version/deleteList", response_class=JSONResponse, response_model=BaseResponse,
summary="批量删除税率编码版本")
@htsAPI.post("/version/deleteList", response_class=JSONResponse, response_model=BaseResponse,
summary="批量删除税率编码版本")
@Log(title="批量删除税率编码版本", business_type=BusinessType.DELETE)
@Auth(permission_list=["version:btn:delete"])
async def delete_version_list(request: Request, params: DeleteListParams):
for id in set(params.ids):
if version := await Version.get_or_none(id=id, del_flag=1):
version.del_flag = 0
await version.save()
return Response.success(msg="删除成功")
@htsAPI.put("/version/update/{id}", response_class=JSONResponse, response_model=BaseResponse,
summary="修改税率编码版本")
@htsAPI.post("/version/update/{id}", response_class=JSONResponse, response_model=BaseResponse,
summary="修改税率编码版本")
@Log(title="修改税率编码版本", business_type=BusinessType.UPDATE)
@Auth(permission_list=["version:btn:update"])
async def update_version(request: Request, params: UpdateVersionParams, id: str = Path(description="税率编码版本ID")):
if version := await Version.get_or_none(id=id, del_flag=1):
print(params)
version.version = params.version
version.date = params.date
version.url = params.url
await version.save()
return Response.success(msg="修改成功")
else:
return Response.error(msg="税率编码版本不存在")
@htsAPI.get("/version/info/{id}", response_class=JSONResponse, response_model=GetVersionInfoResponse,
summary="获取税率编码版本信息")
@Log(title="获取税率编码版本信息", business_type=BusinessType.SELECT)
@Auth(permission_list=["version:btn:info"])
async def get_version_info(request: Request, id: str = Path(description="税率编码版本ID")):
if version := await Version.get_or_none(id=id, del_flag=1):
data = {
"id": version.id,
"create_time": version.create_time,
"update_time": version.update_time,
"version": version.version,
"date": version.date,
"url": version.url,
}
return Response.success(data=data)
else:
return Response.error(msg="税率编码版本不存在")
@htsAPI.get("/version/list", response_class=JSONResponse, response_model=GetVersionListResponse,
summary="获取税率编码版本列表")
@Log(title="获取税率编码版本列表", business_type=BusinessType.SELECT)
@Auth(permission_list=["version:btn:list"])
async def get_version_list(
request: Request,
page: int = Query(default=1, description="当前页码"),
pageSize: int = Query(default=10, description="每页数量"),
version: Optional[str] = Query(default=None, description="版本号"),
date: Optional[str] = Query(default=None, description="版本日期"),
url: Optional[str] = Query(default=None, description="下载地址"),
):
filterArgs = {
"version__icontains": version,
"date__icontains": date,
"url__icontains": url,
}
filterArgs = {k: v for k, v in filterArgs.items() if v is not None}
total = await Version.filter(**filterArgs, del_flag=1).count()
data = await Version.filter(**filterArgs, del_flag=1).offset((page - 1) * pageSize).limit(pageSize).values(
id="id",
create_time="create_time",
update_time="update_time",
version="version",
date="date",
url="url",
)
return Response.success(data={
"total": total,
"result": data,
"page": page,
"pageSize": pageSize,
})
@htsAPI.post("/class/add", response_class=JSONResponse, response_model=BaseResponse, summary="新增编码类别")
@Log(title="新增编码类别", business_type=BusinessType.INSERT)
@Auth(permission_list=["htsclass:btn:add"])
async def add_htsclass(request: Request, params: AddHTSClassParams):
if await HtsClass.get_or_none(
class_name=params.class_name,
class_description=params.class_description,
chapter_name=params.chapter_name,
chapter_description=params.chapter_description,
del_flag=1
):
return Response.error(msg="编码类别已存在!")
htsclass = await HtsClass.create(
class_name=params.class_name,
class_description=params.class_description,
chapter_name=params.chapter_name,
chapter_description=params.chapter_description,
)
if htsclass:
return Response.success(msg="新增成功!")
else:
return Response.error(msg="新增失败")
@htsAPI.delete("/class/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除编码类别")
@htsAPI.post("/class/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除编码类别")
@Log(title="删除编码类别", business_type=BusinessType.DELETE)
@Auth(permission_list=["htsclass:btn:delete"])
async def delete_htsclass(request: Request, id: str = Path(description="编码类别ID")):
if htsclass := await HtsClass.get_or_none(id=id, del_flag=1):
htsclass.del_flag = 0
await htsclass.save()
return Response.success(msg="删除成功")
else:
return Response.error(msg="编码类别不存在!")
@htsAPI.delete("/class/deleteList", response_class=JSONResponse, response_model=BaseResponse,
summary="批量删除编码类别")
@htsAPI.post("/class/deleteList", response_class=JSONResponse, response_model=BaseResponse, summary="批量删除编码类别")
@Log(title="批量删除编码类别", business_type=BusinessType.DELETE)
@Auth(permission_list=["htsclass:btn:delete"])
async def delete_htsclass_list(request: Request, params: DeleteListParams):
for id in set(params.ids):
if htsclass := await HtsClass.get_or_none(id=id, del_flag=1):
htsclass.del_flag = 0
await htsclass.save()
return Response.success(msg="删除成功")
@htsAPI.put("/class/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="修改编码类别")
@htsAPI.post("/class/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="修改编码类别")
@Log(title="修改编码类别", business_type=BusinessType.UPDATE)
@Auth(permission_list=["htsclass:btn:update"])
async def update_htsclass(request: Request, params: UpdateHTSClassParams, id: str = Path(description="编码类别ID")):
if htsclass := await HtsClass.get_or_none(id=id, del_flag=1):
htsclass.class_name = params.class_name
htsclass.class_description = params.class_description
htsclass.chapter_name = params.chapter_name
htsclass.chapter_description = params.chapter_description
await htsclass.save()
return Response.success(msg="修改成功")
else:
return Response.error(msg="编码类别不存在")
@htsAPI.get("/class/info/{id}", response_class=JSONResponse, response_model=GetHTSClassInfoResponse,
summary="获取编码类别信息")
@Log(title="获取编码类别信息", business_type=BusinessType.SELECT)
@Auth(permission_list=["htsclass:btn:info"])
async def get_htsclass_info(request: Request, id: str = Path(description="编码类别ID")):
if htsclass := await HtsClass.get_or_none(id=id, del_flag=1):
data = {
"id": htsclass.id,
"create_time": htsclass.create_time,
"update_time": htsclass.update_time,
"class_name": htsclass.class_name,
"class_description": htsclass.class_description,
"chapter_name": htsclass.chapter_name,
"chapter_description": htsclass.chapter_description,
}
return Response.success(data=data)
else:
return Response.error(msg="编码类别不存在")
@htsAPI.get("/class/list", response_class=JSONResponse, response_model=GetHTSClassListResponse,
summary="获取编码类别列表")
@Log(title="获取编码类别列表", business_type=BusinessType.SELECT)
@Auth(permission_list=["htsclass:btn:list"])
async def get_htsclass_list(
request: Request,
page: int = Query(default=1, description="当前页码"),
pageSize: int = Query(default=10, description="每页数量"),
class_name: Optional[str] = Query(default=None, description="类名"),
class_description: Optional[str] = Query(default=None, description="类描述"),
chapter_name: Optional[str] = Query(default=None, description="章节"),
chapter_description: Optional[str] = Query(default=None, description="章节描述"),
):
filterArgs = {
"class_name__icontains": class_name,
"class_description__icontains": class_description,
"chapter_name__icontains": chapter_name,
"chapter_description__icontains": chapter_description,
}
filterArgs = {k: v for k, v in filterArgs.items() if v is not None}
total = await HtsClass.filter(**filterArgs, del_flag=1).count()
data = await HtsClass.filter(**filterArgs, del_flag=1).offset((page - 1) * pageSize).limit(pageSize).values(
id="id",
create_time="create_time",
update_time="update_time",
class_name="class_name",
class_description="class_description",
chapter_name="chapter_name",
chapter_description="chapter_description",
)
return Response.success(data={
"total": total,
"result": data,
"page": page,
"pageSize": pageSize,
})
@htsAPI.post("/item/add", response_class=JSONResponse, response_model=BaseResponse, summary="新增编码项")
@Log(title="新增编码项", business_type=BusinessType.INSERT)
@Auth(permission_list=["htsitem:btn:add"])
async def add_htsitem(request: Request, params: AddHtsItemParams):
if await HtsItem.get_or_none(
parent_id=params.parent_id,
htsno=params.htsno,
indent=params.indent,
description=params.description,
units=params.units,
general=params.general,
special=params.special,
other=params.other,
quota_quantity=params.quota_quantity,
additional_duties=params.additional_duties,
footnotes=params.footnotes,
class_id=params.class_id,
version_id=params.version_id,
del_flag=1
):
return Response.error(msg="编码项目已存在!")
htsitem = await HtsItem.create(
parent_id=params.parent_id,
htsno=params.htsno,
indent=params.indent,
description=params.description,
units=params.units,
general=params.general,
special=params.special,
other=params.other,
quota_quantity=params.quota_quantity,
additional_duties=params.additional_duties,
footnotes=params.footnotes,
class_id=params.class_id,
version_id=params.version_id,
)
if htsitem:
return Response.success(msg="新增成功!")
else:
return Response.error(msg="新增失败")
@htsAPI.delete("/item/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除编码项")
@htsAPI.post("/item/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除编码项")
@Log(title="删除编码项", business_type=BusinessType.DELETE)
@Auth(permission_list=["htsitem:btn:delete"])
async def delete_htsitem(request: Request, id: str = Path(description="编码项目ID")):
if htsitem := await HtsItem.get_or_none(id=id, del_flag=1):
htsitem.del_flag = 0
await htsitem.save()
return Response.success(msg="删除成功")
else:
return Response.error(msg="编码项目不存在!")
@htsAPI.delete("/item/deleteList", response_class=JSONResponse, response_model=BaseResponse, summary="批量删除编码项")
@htsAPI.post("/item/deleteList", response_class=JSONResponse, response_model=BaseResponse, summary="批量删除编码项")
@Log(title="批量删除编码项", business_type=BusinessType.DELETE)
@Auth(permission_list=["htsitem:btn:delete"])
async def delete_htsitem_list(request: Request, params: DeleteListParams):
for id in set(params.ids):
if htsitem := await HtsItem.get_or_none(id=id, del_flag=1):
htsitem.del_flag = 0
await htsitem.save()
return Response.success(msg="删除成功")
@htsAPI.put("/item/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="修改编码项")
@htsAPI.post("/item/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="修改编码项")
@Log(title="修改编码项", business_type=BusinessType.UPDATE)
@Auth(permission_list=["htsitem:btn:update"])
async def update_htsitem(request: Request, params: UpdateHtsItemParams, id: str = Path(description="编码项目ID")):
if htsitem := await HtsItem.get_or_none(id=id, del_flag=1):
htsitem.parent_id = params.parent_id
htsitem.htsno = params.htsno
htsitem.indent = params.indent
htsitem.description = params.description
htsitem.units = params.units
htsitem.general = params.general
htsitem.special = params.special
htsitem.other = params.other
htsitem.quota_quantity = params.quota_quantity
htsitem.additional_duties = params.additional_duties
htsitem.footnotes = params.footnotes
if class_ := await HtsClass.get_or_none(id=params.class_id, del_flag=1):
htsitem.class_ = class_
if version := await Version.get_or_none(id=params.version_id, del_flag=1):
htsitem.version = version
htsitem.version_id = params.version_id
await htsitem.save()
return Response.success(msg="修改成功")
else:
return Response.error(msg="编码项目不存在")
@htsAPI.get("/item/info/{id}", response_class=JSONResponse, response_model=GetHtsItemInfoResponse,
summary="获取编码项信息")
@Log(title="获取编码项信息", business_type=BusinessType.SELECT)
@Auth(permission_list=["htsitem:btn:info"])
async def get_htsitem_info(request: Request, id: str = Path(description="编码项目ID")):
if htsitem := await HtsItem.get_or_none(id=id, del_flag=1):
data = {
"id": htsitem.id,
"create_time": htsitem.create_time,
"update_time": htsitem.update_time,
"parent_id": htsitem.parent_id,
"htsno": htsitem.htsno,
"indent": htsitem.indent,
"description": htsitem.description,
"units": htsitem.units,
"general": htsitem.general,
"special": htsitem.special,
"other": htsitem.other,
"quota_quantity": htsitem.quota_quantity,
"additional_duties": htsitem.additional_duties,
"footnotes": htsitem.footnotes,
"class_id": htsitem.class_id,
"version_id": htsitem.version_id,
}
return Response.success(data=data)
else:
return Response.error(msg="编码项目不存在")
@htsAPI.get("/item/list", response_class=JSONResponse, response_model=GetHtsItemListResponse, summary="获取编码项列表")
@Log(title="获取编码项列表", business_type=BusinessType.SELECT)
@Auth(permission_list=["htsitem:btn:list"])
async def get_htsitem_list(
request: Request,
page: int = Query(default=1, description="当前页码"),
pageSize: int = Query(default=10, description="每页数量"),
htsno: Optional[str] = Query(default=None, description="编码"),
description: Optional[str] = Query(default=None, description="描述"),
units: Optional[str] = Query(default=None, description="单位列表"),
general: Optional[str] = Query(default=None, description="通用税率"),
special: Optional[str] = Query(default=None, description="特殊税率,适用于特定国家或地区"),
other: Optional[str] = Query(default=None, description="其他税率"),
quota_quantity: Optional[str] = Query(default=None, description="配额数量"),
additional_duties: Optional[str] = Query(default=None, description="附加税"),
footnotes: Optional[str] = Query(default=None, description="脚注列表"),
class_id: Optional[str] = Query(default=None, description="所属类"),
version_id: Optional[str] = Query(default=None, description="所属版本"),
):
filterArgs = {
"htsno__icontains": htsno,
"description__icontains": description,
"units__icontains": units,
"general__icontains": general,
"special__icontains": special,
"other__icontains": other,
"quota_quantity__icontains": quota_quantity,
"additional_duties__icontains": additional_duties,
"footnotes__icontains": footnotes,
"class__id": class_id,
"version_id": version_id,
}
filterArgs = {k: v for k, v in filterArgs.items() if v is not None}
total = await HtsItem.filter(**filterArgs, del_flag=1).count()
data = await HtsItem.filter(**filterArgs, del_flag=1).offset((page - 1) * pageSize).limit(pageSize).values(
id="id",
create_time="create_time",
update_time="update_time",
parent_id="parent_id",
htsno="htsno",
indent="indent",
description="description",
units="units",
general="general",
special="special",
other="other",
quota_quantity="quota_quantity",
additional_duties="additional_duties",
footnotes="footnotes",
class_id="class__id",
version_id="version_id",
)
return Response.success(data={
"total": total,
"result": data,
"page": page,
"pageSize": pageSize,
})
@htsAPI.post("/item/import", response_class=JSONResponse, response_model=BaseResponse, summary="导入编码项")
@Log(title="导入编码项", business_type=BusinessType.INSERT)
@Auth(permission_list=["htsitem:btn:import"])
async def add_htsitem(request: Request, params: ImportHtsItemParams):
version = await Version.get_or_none(id=params.version_id, del_flag=1)
if not version:
return Response.error(msg="版本不存在")
class_ = await HtsClass.get_or_none(id=params.class_id, del_flag=1)
if not class_:
return Response.error(msg="编码类不存在")
file = await File.get_or_none(id=params.file_id, del_flag=1)
if not file:
return Response.error(msg="文件不存在")
if await HtsItem.get_or_none(class_=params.class_id, version=params.version_id):
return Response.error(msg="编码项已存在")
def build_hierarchy_with_ids(data):
hierarchy = []
stack = []
for item in data:
current_level = int(item['indent'])
current_item = {
'id': str(uuid.uuid4()), # 生成唯一的 UUID
'indent': item['indent'],
'htsno': item['htsno'].replace(".", "").strip(),
'description': item['description'],
'parent_id': None, # 默认没有父级,
"units": json.dumps(item["units"], ensure_ascii=False),
"general": item["general"],
"special": item["special"],
"other": item["other"],
"footnotes": json.dumps(item["footnotes"], ensure_ascii=False),
"quota_quantity": item["quotaQuantity"],
"additional_duties": item["additionalDuties"],
"addiitional_duties": item["addiitionalDuties"]
}
# 找到当前项的父级
while stack and int(stack[-1]['indent']) >= current_level:
stack.pop()
if stack:
# 设置当前项的 parent_id 为父级的 id
current_item['parent_id'] = stack[-1]['item']['id']
# 将当前项添加到层级结构中
hierarchy.append(current_item)
# 将当前项压入栈中
stack.append({'indent': current_level, 'item': current_item})
return hierarchy
with open(file.absolute_path, 'r', encoding='utf-8') as r:
data = json.load(r)
for x in data:
x["indent"] = str(int(x["indent"]) + 1)
data.append({
"htsno": str(class_.chapter_name).replace("Chapter", "").strip() if int(
str(class_.chapter_name).replace("Chapter", "").strip(" ")) > 10 else "0" +
str(class_.chapter_name).replace(
"Chapter", "").strip(),
"indent": "0",
"description": class_.class_description,
"units": [],
"general": "",
"special": "",
"other": "",
"footnotes": [],
"quotaQuantity": "",
"additionalDuties": "",
"addiitionalDuties": ""
})
hierarchy = build_hierarchy_with_ids(data)
# 批量插入查询结果
create_tasks = [
HtsItem(
id=item["id"],
parent_id=item["parent_id"],
htsno=item["htsno"],
indent=item["indent"],
description=item["description"],
units=item["units"],
general=item["general"],
special=item["special"],
other=item["other"],
quota_quantity=item["quota_quantity"],
additional_duties=item["additional_duties"],
footnotes=item["footnotes"],
class_=class_,
version=version
)
for item in hierarchy
]
await HtsItem.bulk_create(create_tasks)
return Response.success()

View File

@ -123,7 +123,7 @@ async def get_locale_list(request: Request,
code: Optional[str] = Query(default=None, description="国际化类型代码"),
):
filterArgs = {
f'{k}__contains': v for k, v in {
f'{k}__icontains': v for k, v in {
'name': name,
'code': code
}.items() if v
@ -242,7 +242,7 @@ async def get_i18n_list(request: Request,
translation: Optional[str] = Query(default=None, description="国际化内容翻译内容"),
):
filterArgs = {
f'{k}__contains': v for k, v in {
f'{k}__icontains': v for k, v in {
'key': key,
'locale_id': locale_id,
'translation': translation

View File

@ -12,7 +12,7 @@ from fastapi import APIRouter, Depends, Path, Query, Request
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from annotation.auth import Auth
from annotation.auth import Auth, hasAuth
from annotation.log import Log
from config.constant import BusinessType, RedisKeyConfig
from controller.login import LoginController
@ -41,11 +41,10 @@ async def get_login_log(request: Request,
current_user: dict = Depends(LoginController.get_current_user),
):
sub_departments = current_user.get("sub_departments")
user_id = current_user.get("id")
online_user_list = await LoginController.get_online_user(request, sub_departments)
online_user_list = list(
filter(lambda x: x["department_id"] in sub_departments, jsonable_encoder(online_user_list)))
filterArgs = {
f'{k}__contains': v for k, v in {
f'{k}__icontains': v for k, v in {
'username': username,
'nickname': nickname,
}.items() if v
@ -56,10 +55,20 @@ async def get_login_log(request: Request,
startTime = datetime.fromtimestamp(float(startTime) / 1000)
endTime = datetime.fromtimestamp(float(endTime) / 1000)
filterArgs['login_time__range'] = [startTime, endTime]
if not department_id:
filterArgs['user__department__id__in'] = sub_departments
if await hasAuth(request, "login:btn:admin"):
online_user_list = list(
filter(lambda x: x["department_id"] in sub_departments, jsonable_encoder(online_user_list)))
if not department_id:
filterArgs['user__department__id__in'] = sub_departments
else:
filterArgs['user__department__id'] = department_id
else:
filterArgs['user__department__id'] = department_id
online_user_list = list(
filter(lambda x: x["user_id"] == user_id, jsonable_encoder(online_user_list)))
if department_id:
filterArgs['user__department__id'] = department_id
else:
filterArgs["user__department__id"] = current_user.get("department_id")
result = await LoginLog.filter(**filterArgs, user__del_flag=1, del_flag=1).offset(
(page - 1) * pageSize).limit(pageSize).values(
id="id",
@ -171,8 +180,9 @@ async def get_operation_log(request: Request,
current_user: dict = Depends(LoginController.get_current_user),
):
sub_departments = current_user.get("sub_departments")
user_id = current_user.get("id")
filterArgs = {
f'{k}__contains': v for k, v in {
f'{k}__icontains': v for k, v in {
'operation_name': name,
'operation_type': type,
'operator__username': username,
@ -185,10 +195,15 @@ async def get_operation_log(request: Request,
startTime = datetime.fromtimestamp(float(startTime) / 1000)
endTime = datetime.fromtimestamp(float(endTime) / 1000)
filterArgs['operation_time__range'] = [startTime, endTime]
if not department_id:
filterArgs['department__id__in'] = sub_departments
if await hasAuth(request, "operation:btn:admin"):
if not department_id:
filterArgs['department__id__in'] = sub_departments
else:
filterArgs['department__id'] = department_id
else:
filterArgs['department__id'] = department_id
filterArgs['operator__id'] = user_id
if department_id:
filterArgs['department__id'] = department_id
result = await OperationLog.filter(**filterArgs, operator__del_flag=1, del_flag=1).offset(
(page - 1) * pageSize).limit(
pageSize).values(

View File

@ -5,6 +5,7 @@
# @File : login.py
# @Software : PyCharm
# @Comment : 本程序
import json
import uuid
from datetime import timedelta, datetime
@ -18,7 +19,7 @@ from config.constant import BusinessType
from config.constant import RedisKeyConfig
from controller.login import CustomOAuth2PasswordRequestForm, LoginController
from controller.query import QueryController
from models import Department, User, Role, UserRole
from models import Department, User, Role, UserRole, LoginLog, OperationLog, QueryCodeLog, QueryCode, HtsClass, HtsItem,Version
from schemas.common import BaseResponse
from schemas.login import LoginParams, GetUserInfoResponse, LoginResponse, GetCaptchaResponse, GetEmailCodeParams, \
ResetPasswordParams
@ -271,3 +272,143 @@ async def logout(request: Request, status: bool = Depends(LoginController.logout
if status:
return Response.success(data="退出成功!")
return Response.error(data="登出失败!")
@loginAPI.post("/unsubscribe", response_class=JSONResponse, response_model=BaseResponse, summary="用户注销")
@Log(title="用户注销", business_type=BusinessType.FORCE)
async def unsubscribe(request: Request, current_user: dict = Depends(LoginController.get_current_user)):
id = current_user.get("id")
session_id = current_user.get("session_id")
if user := await User.get_or_none(id=id, del_flag=1):
user.del_flag = 0
await user.save()
redis_token = await request.app.state.redis.get(f'{RedisKeyConfig.ACCESS_TOKEN.key}:{session_id}')
if redis_token:
await request.app.state.redis.delete(f'{RedisKeyConfig.ACCESS_TOKEN.key}:{session_id}')
# 移除用户角色
await UserRole.filter(user_id=user.id, del_flag=1).update(del_flag=0)
# 移除用户登录日志
await LoginLog.filter(user_id=user.id, del_flag=1).update(del_flag=0)
# 移除用户操作日志
await OperationLog.filter(user_id=user.id, del_flag=1).update(del_flag=0)
# 移除编码查询日志
logs = await QueryCodeLog.filter(operator__id=user.id, del_flag=1).values("id")
for log in logs:
await QueryCode.filter(session_id=log["id"], del_flag=1).update(del_flag=0)
await QueryCodeLog.filter(id=log["id"], del_flag=1).update(del_flag=0)
# 更新用户信息缓存
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_INFO.key}:{id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_INFO.key}:{id}')
# 更新用户路由缓存
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_ROUTES.key}:{id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_ROUTES.key}:{id}')
return Response.success(data="注销成功!")
else:
return Response.error(data="注销失败!")
# @loginAPI.get("/test")
# async def test(request: Request):
# values = await HtsClass.filter(del_flag=1).values("chapter_name","id")
# version=await Version.get(id="4b2101cc-9bc3-4084-af16-eeb27b24b682")
# for item in values:
# hts = str(item["chapter_name"]).replace("Chapter", "").strip()
# class_=await HtsClass.get(id=item["id"])
# if hts == "77":
# continue
#
# def build_hierarchy_with_ids(data):
# hierarchy = []
# stack = []
#
# for item in data:
# current_level = int(item['indent'])
# current_item = {
# 'id': str(uuid.uuid4()), # 生成唯一的 UUID
# 'indent': item['indent'],
# 'htsno': item['htsno'].replace(".", "").strip(),
# 'description': item['description'],
# 'parent_id': None, # 默认没有父级,
# "units": json.dumps(item["units"], ensure_ascii=False),
# "general": item["general"],
# "special": item["special"],
# "other": item["other"],
# "footnotes": json.dumps(item["footnotes"], ensure_ascii=False),
# "quota_quantity": item["quotaQuantity"],
# "additional_duties": item["additionalDuties"],
# "addiitional_duties": item["addiitionalDuties"]
# }
#
# # 找到当前项的父级
# while stack and int(stack[-1]['indent']) >= current_level:
# stack.pop()
#
# if stack:
# # 设置当前项的 parent_id 为父级的 id
# current_item['parent_id'] = stack[-1]['item']['id']
#
# # 将当前项添加到层级结构中
# hierarchy.append(current_item)
#
# # 将当前项压入栈中
# stack.append({'indent': current_level, 'item': current_item})
#
# return hierarchy
#
# with open(f"E:/PythonCodes/cutoms-system-backend/test/{hts}.json", 'r', encoding='utf-8') as r:
# data = json.load(r)
# for x in data:
# x["indent"] = str(int(x["indent"]) + 1)
# data.append({
# "htsno": str(item["chapter_name"]).replace("Chapter", "").strip() if int(
# str(item["chapter_name"]).replace("Chapter", "").strip(" ")) > 10 else "0" +
# str(item["chapter_name"]).replace(
# "Chapter", "").strip(),
# "indent": "0",
# "description": class_.class_description,
# "units": [],
# "general": "",
# "special": "",
# "other": "",
# "footnotes": [],
# "quotaQuantity": "",
# "additionalDuties": "",
# "addiitionalDuties": ""
# })
# hierarchy = build_hierarchy_with_ids(data)
# # 批量插入查询结果
# create_tasks = [
# HtsItem(
# id=item["id"],
#
# parent_id=item["parent_id"],
#
# htsno=item["htsno"],
#
# indent=item["indent"],
#
# description=item["description"],
#
# units=item["units"],
#
# general=item["general"],
#
# special=item["special"],
#
# other=item["other"],
#
# quota_quantity=item["quota_quantity"],
#
# additional_duties=item["additional_duties"],
#
# footnotes=item["footnotes"],
#
# class_=class_,
#
# version=version
# )
# for item in hierarchy
# ]
# await HtsItem.bulk_create(create_tasks)
# print(class_.chapter_name)

View File

@ -10,11 +10,11 @@ from typing import Optional
from fastapi import APIRouter, Depends, Path, Query, Request
from fastapi.responses import JSONResponse
from annotation.auth import Auth
from annotation.auth import Auth, hasAdmin
from annotation.log import Log
from config.constant import BusinessType, RedisKeyConfig
from controller.login import LoginController
from models import Permission
from models import Permission, RolePermission
from schemas.common import BaseResponse
from schemas.permission import AddPermissionParams, GetPermissionInfoResponse, GetPermissionListResponse
from utils.response import Response
@ -51,6 +51,7 @@ async def add_permission(request: Request, params: AddPermissionParams):
leave_transition=params.leave_transition,
fixed_tag=params.fixed_tag,
hidden_tag=params.hidden_tag,
is_admin=params.is_admin
)
if permission:
# 更新用户信息缓存
@ -72,8 +73,8 @@ async def add_permission(request: Request, params: AddPermissionParams):
@Auth(permission_list=["permission:btn:delete"])
async def delete_permission(request: Request, id: str = Path(description="权限ID")):
if permission := await Permission.get_or_none(id=id, del_flag=1):
permission.del_flag = 0
await permission.save()
# 移除角色权限
await delete_permission_recursive(permission_id=permission.id)
# 更新用户信息缓存
userInfos = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_INFO.key}*')
if userInfos:
@ -87,6 +88,20 @@ async def delete_permission(request: Request, id: str = Path(description="权限
return Response.error(msg="删除权限失败,权限不存在!")
async def delete_permission_recursive(permission_id: str):
"""
递归删除权限及其附属权限
:param permission_id: 权限ID
:return:
"""
await Permission.filter(id=permission_id, del_flag=1).update(del_flag=0)
await RolePermission.filter(permission_id=permission_id, del_flag=1).update(del_flag=0)
sub_permissions = await Permission.filter(parent_id=permission_id, del_flag=1).all()
for sub_department in sub_permissions:
await delete_permission_recursive(sub_department.id)
return True
@permissionAPI.put("/update/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="更新权限")
@permissionAPI.post("/update/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="更新权限")
@Log(title="更新权限", business_type=BusinessType.UPDATE)
@ -114,6 +129,7 @@ async def update_permission(request: Request, params: AddPermissionParams, id: s
permission.leave_transition = params.leave_transition
permission.fixed_tag = params.fixed_tag
permission.hidden_tag = params.hidden_tag
permission.is_admin = params.is_admin
await permission.save()
# 更新用户信息缓存
userInfos = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_INFO.key}*')
@ -161,6 +177,7 @@ async def get_permission(request: Request, id: str = Path(description="权限ID"
fixed_tag="fixed_tag",
show_link="show_link",
show_parent="show_parent",
is_admin="is_admin"
)
return Response.success(msg="查询权限详情成功!", data=permission)
else:
@ -195,10 +212,12 @@ async def get_permission_list(
enterTransition: Optional[str] = Query(default=None, description="进场动画"),
leaveTransition: Optional[str] = Query(default=None, description="离场动画"),
fixedTag: Optional[bool] = Query(default=None, description="固定标签页"),
hiddenTag: Optional[bool] = Query(default=None, description="隐藏标签页")
hiddenTag: Optional[bool] = Query(default=None, description="隐藏标签页"),
isAdmin: Optional[bool] = Query(default=None, description="是否为管理专属页面"),
current_user: dict = Depends(LoginController.get_current_user),
):
filterArgs = {
f'{k}__contains': v for k, v in {
f'{k}__icontains': v for k, v in {
"id": id,
"name": name,
"parent_id": parentId,
@ -219,9 +238,13 @@ async def get_permission_list(
"enter_transition": enterTransition,
"leave_transition": leaveTransition,
"fixed_tag": fixedTag,
"hidden_tag": hiddenTag
"hidden_tag": hiddenTag,
"is_admin": isAdmin
}.items() if v
}
department_id = current_user.get("department_id", "")
if not await hasAdmin(request, department_id):
filterArgs["is_admin"] = False
total = await Permission.filter(**filterArgs, del_flag=1).count()
result = await Permission.filter(**filterArgs, del_flag=1).offset((page - 1) * pageSize).limit(pageSize).order_by(
'rank').values(
@ -250,7 +273,8 @@ async def get_permission_list(
hidden_tag="hidden_tag",
fixed_tag="fixed_tag",
show_link="show_link",
show_parent="show_parent"
show_parent="show_parent",
is_admin="is_admin"
)
return Response.success(data={
"total": total,

View File

@ -10,7 +10,7 @@ from typing import Optional
from fastapi import APIRouter, Depends, Path, Query, Request
from fastapi.responses import JSONResponse
from annotation.auth import Auth
from annotation.auth import Auth, hasAuth, hasAdmin
from annotation.log import Log
from config.constant import BusinessType, RedisKeyConfig
from controller.login import LoginController
@ -187,15 +187,23 @@ async def get_role_list(
current_user: dict = Depends(LoginController.get_current_user)
):
filterArgs = {
f'{k}__contains': v for k, v in {
f'{k}__icontains': v for k, v in {
"name": name,
"code": code,
"description": description,
"status": status
}.items() if v
}
if not department_id:
filterArgs["department__id__in"] = current_user.get("sub_departments")
if await hasAuth(request, "role:btn:admin"):
if not department_id:
filterArgs["department__id__in"] = current_user.get("sub_departments")
else:
filterArgs["department__id"] = current_user.get("department_id")
else:
if department_id:
filterArgs["department__id"] = department_id
else:
filterArgs["department__id"] = current_user.get("department_id")
total = await Role.filter(**filterArgs, del_flag=1).count()
data = await Role.filter(**filterArgs, del_flag=1).offset(
(page - 1) * pageSize).limit(
@ -231,6 +239,11 @@ async def add_role_permission(request: Request, params: AddRolePermissionParams,
id: str = Path(..., description="角色ID"),
current_user: dict = Depends(LoginController.get_current_user)):
sub_departments = current_user.get("sub_departments")
if await hasAdmin(request, current_user.get("department_id")):
department_permissions = await Permission.filter(del_flag=1).values("id")
else:
department_permissions = await Permission.filter(is_admin=False, del_flag=1).values("id")
department_permissions = filterKeyValues(department_permissions, "id")
if role := await Role.get_or_none(id=id, del_flag=1, department__id__in=sub_departments):
# 已有角色权限
rolePermissions = await RolePermission.filter(role_id=id, del_flag=1).values("permission_id")
@ -239,6 +252,8 @@ async def add_role_permission(request: Request, params: AddRolePermissionParams,
add_list = set(params.permission_ids).difference(set(rolePermissions))
# 循环添加角色权限
for item in add_list:
if item not in department_permissions:
continue
permission = await Permission.get_or_none(id=item, del_flag=1)
if permission:
await RolePermission.create(
@ -290,10 +305,15 @@ async def update_role_permission(request: Request, params: AddRolePermissionPara
id: str = Path(..., description="角色ID"),
current_user: dict = Depends(LoginController.get_current_user)):
sub_departments = current_user.get("sub_departments")
if await hasAdmin(request, current_user.get("department_id")):
department_permissions = await Permission.filter(del_flag=1).values("id")
else:
department_permissions = await Permission.filter(is_admin=False, del_flag=1).values("id")
department_permissions = await filterKeyValues(department_permissions, key="id", convert_type=str)
if role := await Role.get_or_none(id=id, del_flag=1, department__id__in=sub_departments):
# 已有角色权限
rolePermissions = await RolePermission.filter(role_id=role.id, del_flag=1).values("permission_id")
rolePermissions = await filterKeyValues(rolePermissions, "permission_id")
rolePermissions = await filterKeyValues(rolePermissions, key="permission_id", convert_type=str)
# 利用集合筛选出角色权限中不存在的权限
delete_list = set(rolePermissions).difference(set(params.permission_ids))
# 利用集合筛选出角色权限中新增的权限
@ -303,6 +323,8 @@ async def update_role_permission(request: Request, params: AddRolePermissionPara
await RolePermission.filter(role_id=id, permission_id=item, del_flag=1).update(del_flag=0)
# 循环添加角色权限
for item in add_list:
if item not in department_permissions:
continue
await RolePermission.create(role_id=id, permission_id=item)
# 更新用户信息缓存
userInfos = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_INFO.key}*')

View File

@ -10,7 +10,7 @@ import os
from datetime import datetime, timedelta
from typing import Optional
from fastapi import APIRouter, Depends, Path, Query, UploadFile, File, Request
from fastapi import APIRouter, Depends, Path, Query, UploadFile, File, Request, Form
from fastapi.responses import JSONResponse
from annotation.auth import Auth
@ -29,7 +29,7 @@ from schemas.department import GetDepartmentListResponse
from schemas.file import UploadFileResponse
from schemas.user import AddUserParams, GetUserListResponse, GetUserInfoResponse, UpdateUserParams, \
AddUserRoleParams, GetUserRoleInfoResponse, UpdateUserRoleParams, GetUserPermissionListResponse, \
ResetPasswordParams, GetUserStatisticsResponse
ResetPasswordParams, GetUserStatisticsResponse, UpdateBaseUserInfoParams
from utils.common import filterKeyValues
from utils.password import Password
from utils.response import Response
@ -184,7 +184,7 @@ async def get_user_list(
):
sub_departments = current_user.get("sub_departments")
filterArgs = {
f'{k}__contains': v for k, v in {
f'{k}__icontains': v for k, v in {
'username': username,
'nickname': nickname,
'phone': phone,
@ -438,6 +438,85 @@ async def reset_user_password(request: Request, params: ResetPasswordParams, id:
return Response.failure(msg="用户不存在!")
@userAPI.put("/updateBaseUserInfo", response_model=BaseResponse, response_class=JSONResponse,
summary="更新基础个人信息")
@userAPI.post("/updateBaseUserInfo", response_model=BaseResponse, response_class=JSONResponse,
summary="更新基础个人信息")
@Log(title="更新基础个人信息", business_type=BusinessType.UPDATE)
async def update_base_userinfo(request: Request, params: UpdateBaseUserInfoParams,
current_user: dict = Depends(LoginController.get_current_user)):
user = await User.get_or_none(id=current_user.get("id"), del_flag=1)
if user:
user.nickname = params.name
user.gender = params.gender
await user.save()
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_INFO.key}:{user.id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_INFO.key}:{user.id}')
return Response.success(msg="更新成功!")
return Response.error(msg="更新失败!")
@userAPI.put("/updatePassword", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新密码")
@userAPI.post("/updatePassword", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新密码")
@Log(title="用户更新密码", business_type=BusinessType.UPDATE)
async def update_user_password(request: Request, oldPassword: str = Form(description="用户旧密码"),
newPassword: str = Form(description="用户新密码"),
current_user: dict = Depends(LoginController.get_current_user)):
if user := await User.get_or_none(id=current_user.get("id"), del_flag=1):
password = await Password.get_password_hash(oldPassword)
if user.password != password:
return Response.error(msg="旧密码错误!")
newPassword = await Password.get_password_hash(newPassword)
user.password = newPassword
await user.save()
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_INFO.key}:{user.id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_INFO.key}:{user.id}')
return Response.success(msg="更新成功!")
return Response.error(msg="更新失败!")
@userAPI.put("/updatePhone", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新手机号")
@userAPI.post("/updatePhone", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新手机号")
@Log(title="用户更新手机号", business_type=BusinessType.UPDATE)
async def update_user_phone(request: Request, password: str = Form(description="用户密码"),
phone: str = Form(description="用户手机号"),
current_user: dict = Depends(LoginController.get_current_user)):
if user := await User.get_or_none(id=current_user.get("id"), del_flag=1):
password = await Password.get_password_hash(password)
if user.password != password:
return Response.error("更改失败,请正确输入旧密码")
phoneStatus = await User.filter(phone=phone, del_flag=1).count()
if phoneStatus:
return Response.error(f"更改失败,手机号:{phone}已绑定其他账号!")
user.phone = phone
await user.save()
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_INFO.key}:{user.id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_INFO.key}:{user.id}')
return Response.success(msg="更新成功!")
return Response.error(msg="更新失败!")
@userAPI.put("/updateEmail", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新邮箱")
@userAPI.post("/updateEmail", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新邮箱")
@Log(title="用户更新邮箱", business_type=BusinessType.UPDATE)
async def update_user_email(request: Request, password: str = Form(description="用户密码"),
email: str = Form(description="用户邮箱"),
current_user: dict = Depends(LoginController.get_current_user)):
if user := await User.get_or_none(id=current_user.get("id"), del_flag=1):
password = await Password.get_password_hash(password)
if user.password != password:
return Response.error("更改失败,请正确输入旧密码")
emailStatus = await User.filter(email=email, del_flag=1).count()
if emailStatus:
return Response.error(f"更改失败,邮箱:{email}已绑定其他账号!")
user.email = email
await user.save()
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_INFO.key}:{user.id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_INFO.key}:{user.id}')
return Response.success(msg="更新成功!")
return Response.error(msg="更新失败!")
@userAPI.get("/statistics", response_model=GetUserStatisticsResponse, response_class=JSONResponse,
summary="获取用户查询统计")
@Log(title="获取用户查询统计", business_type=BusinessType.SELECT)
@ -531,4 +610,4 @@ async def get_user_statistics(request: Request, current_user: dict = Depends(Log
"last_month_count": last_month_count,
"before_14day_count_success": before_14day_count_success,
"before_14day_count_fail": before_14day_count_fail,
})
})

2
app.py
View File

@ -24,6 +24,7 @@ from api.role import roleAPI
from api.server import serverAPI
from api.user import userAPI
from api.code import codeAPI
from api.hts import htsAPI
from config.database import init_db, close_db
from config.env import AppConfig
from config.get_ElasticSearch import ElasticSearch
@ -93,6 +94,7 @@ api_list = [
{'api': i18nAPI, 'tags': ['国际化管理']},
{'api': configApi, 'tags': ['配置管理']},
{'api': codeAPI, 'tags': ['编码管理']},
{'api': htsAPI, 'tags': ['海关税率管理']},
]
for api in api_list:

View File

@ -0,0 +1,13 @@
code,description
01012100,Live purebred breeding horses
01012900,Live horses other than purebred breeding horses
01013000,Live asses
01019030,Mules and hinnies imported for immediate slaughter
01019040,Mules and hinnies not imported for immediate slaughter
01022100,Live purebred breeding cattle
01022920,Cows imported specially for dairy purposes
01022940,Live cattle other than purebred or those imported for dairy purposes
01023100,Live purebred breeding buffalo
01023900,"Live buffalo, other than purebred breeding animals"
01029000,"Live bovine animals, other than cattle and buffalo"
01031000,Live purebred breeding swine
1 code description
2 01012100 Live purebred breeding horses
3 01012900 Live horses other than purebred breeding horses
4 01013000 Live asses
5 01019030 Mules and hinnies imported for immediate slaughter
6 01019040 Mules and hinnies not imported for immediate slaughter
7 01022100 Live purebred breeding cattle
8 01022920 Cows imported specially for dairy purposes
9 01022940 Live cattle other than purebred or those imported for dairy purposes
10 01023100 Live purebred breeding buffalo
11 01023900 Live buffalo, other than purebred breeding animals
12 01029000 Live bovine animals, other than cattle and buffalo
13 01031000 Live purebred breeding swine

Binary file not shown.

View File

@ -0,0 +1,18 @@
text
horses
Live purebred breeding cattle
1 text
2 horses
3 Live purebred breeding cattle

Binary file not shown.

View File

@ -71,68 +71,68 @@ async def configure_tortoise_logging(enable_logging: bool = True, log_level: int
if aiomysql_logger.hasHandlers():
aiomysql_logger.handlers.clear()
if enable_logging:
# 设置日志格式
fmt = logging.Formatter(
fmt="%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# 创建控制台处理器(输出到控制台)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(log_level)
console_handler.setFormatter(fmt)
# 创建文件处理器(输出到文件)
file_handler = RotatingFileHandler(
filename=log_path_sql,
maxBytes=50 * 1024 * 1024, # 日志文件大小达到 50MB 时轮换
backupCount=5, # 保留 5 个旧日志文件
encoding="utf-8",
)
file_handler.setLevel(log_level)
file_handler.setFormatter(fmt)
# 配置 tortoise 顶级日志记录器
tortoise_logger.setLevel(log_level)
tortoise_logger.addHandler(console_handler) # 添加控制台处理器
tortoise_logger.addHandler(file_handler) # 添加文件处理器
# 配置 aiomysql 日志记录器
aiomysql_logger.setLevel(log_level)
aiomysql_logger.addHandler(console_handler) # 添加控制台处理器
aiomysql_logger.addHandler(file_handler) # 添加文件处理器
# 配置 SQL 查询日志记录器
sql_logger = logging.getLogger("tortoise.db_client")
sql_logger.setLevel(log_level)
class SQLResultLogger(logging.Handler):
async def emit(self, record):
# 只处理 SQL 查询相关的日志
if "SELECT" in record.getMessage() or "INSERT" in record.getMessage() or "UPDATE" in record.getMessage() or "DELETE" in record.getMessage():
# 输出 SQL 查询语句
console_handler.emit(record)
file_handler.emit(record)
# 异步获取并记录查询结果
await self.log_query_result(record)
async def log_query_result(self, record):
"""
执行查询并返回结果
"""
try:
from tortoise import Tortoise
connection = Tortoise.get_connection("default")
result = await connection.execute_query_dict(record.getMessage())
return result
except Exception as e:
return f"获取查询结果失败: {str(e)}"
# 添加自定义 SQL 查询日志处理器
sql_result_handler = SQLResultLogger()
sql_result_handler.setLevel(log_level)
sql_logger.addHandler(sql_result_handler)
else:
# 如果禁用日志,设置日志级别为 WARNING 以抑制大部分输出
tortoise_logger.setLevel(logging.WARNING)
# if enable_logging:
# # 设置日志格式
# fmt = logging.Formatter(
# fmt="%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s",
# datefmt="%Y-%m-%d %H:%M:%S",
# )
#
# # 创建控制台处理器(输出到控制台)
# console_handler = logging.StreamHandler(sys.stdout)
# console_handler.setLevel(log_level)
# console_handler.setFormatter(fmt)
#
# # 创建文件处理器(输出到文件)
# file_handler = RotatingFileHandler(
# filename=log_path_sql,
# maxBytes=50 * 1024 * 1024, # 日志文件大小达到 50MB 时轮换
# backupCount=5, # 保留 5 个旧日志文件
# encoding="utf-8",
# )
# file_handler.setLevel(log_level)
# file_handler.setFormatter(fmt)
#
# # 配置 tortoise 顶级日志记录器
# tortoise_logger.setLevel(log_level)
# tortoise_logger.addHandler(console_handler) # 添加控制台处理器
# tortoise_logger.addHandler(file_handler) # 添加文件处理器
#
# # 配置 aiomysql 日志记录器
# aiomysql_logger.setLevel(log_level)
# aiomysql_logger.addHandler(console_handler) # 添加控制台处理器
# aiomysql_logger.addHandler(file_handler) # 添加文件处理器
# # 配置 SQL 查询日志记录器
# sql_logger = logging.getLogger("tortoise.db_client")
# sql_logger.setLevel(log_level)
#
# class SQLResultLogger(logging.Handler):
# async def emit(self, record):
# # 只处理 SQL 查询相关的日志
# if "SELECT" in record.getMessage() or "INSERT" in record.getMessage() or "UPDATE" in record.getMessage() or "DELETE" in record.getMessage():
# # 输出 SQL 查询语句
# console_handler.emit(record)
# file_handler.emit(record)
#
# # 异步获取并记录查询结果
# await self.log_query_result(record)
#
# async def log_query_result(self, record):
# """
# 执行查询并返回结果。
# """
# try:
# from tortoise import Tortoise
# connection = Tortoise.get_connection("default")
# result = await connection.execute_query_dict(record.getMessage())
# return result
# except Exception as e:
# return f"获取查询结果失败: {str(e)}"
#
# # 添加自定义 SQL 查询日志处理器
# sql_result_handler = SQLResultLogger()
# sql_result_handler.setLevel(log_level)
# sql_logger.addHandler(sql_result_handler)
# else:
# # 如果禁用日志,设置日志级别为 WARNING 以抑制大部分输出
# tortoise_logger.setLevel(logging.WARNING)

View File

@ -313,6 +313,7 @@ class UploadSettings:
'doc',
'docx',
'xls',
'csv',
'xlsx',
'ppt',
'pptx',
@ -330,6 +331,7 @@ class UploadSettings:
'rmvb',
# pdf
'pdf',
'json'
]
"""
默认允许上传的文件扩展名列表

View File

@ -132,7 +132,8 @@ class QueryController:
keepAlive="permission__keep_alive",
hiddenTag="permission__hidden_tag",
showLink="permission__show_link",
showParent="permission__show_parent"
showParent="permission__show_parent",
isAdmin="permission__is_admin",
)
permissions.extend(permission)
return permissions

View File

@ -6,7 +6,7 @@
# @Software : PyCharm
# @Comment : 本程序
from models.code import Code, QueryCode, QueryCodeLog
from models.code import Code, QueryCode, QueryCodeLog, CodeFeedback, CodeImport
from models.config import Config
from models.department import Department
from models.file import File
@ -15,6 +15,7 @@ from models.log import LoginLog, OperationLog
from models.permission import Permission
from models.role import Role, RolePermission
from models.user import User, UserRole
from models.hts import Version, HtsClass, HtsItem
__all__ = [
'Department',
@ -30,6 +31,11 @@ __all__ = [
'Locale',
'Config',
'Code',
'CodeFeedback',
'CodeImport',
'QueryCode',
'QueryCodeLog'
'QueryCodeLog',
'HtsClass',
'HtsItem',
'Version',
]

View File

@ -24,6 +24,12 @@ class Code(BaseModel):
description="描述",
source_field="description"
)
user = fields.ForeignKeyField(
"models.User",
related_name="codes",
description="用户",
source_field="user_id"
)
class Meta:
table = "code"
@ -137,6 +143,11 @@ class QueryCode(BaseModel):
"""
查询编码模型
"""
id=fields.UUIDField(
pk=True,
description="主键",
source_field="id"
)
session = fields.ForeignKeyField(
"models.QueryCodeLog",
related_name="query_code",
@ -160,3 +171,70 @@ class QueryCode(BaseModel):
class Meta:
table = "query_code"
table_description = "查询编码表"
class CodeFeedback(BaseModel):
"""
编码反馈模型
"""
code = fields.ForeignKeyField(
"models.Code",
related_name="feedbacks",
description="编码",
null=True,
source_field="code_id"
)
user = fields.ForeignKeyField(
"models.User",
related_name="feedbacks",
description="用户",
source_field="user_id"
)
feedback_code = fields.CharField(
max_length=255,
description="反馈编码",
source_field="feedback_code"
)
feedback_description = fields.TextField(
description="反馈内容",
source_field="feedback_description"
)
status = fields.SmallIntField(
default=3,
description="状态0删除1审核通过2审核不通过3待审核",
source_field="status"
)
class Meta:
table = "code_feedback"
table_description = "编码反馈表"
class CodeImport(BaseModel):
"""
编码导入模型
"""
code = fields.CharField(
max_length=255,
description="编码",
source_field="code"
)
description = fields.TextField(
description="描述",
source_field="description"
)
user = fields.ForeignKeyField(
"models.User",
related_name="imports",
description="用户",
source_field="user_id"
)
status = fields.SmallIntField(
default=1,
description="状态0删除1审核通过2审核不通过3待审核",
source_field="status"
)
class Meta:
table = "code_import"
table_description = "编码导入表"

165
models/hts.py Normal file
View File

@ -0,0 +1,165 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/03/24 02:29:03
# @UpdateTime : 2025/03/24 02:29:03
# @Author : sonder
# @File : hts.py
# @Comment : 本程序用于海关编码税率模型
from tortoise import fields
from models.common import BaseModel
class Version(BaseModel):
"""
版本信息模型
"""
version = fields.CharField(max_length=255, description="版本号", source_field="version")
"""
版本号
- 最大长度为 255 个字符
- 映射到数据库字段 version
"""
date = fields.CharField(max_length=255,description="版本日期", source_field="date")
"""
版本日期
- 映射到数据库字段 date
"""
url = fields.CharField(max_length=255, description="下载地址", source_field="url")
"""
下载地址
- 最大长度为 255 个字符
- 映射到数据库字段 url
"""
class Meta:
table = "version"
table_description = "版本信息"
class HtsClass(BaseModel):
"""
编码类信息模型
"""
class_name = fields.TextField(description="类名", source_field="class_name")
"""
类名
- 映射到数据库字段 class_name
"""
class_description = fields.TextField(description="类描述", source_field="class_description")
"""
类描述
- 映射到数据库字段 class_description
"""
chapter_name = fields.TextField(description="章节", source_field="chapter_name")
"""
章节
- 映射到数据库字段 chapter_name
"""
chapter_description = fields.TextField(description="章节描述", source_field="chapter_description")
"""
章节描述
- 映射到数据库字段 chapter_description
"""
class Meta:
table = "hts_class"
table_description = "编码类信息"
class HtsItem(BaseModel):
"""
编码项信息模型
"""
parent_id = fields.CharField(max_length=255, null=True, description="父编码", source_field="parent_id")
"""
父编码
- 最大长度为 255 个字符
- 映射到数据库字段 parent_id
- 可为空
"""
htsno = fields.CharField(max_length=255, description="编码", source_field="htsno")
"""
编码
- 最大长度为 255 个字符
- 映射到数据库字段 htsno
"""
indent = fields.IntField(description="层级",null=True, source_field="indent")
"""
缩进
- 映射到数据库字段 indent
"""
description = fields.TextField(description="描述",null=True, source_field="description")
"""
描述
- 映射到数据库字段 description
"""
units = fields.JSONField(description="单位列表", null=True, source_field="units")
"""
单位列表
- 映射到数据库字段 units
"""
general = fields.TextField(description="通用税率",null=True, source_field="general")
"""
通用税率
- 最大长度为 50 个字符
- 映射到数据库字段 general
"""
special = fields.TextField(description="特殊税率,适用于特定国家或地区",null=True, source_field="special")
"""
特殊税率适用于特定国家或地区
- 最大长度为 255 个字符
- 映射到数据库字段 special
"""
other = fields.TextField(description="其他税率",null=True, source_field="other")
"""
其他税率
- 最大长度为 50 个字符
- 映射到数据库字段 other
"""
quota_quantity = fields.TextField(description="配额数量", null=True, source_field="quota_quantity")
"""
配额数量
- 最大长度为 50 个字符
- 映射到数据库字段 quota_quantity
"""
additional_duties = fields.TextField(description="附加税",null=True, source_field="additional_duties")
"""
附加税
- 最大长度为 50 个字符
- 映射到数据库字段 additional_duties
"""
footnotes = fields.JSONField(null=True, description="脚注列表", source_field="footnotes")
"""
脚注列表
- 映射到数据库字段 footnotes
- 可为空
"""
class_ = fields.ForeignKeyField("models.HtsClass", related_name="class_items", on_delete=fields.CASCADE,
description="所属类", source_field="class_id")
version = fields.ForeignKeyField("models.Version", related_name="version_items", on_delete=fields.CASCADE,
description="所属版本", source_field="version_id")
class Meta:
table = "hts_item"
table_description = "编码项信息"

View File

@ -280,6 +280,18 @@ class Permission(BaseModel):
- 映射到数据库字段 show_parent
"""
is_admin = fields.BooleanField(
default=False,
description="是否为管理专属页面",
source_field="is_admin" # 映射到数据库字段 is_admin
)
"""
是否为管理专属页面
- 是否为管理专属页面仅管理员可见
- 默认为 False
- 映射到数据库字段 is_admin
"""
class Meta:
table = "permission" # 数据库表名
table_description = "权限表" # 表描述

View File

@ -28,6 +28,11 @@ class CodeInfo(BaseModel):
update_time: Optional[datetime] = Field(default=None, description="更新时间")
code: str = Field(..., description="编码")
description: str = Field(..., description="描述")
user_id: str = Field(default="", description="用户ID")
username: str = Field(default="", description="用户名称")
nickname: str = Field(default="", description="用户昵称")
department_id: str = Field(default="", description="用户部门ID")
department_name: str = Field(default="", description="用户部门名称")
class AddCodeParams(BaseModel):
@ -176,3 +181,112 @@ class GetCodeLogAllResponse(BaseResponse):
查询所有编码日志响应
"""
data: GetCodeLogAllResult = Field(..., description="查询编码日志结果")
class CodeFeedbackInfo(BaseModel):
"""
编码反馈信息
"""
model_config = ConfigDict(
alias_generator=to_camel
)
id: str = Field(default="", description="主键")
code_id: str = Field(default="", description="编码ID")
code: str = Field(default="", description="编码")
description: str = Field(default="", description="描述")
feedback_code: str = Field(default="", description="反馈编码")
feedback_description: str = Field(default="", description="反馈文本")
status: int = Field(default=3, description="反馈状态")
user_id: str = Field(default="", description="用户ID")
username: str = Field(default="", description="用户名称")
nickname: str = Field(default="", description="用户昵称")
department_id: str = Field(default="", description="用户部门ID")
department_name: str = Field(default="", description="用户部门名称")
create_time: str = Field(default="", description="创建时间")
update_time: str = Field(default="", description="更新时间")
class GetCodeFeedbackResponse(BaseResponse):
"""
获取编码结果
"""
data: CodeFeedbackInfo = Field(default={}, description="编码反馈结果")
class AddCodeFeedbackParams(BaseModel):
"""
添加编码反馈参数
"""
code_id: str = Field(..., description="编码ID")
feedback_code: str = Field(..., description="反馈编码")
feedback_description: str = Field(..., description="反馈文本")
class CodeFeedbackResult(ListQueryResult):
"""
编码反馈结果
"""
result: List[CodeFeedbackInfo] = Field(..., description="编码反馈列表")
class GetCodeFeedbackListResponse(BaseResponse):
"""
获取编码反馈响应
"""
data: CodeFeedbackResult = Field(..., description="编码反馈结果")
class UpdateCodeFeedbackStatusParams(BaseModel):
"""
更新编码反馈参数
"""
status: int = Field(..., description="状态")
ids: List[str] = Field(default=[], description="编码反馈ID")
class CodeImportInfo(BaseModel):
"""
编码导入信息
"""
model_config = ConfigDict(
alias_generator=to_camel
)
code: str = Field(default="", description="编码")
description: str = Field(default="", description="描述")
status: int = Field(default=3, description="反馈状态")
user_id: str = Field(default="", description="用户ID")
username: str = Field(default="", description="用户名称")
nickname: str = Field(default="", description="用户昵称")
department_id: str = Field(default="", description="用户部门ID")
department_name: str = Field(default="", description="用户部门名称")
create_time: str = Field(default="", description="创建时间")
update_time: str = Field(default="", description="更新时间")
class GetCodeImportResponse(BaseResponse):
"""
获取编码导入响应
"""
data: CodeImportInfo = Field(default={}, description="编码导入结果")
class CodeImportResult(ListQueryResult):
"""
编码导入结果
"""
result: List[CodeImportInfo] = Field(..., description="编码导入列表")
class GetCodeImportListResponse(BaseResponse):
"""
获取编码导入响应
"""
data: CodeImportResult = Field(..., description="编码导入结果")
class UpdateCodeImportStatusParams(BaseModel):
"""
更新编码反馈参数
"""
status: int = Field(..., description="状态")
ids: List[str] = Field(default=[], description="编码反馈ID")

391
schemas/hts.py Normal file
View File

@ -0,0 +1,391 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/03/24 02:29:03
# @UpdateTime : 2025/03/24 02:29:03
# @Author : sonder
# @File : version.py
# @Comment : 本程序用于生成版本信息参数和响应模型
from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel, Field, ConfigDict
from pydantic.alias_generators import to_snake
from schemas.common import BaseResponse, ListQueryResult
class VersionInfo(BaseModel):
"""税率编码版本信息"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
id: Optional[str] = Field(
title="主键"
)
create_time: Optional[datetime] = Field(
title="创建时间"
)
update_time: Optional[datetime] = Field(
title="更新时间"
)
version: str = Field(
title="版本号"
)
date: str = Field(
title="版本日期"
)
url: str = Field(
title="下载地址"
)
class AddVersionParams(BaseModel):
"""新增税率编码版本参数"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
version: str = Field(
title="版本号"
)
date: str = Field(
title="版本日期"
)
url: str = Field(
title="下载地址"
)
class UpdateVersionParams(BaseModel):
"""更新税率编码版本参数"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
version: str = Field(
title="版本号"
)
date: str = Field(
title="版本日期"
)
url: str = Field(
title="下载地址"
)
class GetVersionInfoResponse(BaseResponse):
"""获取税率编码版本信息响应"""
data: VersionInfo = Field(None, title="版本信息信息")
class GetVersionInfoListResult(ListQueryResult):
"""获取税率编码版本信息列表响应结果"""
result: List[VersionInfo] = Field(None, title="版本信息信息列表")
class GetVersionListResponse(BaseResponse):
"""获取税率编码版本信息列表响应"""
data: GetVersionInfoListResult = Field(None, title="版本信息信息列表")
class HTSClassInfo(BaseModel):
"""编码类别信息"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
id: Optional[str] = Field(
title="主键"
)
create_time: Optional[datetime] = Field(
title="创建时间"
)
update_time: Optional[datetime] = Field(
title="更新时间"
)
class_name: str = Field(
title="类名"
)
class_description: str = Field(
title="类描述"
)
chapter_name: str = Field(
title="章节"
)
chapter_description: str = Field(
title="章节描述"
)
class AddHTSClassParams(BaseModel):
"""新增编码类别参数"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
class_name: str = Field(
title="类名"
)
class_description: str = Field(
title="类描述"
)
chapter_name: str = Field(
title="章节"
)
chapter_description: str = Field(
title="章节描述"
)
class UpdateHTSClassParams(BaseModel):
"""更新编码类别参数"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
class_name: str = Field(
title="类名"
)
class_description: str = Field(
title="类描述"
)
chapter_name: str = Field(
title="章节"
)
chapter_description: str = Field(
title="章节描述"
)
class GetHTSClassInfoResponse(BaseResponse):
"""获取编码类别信息响应"""
data: HTSClassInfo = Field(None, title="编码类信息信息")
class GetHTSClassInfoListResult(ListQueryResult):
"""获取编码类别信息列表响应结果"""
result: List[HTSClassInfo] = Field(None, title="编码类信息信息列表")
class GetHTSClassListResponse(BaseResponse):
"""获取编码类别信息列表响应"""
data: GetHTSClassInfoListResult = Field(None, title="编码类信息信息列表")
class HtsItemInfo(BaseModel):
"""编码项目信息"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
id: Optional[str] = Field(
title="主键"
)
create_time: Optional[datetime] = Field(
title="创建时间"
)
update_time: Optional[datetime] = Field(
title="更新时间"
)
parent_id: Optional[str] = Field(
title="父编码"
)
htsno: str = Field(
title="编码"
)
indent: int = Field(
title="层级"
)
description: str = Field(
title="描述"
)
units: dict = Field(
title="单位列表"
)
general: str = Field(
title="通用税率"
)
special: str = Field(
title="特殊税率,适用于特定国家或地区"
)
other: str = Field(
title="其他税率"
)
quota_quantity: str = Field(
title="配额数量"
)
additional_duties: str = Field(
title="附加税"
)
footnotes: dict = Field(
title="脚注列表"
)
class_id: str = Field(
title="所属类"
)
version_id: str = Field(
title="所属版本"
)
class AddHtsItemParams(BaseModel):
"""新增编码项目参数"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
parent_id: Optional[str] = Field(
title="父编码"
)
htsno: str = Field(
title="编码"
)
indent: int = Field(
title="层级"
)
description: str = Field(
title="描述"
)
units: dict = Field(
title="单位列表"
)
general: str = Field(
title="通用税率"
)
special: str = Field(
title="特殊税率,适用于特定国家或地区"
)
other: str = Field(
title="其他税率"
)
quota_quantity: str = Field(
title="配额数量"
)
additional_duties: str = Field(
title="附加税"
)
footnotes: dict = Field(
title="脚注列表"
)
class_id: str = Field(
title="所属类"
)
version_id: str = Field(
title="所属版本"
)
class UpdateHtsItemParams(BaseModel):
"""更新编码项目参数"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
parent_id: Optional[str] = Field(
title="父编码"
)
htsno: str = Field(
title="编码"
)
indent: int = Field(
title="层级"
)
description: str = Field(
title="描述"
)
units: dict = Field(
title="单位列表"
)
general: str = Field(
title="通用税率"
)
special: str = Field(
title="特殊税率,适用于特定国家或地区"
)
other: str = Field(
title="其他税率"
)
quota_quantity: str = Field(
title="配额数量"
)
additional_duties: str = Field(
title="附加税"
)
footnotes: dict = Field(
title="脚注列表"
)
class_id: str = Field(
title="所属类"
)
version_id: str = Field(
title="所属版本"
)
class ImportHtsItemParams(BaseModel):
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
file_id: str = Field(
title="文件ID"
)
version_id: str = Field(
title="版本ID"
)
class_id: str = Field(
title="类ID"
)
class GetHtsItemInfoResponse(BaseResponse):
"""获取编码项目信息响应"""
data: HtsItemInfo = Field(None, title="编码项信息信息")
class GetHtsItemInfoListResult(ListQueryResult):
"""获取编码项目信息列表响应结果"""
result: List[HtsItemInfo] = Field(None, title="编码项信息信息列表")
class GetHtsItemListResponse(BaseResponse):
"""获取编码项目信息列表响应"""
data: GetHtsItemInfoListResult = Field(None, title="编码项信息信息列表")

View File

@ -43,6 +43,7 @@ class PermissionInfo(BaseModel):
fixed_tag: bool = Field(default=False, description="固定标签页")
show_link: bool = Field(default=True, description="显示菜单")
show_parent: bool = Field(default=True, description="显示父级菜单")
is_admin: bool = Field(default=False, description="是否为管理专属页面")
class Config:
json_schema_extra = {
@ -72,7 +73,8 @@ class PermissionInfo(BaseModel):
"hidden_tag": False,
"fixed_tag": False,
"show_link": True,
"show_parent": True
"show_parent": True,
"is_admin": False
}
}
@ -109,6 +111,7 @@ class AddPermissionParams(BaseModel):
show_parent: bool = Field(default=True, description="显示父级菜单")
parent_id: str = Field(default="", max_length=36, description="父级菜单ID")
menu_type: int = Field(default=0, description="菜单类型")
is_admin: bool = Field(default=False, description="是否为管理专属页面")
class Config:
json_schema_extra = {
@ -133,7 +136,8 @@ class AddPermissionParams(BaseModel):
"show_link": True,
"show_parent": True,
"parent_id": "",
"menu_type": 0
"menu_type": 0,
"is_admin": False
}
}

View File

@ -387,3 +387,19 @@ class GetUserStatisticsResponse(BaseResponse):
获取用户统计信息响应模型
"""
data: GetUserStatisticsResult = Field(default=None, description="响应数据")
class UpdateBaseUserInfoParams(BaseModel):
"""修改基础信息参数"""
name: str
"""姓名"""
gender: int
"""性别"""
class Config:
json_schema_extra = {
"example": {
"name": "张三",
"gender": 1,
}
}

View File

@ -5,6 +5,8 @@
# @File : common.py
# @Software : PyCharm
# @Comment : 本程序
from typing import List, Any, Optional, Type
def bytes2human(n, format_str='%(value).1f%(symbol)s'):
"""Used by various scripts. See:
@ -26,11 +28,14 @@ def bytes2human(n, format_str='%(value).1f%(symbol)s'):
return format_str % dict(symbol=symbols[0], value=n)
async def filterKeyValues(dataList: list, key: str) -> list:
async def filterKeyValues(dataList: List[dict], key: str, default: Any = None, convert_type: Optional[Type] = None) -> List[Any]:
"""
获取列表字段数据
:param dataList: 数据列表
:param key: 关键字
:return:
获取列表字段数据并可选择进行类型转换
:param dataList: 数据列表列表中的元素是字典
:param key: 要提取的字段
:param default: 如果字段不存在返回的默认值
:param convert_type: 需要转换的类型 intstrfloat 默认为 None 不转换
:return: 提取并转换后的值列表
"""
return [item[key] for item in dataList]
return [convert_type(item.get(key, default)) if convert_type else item.get(key, default) for item in dataList]