feat(models): 添加海关税率管理相关模型和接口

- 在 models 中新增 Version、HtsClass 和 HtsItem 三个模型
- 在 api 中新增 hts 路由和相关接口
- 更新 app.py 和 code.py 以支持海关税率管理功能
This commit is contained in:
皓月归尘 2025-03-31 04:25:09 +08:00
parent d921a2e7ba
commit 92a57fa5d0
9 changed files with 1698 additions and 3 deletions

171
.env.dev Normal file
View File

@ -0,0 +1,171 @@
# -------- 应用配置 --------
# 应用运行环境
APP_ENV = 'dev'
# 应用名称
APP_NAME = 'Cutoms-System'
# 应用代理路径
APP_ROOT_PATH = ''
# 应用主机
APP_HOST = '0.0.0.0'
# 应用端口
APP_PORT = 8082
# 应用版本
APP_VERSION= '1.0.0'
# 应用是否开启热重载
APP_RELOAD = true
# 应用是否开启IP归属区域查询
APP_IP_LOCATION_QUERY = false
# 应用是否允许账号同时登录
APP_SAME_TIME_LOGIN = true
# -------JWT配置------------
# JWT 签名密钥
JWT_SECRET_KEY=b01c66dc2c58dc6a0aabfe2144256be36226de378bf87f72c0c795dda67f4d55
# JWT 签名算法
JWT_ALGORITHM=HS256
# JWT 盐值
JWT_SALT=jwt_salt
# JWT 令牌有效期(分钟)
JWT_EXPIRE_MINUTES=1440
# JWT 令牌在 Redis 中的缓存有效期(分钟)
JWT_REDIS_EXPIRE_MINUTES=30
# -------- 数据库配置 --------
# 数据库类型,默认为'mysql'
DB_TYPE = 'mysql'
# 数据库主机
DB_HOST = '127.0.0.1'
# 数据库端口
DB_PORT = 3306
# 数据库用户名
DB_USERNAME = 'root'
# 数据库密码
DB_PASSWORD = 'mysqlroot'
# 数据库名称
DB_DATABASE = 'cutoms_system'
# 是否开启日志
DB_ECHO = true
# 数据库日志级别,默认为 10DEBUG
DB_LOG_LEVEL = 10
# 允许溢出连接池大小的最大连接数
DB_MAX_OVERFLOW = 10
# 连接池大小0表示连接数无限制
DB_POOL_SIZE = 50
# 连接回收时间(单位:秒)
DB_POOL_RECYCLE = 3600
# 连接池中没有线程可用时,最多等待的时间(单位:秒)
DB_POOL_TIMEOUT = 30
# -------- Redis配置 --------
# Redis主机
REDIS_HOST = '127.0.0.1'
# Redis端口
REDIS_PORT = 6379
# Redis用户名
REDIS_USERNAME = ''
# Redis密码
REDIS_PASSWORD = ''
# Redis数据库
REDIS_DATABASE = 2
# ======================
# 上传配置
# ======================
# 文件上传的 URL 前缀,默认为 '/profile'。
# 例如:`/profile/example.jpg`。
UPLOAD_PREFIX=/profile
# 文件上传的存储路径,默认为 'data/upload_path'。
# 上传的文件将存储在此目录中,如果目录不存在,会自动创建。
UPLOAD_PATH=data/upload_path
# 上传机器的标识,默认为 'A'。
# 用于区分不同的上传机器或节点,在多机部署时可以使用此字段。
UPLOAD_MACHINE=A
# 默认允许上传的文件扩展名列表,使用逗号分隔。
# 包含常见的图片、文档、压缩文件、视频和 PDF 格式。
# 可以根据需求扩展或修改此列表。
DEFAULT_ALLOWED_EXTENSION=bmp,gif,jpg,jpeg,png,doc,docx,xls,xlsx,ppt,pptx,html,htm,txt,rar,zip,gz,bz2,mp4,avi,rmvb,pdf
# 文件下载的存储路径,默认为 'data/download_path'。
# 下载的文件将存储在此目录中,如果目录不存在,会自动创建。
DOWNLOAD_PATH=data/download_path
# ======================
# 邮件配置
# ======================
# 邮件发送者的用户名,默认为空。
EMAIL_USERNAME=
# 邮件发送者的密码,默认为空。
EMAIL_PASSWORD=
# 邮件服务器地址,默认为 "smtp.qq.com"。
# 如果是其他邮件服务商,请修改为对应的 SMTP 服务器地址。
EMIAL_HOST=smtp.qq.com
# 邮件服务器端口,默认为 465。
# 如果是其他邮件服务商,请根据其要求修改端口号。
EMAIL_PORT=587
# ======================
# 地图配置
# ======================
# 百度地图的 AK 密钥,用于获取地图数据。
# 请在百度地图官网申请获取 AK 密钥。
AK=
# 百度地图sk密钥用于获取地图数据。
# 请在百度地图官网申请获取 SK 密钥。
SK=
# ======================
# ElasticSearch配置
# ======================
# ElasticSearch的URL默认为 "http://localhost:9200"。
ES_HOST=localhost
# ElasticSearch的端口默认为 9200。
ES_PORT=9200
# ElasticSearch的索引名称默认为 "cutoms_system"。
ES_INDEX=product_codes
# ElasticSearch的用户名。
ES_USER=elastic
# ElasticSearch的密码。
ES_PASSWORD=changeme

View File

@ -25,7 +25,7 @@ from config.constant import BusinessType
from config.env import ElasticSearchConfig
from controller.login import LoginController
from exceptions.exception import ServiceException, PermissionException
from models import File, Code, QueryCode, QueryCodeLog, CodeFeedback, CodeImport
from models import File, Code, QueryCode, QueryCodeLog, CodeFeedback, CodeImport, HtsItem
from schemas.code import GetCodeInfoResponse, GetCodeListResponse, GetQueryCodeParams, QueryCodeResponse, AddCodeParams, \
GetQueryCodeLogResponse, GetQueryCodeLogDetailResponse, \
GetCodeLogAllResponse, AddCodeFeedbackParams, GetCodeFeedbackResponse, GetCodeFeedbackListResponse, \
@ -1308,3 +1308,115 @@ async def code_import_audit_all(request: Request, current_user: dict = Depends(L
offset += BATCH_SIZE # 更新查询游标,继续查询下一批数据
return Response.success()
@codeAPI.get("/hts", response_class=JSONResponse, response_model=BaseResponse, summary="查询HTS")
async def query_hts(request: Request, code=Query(description="编码"),
current_user: dict = Depends(LoginController.get_current_user)):
def normalize_numeric_code(code_str):
"""
处理编码字符串
1. 只保留数字字符
2. 截取前10位不足补0
:param code_str: 原始编码字符串
:return: 处理后的10位纯数字编码
"""
# 1. 只保留数字
digits_only = re.sub(r'[^0-9]', '', code_str)
# 2. 锁定10位长度右侧补0
normalized = digits_only.ljust(10, '0')[:10]
return normalized
code = normalize_numeric_code(code)
oneResult = await HtsItem.get_or_none(htsno=code[:2]).values(
id="id",
create_time="create_time",
update_time="update_time",
parent_id="parent_id",
htsno="htsno",
indent="indent",
description="description",
units="units",
general="general",
special="special",
other="other",
quota_quantity="quota_quantity",
additional_duties="additional_duties",
footnotes="footnotes",
)
secondResult = await HtsItem.get_or_none(htsno=code[:4]).values(
id="id",
create_time="create_time",
update_time="update_time",
parent_id="parent_id",
htsno="htsno",
indent="indent",
description="description",
units="units",
general="general",
special="special",
other="other",
quota_quantity="quota_quantity",
additional_duties="additional_duties",
footnotes="footnotes",
)
threeResult = await HtsItem.get_or_none(htsno=code[:6]).values(
id="id",
create_time="create_time",
update_time="update_time",
parent_id="parent_id",
htsno="htsno",
indent="indent",
description="description",
units="units",
general="general",
special="special",
other="other",
quota_quantity="quota_quantity",
additional_duties="additional_duties",
footnotes="footnotes",
)
fourResult = await HtsItem.get_or_none(htsno=code[:8]).values(
id="id",
create_time="create_time",
update_time="update_time",
parent_id="parent_id",
htsno="htsno",
indent="indent",
description="description",
units="units",
general="general",
special="special",
other="other",
quota_quantity="quota_quantity",
additional_duties="additional_duties",
footnotes="footnotes",
)
fiveResult = await HtsItem.get_or_none(htsno=code).values(
id="id",
create_time="create_time",
update_time="update_time",
parent_id="parent_id",
htsno="htsno",
indent="indent",
description="description",
units="units",
general="general",
special="special",
other="other",
quota_quantity="quota_quantity",
additional_duties="additional_duties",
footnotes="footnotes",
)
return Response.success(
data={
"oneResult": oneResult,
"secondResult": secondResult,
"threeResult": threeResult,
"fourResult": fourResult,
"fiveResult": fiveResult,
}
)

741
api/hts.py Normal file
View File

@ -0,0 +1,741 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/03/24 02:29:03
# @UpdateTime : 2025/03/24 02:29:03
# @Author : sonder
# @File : version.py
# @Comment : 本程序用于生成版本信息增删改查接口
import json
import uuid
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, Path, Request, Query
from fastapi.responses import JSONResponse
from annotation.auth import Auth
from annotation.log import Log
from config.constant import BusinessType
from controller.login import LoginController
from models import Version, HtsClass, HtsItem, File
from schemas.common import BaseResponse, DeleteListParams
from schemas.hts import AddHTSClassParams, UpdateHTSClassParams, GetHTSClassInfoResponse, GetHTSClassListResponse, \
ImportHtsItemParams
from schemas.hts import AddHtsItemParams, UpdateHtsItemParams, GetHtsItemInfoResponse, GetHtsItemListResponse
from schemas.hts import AddVersionParams, UpdateVersionParams, GetVersionInfoResponse, GetVersionListResponse
from utils.response import Response
htsAPI = APIRouter(
prefix="/hts",
dependencies=[Depends(LoginController.get_current_user)],
)
@htsAPI.post("/version/add", response_class=JSONResponse, response_model=BaseResponse, summary="新增税率编码版本")
@Log(title="新增税率编码版本", business_type=BusinessType.INSERT)
@Auth(permission_list=["version:btn:add"])
async def add_version(request: Request, params: AddVersionParams):
if await Version.get_or_none(
version=params.version,
date=params.date,
url=params.url,
del_flag=1
):
return Response.error(msg="税率编码版本已存在!")
version = await Version.create(
version=params.version,
date=params.date,
url=params.url,
)
if version:
return Response.success(msg="新增成功!")
else:
return Response.error(msg="新增失败")
@htsAPI.delete("/version/delete/{id}", response_class=JSONResponse, response_model=BaseResponse,
summary="删除税率编码版本")
@htsAPI.post("/version/delete/{id}", response_class=JSONResponse, response_model=BaseResponse,
summary="删除税率编码版本")
@Log(title="删除税率编码版本", business_type=BusinessType.DELETE)
@Auth(permission_list=["version:btn:delete"])
async def delete_version(request: Request, id: str = Path(description="税率编码版本ID")):
if version := await Version.get_or_none(id=id, del_flag=1):
version.del_flag = 0
await version.save()
return Response.success(msg="删除成功")
else:
return Response.error(msg="税率编码版本不存在!")
@htsAPI.delete("/version/deleteList", response_class=JSONResponse, response_model=BaseResponse,
summary="批量删除税率编码版本")
@htsAPI.post("/version/deleteList", response_class=JSONResponse, response_model=BaseResponse,
summary="批量删除税率编码版本")
@Log(title="批量删除税率编码版本", business_type=BusinessType.DELETE)
@Auth(permission_list=["version:btn:delete"])
async def delete_version_list(request: Request, params: DeleteListParams):
for id in set(params.ids):
if version := await Version.get_or_none(id=id, del_flag=1):
version.del_flag = 0
await version.save()
return Response.success(msg="删除成功")
@htsAPI.put("/version/update/{id}", response_class=JSONResponse, response_model=BaseResponse,
summary="修改税率编码版本")
@htsAPI.post("/version/update/{id}", response_class=JSONResponse, response_model=BaseResponse,
summary="修改税率编码版本")
@Log(title="修改税率编码版本", business_type=BusinessType.UPDATE)
@Auth(permission_list=["version:btn:update"])
async def update_version(request: Request, params: UpdateVersionParams, id: str = Path(description="税率编码版本ID")):
if version := await Version.get_or_none(id=id, del_flag=1):
print(params)
version.version = params.version
version.date = params.date
version.url = params.url
await version.save()
return Response.success(msg="修改成功")
else:
return Response.error(msg="税率编码版本不存在")
@htsAPI.get("/version/info/{id}", response_class=JSONResponse, response_model=GetVersionInfoResponse,
summary="获取税率编码版本信息")
@Log(title="获取税率编码版本信息", business_type=BusinessType.SELECT)
@Auth(permission_list=["version:btn:info"])
async def get_version_info(request: Request, id: str = Path(description="税率编码版本ID")):
if version := await Version.get_or_none(id=id, del_flag=1):
data = {
"id": version.id,
"create_time": version.create_time,
"update_time": version.update_time,
"version": version.version,
"date": version.date,
"url": version.url,
}
return Response.success(data=data)
else:
return Response.error(msg="税率编码版本不存在")
@htsAPI.get("/version/list", response_class=JSONResponse, response_model=GetVersionListResponse,
summary="获取税率编码版本列表")
@Log(title="获取税率编码版本列表", business_type=BusinessType.SELECT)
@Auth(permission_list=["version:btn:list"])
async def get_version_list(
request: Request,
page: int = Query(default=1, description="当前页码"),
pageSize: int = Query(default=10, description="每页数量"),
version: Optional[str] = Query(default=None, description="版本号"),
date: Optional[str] = Query(default=None, description="版本日期"),
url: Optional[str] = Query(default=None, description="下载地址"),
):
filterArgs = {
"version__icontains": version,
"date__icontains": date,
"url__icontains": url,
}
filterArgs = {k: v for k, v in filterArgs.items() if v is not None}
total = await Version.filter(**filterArgs, del_flag=1).count()
data = await Version.filter(**filterArgs, del_flag=1).offset((page - 1) * pageSize).limit(pageSize).values(
id="id",
create_time="create_time",
update_time="update_time",
version="version",
date="date",
url="url",
)
return Response.success(data={
"total": total,
"result": data,
"page": page,
"pageSize": pageSize,
})
@htsAPI.post("/class/add", response_class=JSONResponse, response_model=BaseResponse, summary="新增编码类别")
@Log(title="新增编码类别", business_type=BusinessType.INSERT)
@Auth(permission_list=["htsclass:btn:add"])
async def add_htsclass(request: Request, params: AddHTSClassParams):
if await HtsClass.get_or_none(
class_name=params.class_name,
class_description=params.class_description,
chapter_name=params.chapter_name,
chapter_description=params.chapter_description,
del_flag=1
):
return Response.error(msg="编码类别已存在!")
htsclass = await HtsClass.create(
class_name=params.class_name,
class_description=params.class_description,
chapter_name=params.chapter_name,
chapter_description=params.chapter_description,
)
if htsclass:
return Response.success(msg="新增成功!")
else:
return Response.error(msg="新增失败")
@htsAPI.delete("/class/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除编码类别")
@htsAPI.post("/class/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除编码类别")
@Log(title="删除编码类别", business_type=BusinessType.DELETE)
@Auth(permission_list=["htsclass:btn:delete"])
async def delete_htsclass(request: Request, id: str = Path(description="编码类别ID")):
if htsclass := await HtsClass.get_or_none(id=id, del_flag=1):
htsclass.del_flag = 0
await htsclass.save()
return Response.success(msg="删除成功")
else:
return Response.error(msg="编码类别不存在!")
@htsAPI.delete("/class/deleteList", response_class=JSONResponse, response_model=BaseResponse,
summary="批量删除编码类别")
@htsAPI.post("/class/deleteList", response_class=JSONResponse, response_model=BaseResponse, summary="批量删除编码类别")
@Log(title="批量删除编码类别", business_type=BusinessType.DELETE)
@Auth(permission_list=["htsclass:btn:delete"])
async def delete_htsclass_list(request: Request, params: DeleteListParams):
for id in set(params.ids):
if htsclass := await HtsClass.get_or_none(id=id, del_flag=1):
htsclass.del_flag = 0
await htsclass.save()
return Response.success(msg="删除成功")
@htsAPI.put("/class/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="修改编码类别")
@htsAPI.post("/class/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="修改编码类别")
@Log(title="修改编码类别", business_type=BusinessType.UPDATE)
@Auth(permission_list=["htsclass:btn:update"])
async def update_htsclass(request: Request, params: UpdateHTSClassParams, id: str = Path(description="编码类别ID")):
if htsclass := await HtsClass.get_or_none(id=id, del_flag=1):
htsclass.class_name = params.class_name
htsclass.class_description = params.class_description
htsclass.chapter_name = params.chapter_name
htsclass.chapter_description = params.chapter_description
await htsclass.save()
return Response.success(msg="修改成功")
else:
return Response.error(msg="编码类别不存在")
@htsAPI.get("/class/info/{id}", response_class=JSONResponse, response_model=GetHTSClassInfoResponse,
summary="获取编码类别信息")
@Log(title="获取编码类别信息", business_type=BusinessType.SELECT)
@Auth(permission_list=["htsclass:btn:info"])
async def get_htsclass_info(request: Request, id: str = Path(description="编码类别ID")):
if htsclass := await HtsClass.get_or_none(id=id, del_flag=1):
data = {
"id": htsclass.id,
"create_time": htsclass.create_time,
"update_time": htsclass.update_time,
"class_name": htsclass.class_name,
"class_description": htsclass.class_description,
"chapter_name": htsclass.chapter_name,
"chapter_description": htsclass.chapter_description,
}
return Response.success(data=data)
else:
return Response.error(msg="编码类别不存在")
@htsAPI.get("/class/list", response_class=JSONResponse, response_model=GetHTSClassListResponse,
summary="获取编码类别列表")
@Log(title="获取编码类别列表", business_type=BusinessType.SELECT)
@Auth(permission_list=["htsclass:btn:list"])
async def get_htsclass_list(
request: Request,
page: int = Query(default=1, description="当前页码"),
pageSize: int = Query(default=10, description="每页数量"),
class_name: Optional[str] = Query(default=None, description="类名"),
class_description: Optional[str] = Query(default=None, description="类描述"),
chapter_name: Optional[str] = Query(default=None, description="章节"),
chapter_description: Optional[str] = Query(default=None, description="章节描述"),
):
filterArgs = {
"class_name__icontains": class_name,
"class_description__icontains": class_description,
"chapter_name__icontains": chapter_name,
"chapter_description__icontains": chapter_description,
}
filterArgs = {k: v for k, v in filterArgs.items() if v is not None}
total = await HtsClass.filter(**filterArgs, del_flag=1).count()
data = await HtsClass.filter(**filterArgs, del_flag=1).offset((page - 1) * pageSize).limit(pageSize).values(
id="id",
create_time="create_time",
update_time="update_time",
class_name="class_name",
class_description="class_description",
chapter_name="chapter_name",
chapter_description="chapter_description",
)
return Response.success(data={
"total": total,
"result": data,
"page": page,
"pageSize": pageSize,
})
@htsAPI.post("/item/add", response_class=JSONResponse, response_model=BaseResponse, summary="新增编码项")
@Log(title="新增编码项", business_type=BusinessType.INSERT)
@Auth(permission_list=["htsitem:btn:add"])
async def add_htsitem(request: Request, params: AddHtsItemParams):
if await HtsItem.get_or_none(
parent_id=params.parent_id,
htsno=params.htsno,
indent=params.indent,
description=params.description,
units=params.units,
general=params.general,
special=params.special,
other=params.other,
quota_quantity=params.quota_quantity,
additional_duties=params.additional_duties,
footnotes=params.footnotes,
class_id=params.class_id,
version_id=params.version_id,
del_flag=1
):
return Response.error(msg="编码项目已存在!")
htsitem = await HtsItem.create(
parent_id=params.parent_id,
htsno=params.htsno,
indent=params.indent,
description=params.description,
units=params.units,
general=params.general,
special=params.special,
other=params.other,
quota_quantity=params.quota_quantity,
additional_duties=params.additional_duties,
footnotes=params.footnotes,
class_id=params.class_id,
version_id=params.version_id,
)
if htsitem:
return Response.success(msg="新增成功!")
else:
return Response.error(msg="新增失败")
@htsAPI.delete("/item/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除编码项")
@htsAPI.post("/item/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除编码项")
@Log(title="删除编码项", business_type=BusinessType.DELETE)
@Auth(permission_list=["htsitem:btn:delete"])
async def delete_htsitem(request: Request, id: str = Path(description="编码项目ID")):
if htsitem := await HtsItem.get_or_none(id=id, del_flag=1):
htsitem.del_flag = 0
await htsitem.save()
return Response.success(msg="删除成功")
else:
return Response.error(msg="编码项目不存在!")
@htsAPI.delete("/item/deleteList", response_class=JSONResponse, response_model=BaseResponse, summary="批量删除编码项")
@htsAPI.post("/item/deleteList", response_class=JSONResponse, response_model=BaseResponse, summary="批量删除编码项")
@Log(title="批量删除编码项", business_type=BusinessType.DELETE)
@Auth(permission_list=["htsitem:btn:delete"])
async def delete_htsitem_list(request: Request, params: DeleteListParams):
for id in set(params.ids):
if htsitem := await HtsItem.get_or_none(id=id, del_flag=1):
htsitem.del_flag = 0
await htsitem.save()
return Response.success(msg="删除成功")
@htsAPI.put("/item/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="修改编码项")
@htsAPI.post("/item/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="修改编码项")
@Log(title="修改编码项", business_type=BusinessType.UPDATE)
@Auth(permission_list=["htsitem:btn:update"])
async def update_htsitem(request: Request, params: UpdateHtsItemParams, id: str = Path(description="编码项目ID")):
if htsitem := await HtsItem.get_or_none(id=id, del_flag=1):
htsitem.parent_id = params.parent_id
htsitem.htsno = params.htsno
htsitem.indent = params.indent
htsitem.description = params.description
htsitem.units = params.units
htsitem.general = params.general
htsitem.special = params.special
htsitem.other = params.other
htsitem.quota_quantity = params.quota_quantity
htsitem.additional_duties = params.additional_duties
htsitem.footnotes = params.footnotes
if class_ := await HtsClass.get_or_none(id=params.class_id, del_flag=1):
htsitem.class_ = class_
if version := await Version.get_or_none(id=params.version_id, del_flag=1):
htsitem.version = version
htsitem.version_id = params.version_id
await htsitem.save()
return Response.success(msg="修改成功")
else:
return Response.error(msg="编码项目不存在")
@htsAPI.get("/item/info/{id}", response_class=JSONResponse, response_model=GetHtsItemInfoResponse,
summary="获取编码项信息")
@Log(title="获取编码项信息", business_type=BusinessType.SELECT)
@Auth(permission_list=["htsitem:btn:info"])
async def get_htsitem_info(request: Request, id: str = Path(description="编码项目ID")):
if htsitem := await HtsItem.get_or_none(id=id, del_flag=1):
data = {
"id": htsitem.id,
"create_time": htsitem.create_time,
"update_time": htsitem.update_time,
"parent_id": htsitem.parent_id,
"htsno": htsitem.htsno,
"indent": htsitem.indent,
"description": htsitem.description,
"units": htsitem.units,
"general": htsitem.general,
"special": htsitem.special,
"other": htsitem.other,
"quota_quantity": htsitem.quota_quantity,
"additional_duties": htsitem.additional_duties,
"footnotes": htsitem.footnotes,
"class_id": htsitem.class_id,
"version_id": htsitem.version_id,
}
return Response.success(data=data)
else:
return Response.error(msg="编码项目不存在")
@htsAPI.get("/item/list", response_class=JSONResponse, response_model=GetHtsItemListResponse, summary="获取编码项列表")
@Log(title="获取编码项列表", business_type=BusinessType.SELECT)
@Auth(permission_list=["htsitem:btn:list"])
async def get_htsitem_list(
request: Request,
page: int = Query(default=1, description="当前页码"),
pageSize: int = Query(default=10, description="每页数量"),
htsno: Optional[str] = Query(default=None, description="编码"),
description: Optional[str] = Query(default=None, description="描述"),
units: Optional[str] = Query(default=None, description="单位列表"),
general: Optional[str] = Query(default=None, description="通用税率"),
special: Optional[str] = Query(default=None, description="特殊税率,适用于特定国家或地区"),
other: Optional[str] = Query(default=None, description="其他税率"),
quota_quantity: Optional[str] = Query(default=None, description="配额数量"),
additional_duties: Optional[str] = Query(default=None, description="附加税"),
footnotes: Optional[str] = Query(default=None, description="脚注列表"),
class_id: Optional[str] = Query(default=None, description="所属类"),
version_id: Optional[str] = Query(default=None, description="所属版本"),
):
filterArgs = {
"htsno__icontains": htsno,
"description__icontains": description,
"units__icontains": units,
"general__icontains": general,
"special__icontains": special,
"other__icontains": other,
"quota_quantity__icontains": quota_quantity,
"additional_duties__icontains": additional_duties,
"footnotes__icontains": footnotes,
"class__id": class_id,
"version_id": version_id,
}
filterArgs = {k: v for k, v in filterArgs.items() if v is not None}
total = await HtsItem.filter(**filterArgs, del_flag=1).count()
data = await HtsItem.filter(**filterArgs, del_flag=1).offset((page - 1) * pageSize).limit(pageSize).values(
id="id",
create_time="create_time",
update_time="update_time",
parent_id="parent_id",
htsno="htsno",
indent="indent",
description="description",
units="units",
general="general",
special="special",
other="other",
quota_quantity="quota_quantity",
additional_duties="additional_duties",
footnotes="footnotes",
class_id="class__id",
version_id="version_id",
)
return Response.success(data={
"total": total,
"result": data,
"page": page,
"pageSize": pageSize,
})
@htsAPI.post("/item/import", response_class=JSONResponse, response_model=BaseResponse, summary="导入编码项")
@Log(title="导入编码项", business_type=BusinessType.INSERT)
@Auth(permission_list=["htsitem:btn:import"])
async def add_htsitem(request: Request, params: ImportHtsItemParams):
version = await Version.get_or_none(id=params.version_id, del_flag=1)
if not version:
return Response.error(msg="版本不存在")
class_ = await HtsClass.get_or_none(id=params.class_id, del_flag=1)
if not class_:
return Response.error(msg="编码类不存在")
file = await File.get_or_none(id=params.file_id, del_flag=1)
if not file:
return Response.error(msg="文件不存在")
if await HtsItem.get_or_none(class_=params.class_id, version=params.version_id):
return Response.error(msg="编码项已存在")
def build_hierarchy_with_ids(data):
hierarchy = []
stack = []
for item in data:
current_level = int(item['indent'])
current_item = {
'id': str(uuid.uuid4()), # 生成唯一的 UUID
'indent': item['indent'],
'htsno': item['htsno'].replace(".", "").strip(),
'description': item['description'],
'parent_id': None, # 默认没有父级,
"units": json.dumps(item["units"], ensure_ascii=False),
"general": item["general"],
"special": item["special"],
"other": item["other"],
"footnotes": json.dumps(item["footnotes"], ensure_ascii=False),
"quota_quantity": item["quotaQuantity"],
"additional_duties": item["additionalDuties"],
"addiitional_duties": item["addiitionalDuties"]
}
# 找到当前项的父级
while stack and int(stack[-1]['indent']) >= current_level:
stack.pop()
if stack:
# 设置当前项的 parent_id 为父级的 id
current_item['parent_id'] = stack[-1]['item']['id']
# 将当前项添加到层级结构中
hierarchy.append(current_item)
# 将当前项压入栈中
stack.append({'indent': current_level, 'item': current_item})
return hierarchy
with open(file.absolute_path, 'r', encoding='utf-8') as r:
data = json.load(r)
for x in data:
x["indent"] = str(int(x["indent"]) + 1)
data.append({
"htsno": str(class_.chapter_name).replace("Chapter", "").strip() if int(
str(class_.chapter_name).replace("Chapter", "").strip(" ")) > 10 else "0" +
str(class_.chapter_name).replace(
"Chapter", "").strip(),
"indent": "0",
"description": class_.class_description,
"units": [],
"general": "",
"special": "",
"other": "",
"footnotes": [],
"quotaQuantity": "",
"additionalDuties": "",
"addiitionalDuties": ""
})
hierarchy = build_hierarchy_with_ids(data)
# 批量插入查询结果
create_tasks = [
HtsItem(
id=item["id"],
parent_id=item["parent_id"],
htsno=item["htsno"],
indent=item["indent"],
description=item["description"],
units=item["units"],
general=item["general"],
special=item["special"],
other=item["other"],
quota_quantity=item["quota_quantity"],
additional_duties=item["additional_duties"],
footnotes=item["footnotes"],
class_=class_,
version=version
)
for item in hierarchy
]
await HtsItem.bulk_create(create_tasks)
return Response.success()

View File

@ -5,6 +5,7 @@
# @File : login.py
# @Software : PyCharm
# @Comment : 本程序
import json
import uuid
from datetime import timedelta, datetime
@ -18,7 +19,7 @@ from config.constant import BusinessType
from config.constant import RedisKeyConfig
from controller.login import CustomOAuth2PasswordRequestForm, LoginController
from controller.query import QueryController
from models import Department, User, Role, UserRole, LoginLog, OperationLog, QueryCodeLog, QueryCode
from models import Department, User, Role, UserRole, LoginLog, OperationLog, QueryCodeLog, QueryCode, HtsClass, HtsItem,Version
from schemas.common import BaseResponse
from schemas.login import LoginParams, GetUserInfoResponse, LoginResponse, GetCaptchaResponse, GetEmailCodeParams, \
ResetPasswordParams
@ -304,3 +305,110 @@ async def unsubscribe(request: Request, current_user: dict = Depends(LoginContro
return Response.success(data="注销成功!")
else:
return Response.error(data="注销失败!")
# @loginAPI.get("/test")
# async def test(request: Request):
# values = await HtsClass.filter(del_flag=1).values("chapter_name","id")
# version=await Version.get(id="4b2101cc-9bc3-4084-af16-eeb27b24b682")
# for item in values:
# hts = str(item["chapter_name"]).replace("Chapter", "").strip()
# class_=await HtsClass.get(id=item["id"])
# if hts == "77":
# continue
#
# def build_hierarchy_with_ids(data):
# hierarchy = []
# stack = []
#
# for item in data:
# current_level = int(item['indent'])
# current_item = {
# 'id': str(uuid.uuid4()), # 生成唯一的 UUID
# 'indent': item['indent'],
# 'htsno': item['htsno'].replace(".", "").strip(),
# 'description': item['description'],
# 'parent_id': None, # 默认没有父级,
# "units": json.dumps(item["units"], ensure_ascii=False),
# "general": item["general"],
# "special": item["special"],
# "other": item["other"],
# "footnotes": json.dumps(item["footnotes"], ensure_ascii=False),
# "quota_quantity": item["quotaQuantity"],
# "additional_duties": item["additionalDuties"],
# "addiitional_duties": item["addiitionalDuties"]
# }
#
# # 找到当前项的父级
# while stack and int(stack[-1]['indent']) >= current_level:
# stack.pop()
#
# if stack:
# # 设置当前项的 parent_id 为父级的 id
# current_item['parent_id'] = stack[-1]['item']['id']
#
# # 将当前项添加到层级结构中
# hierarchy.append(current_item)
#
# # 将当前项压入栈中
# stack.append({'indent': current_level, 'item': current_item})
#
# return hierarchy
#
# with open(f"E:/PythonCodes/cutoms-system-backend/test/{hts}.json", 'r', encoding='utf-8') as r:
# data = json.load(r)
# for x in data:
# x["indent"] = str(int(x["indent"]) + 1)
# data.append({
# "htsno": str(item["chapter_name"]).replace("Chapter", "").strip() if int(
# str(item["chapter_name"]).replace("Chapter", "").strip(" ")) > 10 else "0" +
# str(item["chapter_name"]).replace(
# "Chapter", "").strip(),
# "indent": "0",
# "description": class_.class_description,
# "units": [],
# "general": "",
# "special": "",
# "other": "",
# "footnotes": [],
# "quotaQuantity": "",
# "additionalDuties": "",
# "addiitionalDuties": ""
# })
# hierarchy = build_hierarchy_with_ids(data)
# # 批量插入查询结果
# create_tasks = [
# HtsItem(
# id=item["id"],
#
# parent_id=item["parent_id"],
#
# htsno=item["htsno"],
#
# indent=item["indent"],
#
# description=item["description"],
#
# units=item["units"],
#
# general=item["general"],
#
# special=item["special"],
#
# other=item["other"],
#
# quota_quantity=item["quota_quantity"],
#
# additional_duties=item["additional_duties"],
#
# footnotes=item["footnotes"],
#
# class_=class_,
#
# version=version
# )
# for item in hierarchy
# ]
# await HtsItem.bulk_create(create_tasks)
# print(class_.chapter_name)

2
app.py
View File

@ -24,6 +24,7 @@ from api.role import roleAPI
from api.server import serverAPI
from api.user import userAPI
from api.code import codeAPI
from api.hts import htsAPI
from config.database import init_db, close_db
from config.env import AppConfig
from config.get_ElasticSearch import ElasticSearch
@ -93,6 +94,7 @@ api_list = [
{'api': i18nAPI, 'tags': ['国际化管理']},
{'api': configApi, 'tags': ['配置管理']},
{'api': codeAPI, 'tags': ['编码管理']},
{'api': htsAPI, 'tags': ['海关税率管理']},
]
for api in api_list:

View File

@ -331,6 +331,7 @@ class UploadSettings:
'rmvb',
# pdf
'pdf',
'json'
]
"""
默认允许上传的文件扩展名列表

View File

@ -15,6 +15,7 @@ from models.log import LoginLog, OperationLog
from models.permission import Permission
from models.role import Role, RolePermission
from models.user import User, UserRole
from models.hts import Version, HtsClass, HtsItem
__all__ = [
'Department',
@ -33,5 +34,8 @@ __all__ = [
'CodeFeedback',
'CodeImport',
'QueryCode',
'QueryCodeLog'
'QueryCodeLog',
'HtsClass',
'HtsItem',
'Version',
]

165
models/hts.py Normal file
View File

@ -0,0 +1,165 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/03/24 02:29:03
# @UpdateTime : 2025/03/24 02:29:03
# @Author : sonder
# @File : hts.py
# @Comment : 本程序用于海关编码税率模型
from tortoise import fields
from models.common import BaseModel
class Version(BaseModel):
"""
版本信息模型
"""
version = fields.CharField(max_length=255, description="版本号", source_field="version")
"""
版本号
- 最大长度为 255 个字符
- 映射到数据库字段 version
"""
date = fields.CharField(max_length=255,description="版本日期", source_field="date")
"""
版本日期
- 映射到数据库字段 date
"""
url = fields.CharField(max_length=255, description="下载地址", source_field="url")
"""
下载地址
- 最大长度为 255 个字符
- 映射到数据库字段 url
"""
class Meta:
table = "version"
table_description = "版本信息"
class HtsClass(BaseModel):
"""
编码类信息模型
"""
class_name = fields.TextField(description="类名", source_field="class_name")
"""
类名
- 映射到数据库字段 class_name
"""
class_description = fields.TextField(description="类描述", source_field="class_description")
"""
类描述
- 映射到数据库字段 class_description
"""
chapter_name = fields.TextField(description="章节", source_field="chapter_name")
"""
章节
- 映射到数据库字段 chapter_name
"""
chapter_description = fields.TextField(description="章节描述", source_field="chapter_description")
"""
章节描述
- 映射到数据库字段 chapter_description
"""
class Meta:
table = "hts_class"
table_description = "编码类信息"
class HtsItem(BaseModel):
"""
编码项信息模型
"""
parent_id = fields.CharField(max_length=255, null=True, description="父编码", source_field="parent_id")
"""
父编码
- 最大长度为 255 个字符
- 映射到数据库字段 parent_id
- 可为空
"""
htsno = fields.CharField(max_length=255, description="编码", source_field="htsno")
"""
编码
- 最大长度为 255 个字符
- 映射到数据库字段 htsno
"""
indent = fields.IntField(description="层级",null=True, source_field="indent")
"""
缩进
- 映射到数据库字段 indent
"""
description = fields.TextField(description="描述",null=True, source_field="description")
"""
描述
- 映射到数据库字段 description
"""
units = fields.JSONField(description="单位列表", null=True, source_field="units")
"""
单位列表
- 映射到数据库字段 units
"""
general = fields.TextField(description="通用税率",null=True, source_field="general")
"""
通用税率
- 最大长度为 50 个字符
- 映射到数据库字段 general
"""
special = fields.TextField(description="特殊税率,适用于特定国家或地区",null=True, source_field="special")
"""
特殊税率适用于特定国家或地区
- 最大长度为 255 个字符
- 映射到数据库字段 special
"""
other = fields.TextField(description="其他税率",null=True, source_field="other")
"""
其他税率
- 最大长度为 50 个字符
- 映射到数据库字段 other
"""
quota_quantity = fields.TextField(description="配额数量", null=True, source_field="quota_quantity")
"""
配额数量
- 最大长度为 50 个字符
- 映射到数据库字段 quota_quantity
"""
additional_duties = fields.TextField(description="附加税",null=True, source_field="additional_duties")
"""
附加税
- 最大长度为 50 个字符
- 映射到数据库字段 additional_duties
"""
footnotes = fields.JSONField(null=True, description="脚注列表", source_field="footnotes")
"""
脚注列表
- 映射到数据库字段 footnotes
- 可为空
"""
class_ = fields.ForeignKeyField("models.HtsClass", related_name="class_items", on_delete=fields.CASCADE,
description="所属类", source_field="class_id")
version = fields.ForeignKeyField("models.Version", related_name="version_items", on_delete=fields.CASCADE,
description="所属版本", source_field="version_id")
class Meta:
table = "hts_item"
table_description = "编码项信息"

391
schemas/hts.py Normal file
View File

@ -0,0 +1,391 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/03/24 02:29:03
# @UpdateTime : 2025/03/24 02:29:03
# @Author : sonder
# @File : version.py
# @Comment : 本程序用于生成版本信息参数和响应模型
from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel, Field, ConfigDict
from pydantic.alias_generators import to_snake
from schemas.common import BaseResponse, ListQueryResult
class VersionInfo(BaseModel):
"""税率编码版本信息"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
id: Optional[str] = Field(
title="主键"
)
create_time: Optional[datetime] = Field(
title="创建时间"
)
update_time: Optional[datetime] = Field(
title="更新时间"
)
version: str = Field(
title="版本号"
)
date: str = Field(
title="版本日期"
)
url: str = Field(
title="下载地址"
)
class AddVersionParams(BaseModel):
"""新增税率编码版本参数"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
version: str = Field(
title="版本号"
)
date: str = Field(
title="版本日期"
)
url: str = Field(
title="下载地址"
)
class UpdateVersionParams(BaseModel):
"""更新税率编码版本参数"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
version: str = Field(
title="版本号"
)
date: str = Field(
title="版本日期"
)
url: str = Field(
title="下载地址"
)
class GetVersionInfoResponse(BaseResponse):
"""获取税率编码版本信息响应"""
data: VersionInfo = Field(None, title="版本信息信息")
class GetVersionInfoListResult(ListQueryResult):
"""获取税率编码版本信息列表响应结果"""
result: List[VersionInfo] = Field(None, title="版本信息信息列表")
class GetVersionListResponse(BaseResponse):
"""获取税率编码版本信息列表响应"""
data: GetVersionInfoListResult = Field(None, title="版本信息信息列表")
class HTSClassInfo(BaseModel):
"""编码类别信息"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
id: Optional[str] = Field(
title="主键"
)
create_time: Optional[datetime] = Field(
title="创建时间"
)
update_time: Optional[datetime] = Field(
title="更新时间"
)
class_name: str = Field(
title="类名"
)
class_description: str = Field(
title="类描述"
)
chapter_name: str = Field(
title="章节"
)
chapter_description: str = Field(
title="章节描述"
)
class AddHTSClassParams(BaseModel):
"""新增编码类别参数"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
class_name: str = Field(
title="类名"
)
class_description: str = Field(
title="类描述"
)
chapter_name: str = Field(
title="章节"
)
chapter_description: str = Field(
title="章节描述"
)
class UpdateHTSClassParams(BaseModel):
"""更新编码类别参数"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
class_name: str = Field(
title="类名"
)
class_description: str = Field(
title="类描述"
)
chapter_name: str = Field(
title="章节"
)
chapter_description: str = Field(
title="章节描述"
)
class GetHTSClassInfoResponse(BaseResponse):
"""获取编码类别信息响应"""
data: HTSClassInfo = Field(None, title="编码类信息信息")
class GetHTSClassInfoListResult(ListQueryResult):
"""获取编码类别信息列表响应结果"""
result: List[HTSClassInfo] = Field(None, title="编码类信息信息列表")
class GetHTSClassListResponse(BaseResponse):
"""获取编码类别信息列表响应"""
data: GetHTSClassInfoListResult = Field(None, title="编码类信息信息列表")
class HtsItemInfo(BaseModel):
"""编码项目信息"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
id: Optional[str] = Field(
title="主键"
)
create_time: Optional[datetime] = Field(
title="创建时间"
)
update_time: Optional[datetime] = Field(
title="更新时间"
)
parent_id: Optional[str] = Field(
title="父编码"
)
htsno: str = Field(
title="编码"
)
indent: int = Field(
title="层级"
)
description: str = Field(
title="描述"
)
units: dict = Field(
title="单位列表"
)
general: str = Field(
title="通用税率"
)
special: str = Field(
title="特殊税率,适用于特定国家或地区"
)
other: str = Field(
title="其他税率"
)
quota_quantity: str = Field(
title="配额数量"
)
additional_duties: str = Field(
title="附加税"
)
footnotes: dict = Field(
title="脚注列表"
)
class_id: str = Field(
title="所属类"
)
version_id: str = Field(
title="所属版本"
)
class AddHtsItemParams(BaseModel):
"""新增编码项目参数"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
parent_id: Optional[str] = Field(
title="父编码"
)
htsno: str = Field(
title="编码"
)
indent: int = Field(
title="层级"
)
description: str = Field(
title="描述"
)
units: dict = Field(
title="单位列表"
)
general: str = Field(
title="通用税率"
)
special: str = Field(
title="特殊税率,适用于特定国家或地区"
)
other: str = Field(
title="其他税率"
)
quota_quantity: str = Field(
title="配额数量"
)
additional_duties: str = Field(
title="附加税"
)
footnotes: dict = Field(
title="脚注列表"
)
class_id: str = Field(
title="所属类"
)
version_id: str = Field(
title="所属版本"
)
class UpdateHtsItemParams(BaseModel):
"""更新编码项目参数"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
parent_id: Optional[str] = Field(
title="父编码"
)
htsno: str = Field(
title="编码"
)
indent: int = Field(
title="层级"
)
description: str = Field(
title="描述"
)
units: dict = Field(
title="单位列表"
)
general: str = Field(
title="通用税率"
)
special: str = Field(
title="特殊税率,适用于特定国家或地区"
)
other: str = Field(
title="其他税率"
)
quota_quantity: str = Field(
title="配额数量"
)
additional_duties: str = Field(
title="附加税"
)
footnotes: dict = Field(
title="脚注列表"
)
class_id: str = Field(
title="所属类"
)
version_id: str = Field(
title="所属版本"
)
class ImportHtsItemParams(BaseModel):
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
file_id: str = Field(
title="文件ID"
)
version_id: str = Field(
title="版本ID"
)
class_id: str = Field(
title="类ID"
)
class GetHtsItemInfoResponse(BaseResponse):
"""获取编码项目信息响应"""
data: HtsItemInfo = Field(None, title="编码项信息信息")
class GetHtsItemInfoListResult(ListQueryResult):
"""获取编码项目信息列表响应结果"""
result: List[HtsItemInfo] = Field(None, title="编码项信息信息列表")
class GetHtsItemListResponse(BaseResponse):
"""获取编码项目信息列表响应"""
data: GetHtsItemInfoListResult = Field(None, title="编码项信息信息列表")