Files
bb84747917 feat(ai): 升级AI生图模型及多视角一致性支持
- 将默认AI生图模型升级为flux-dev及seedream-5.0版本
- SiliconFlow模型由FLUX.1-dev切换为Kolors,优化调用参数和返回值
- 火山引擎Seedream升级至5.0 lite版本,支持多视角参考图传入
- 设计图片字段由字符串改为Text扩展URL长度限制
- 设计图下载支持远程URL重定向和本地文件兼容
- 生成AI图片时多视角保持风格一致,SiliconFlow复用seed,Seedream传参考图
- 后台配置界面更改模型名称及价格显示,新增API Key状态检测
- 前端照片下载从链接改为按钮,远程文件新窗口打开
- 设计相关接口支持较长请求超时,下载走API路径无/api前缀
- 前端页面兼容驼峰与下划线格式URL参数识别
- 用户中心设计图下载支持本地文件Token授权下载
- 初始化数据库新增完整表结构与约束,适配新版设计业务逻辑
2026-03-27 17:39:01 +08:00

628 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
管理后台路由
提供系统配置、用户管理、品类管理、设计管理接口
所有接口需要管理员权限
"""
from datetime import datetime, timedelta
import httpx
from fastapi import APIRouter, Depends, HTTPException, status, Query
from pydantic import BaseModel
from sqlalchemy.orm import Session
from sqlalchemy import func, or_
from ..database import get_db
from ..models import User, Design, Category, SubType, Color, SystemConfig, PromptTemplate, PromptMapping
from ..schemas.admin import (
SystemConfigItem, SystemConfigUpdate, SystemConfigResponse,
AdminUserResponse, AdminUserListResponse, AdminSetAdmin,
CategoryCreate, CategoryUpdate, SubTypeCreate, SubTypeUpdate,
ColorCreate, ColorUpdate,
AdminDesignListResponse, DashboardStats,
PromptTemplateItem, PromptTemplateUpdate,
PromptMappingItem, PromptMappingCreate, PromptMappingUpdate
)
from ..utils.deps import get_admin_user
# Router for the admin back office: every route declared on it is served
# under the /api/admin prefix and carries the "管理后台" tag.
router = APIRouter(prefix="/api/admin", tags=["管理后台"])
# ==================== 仪表盘 ====================
@router.get("/dashboard", response_model=DashboardStats)
def get_dashboard(
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Return the counters displayed on the admin dashboard.

    Totals are plain COUNTs over users, designs and categories; the
    "today" figures only count rows whose ``created_at`` is at or after
    local midnight of the current day.

    Args:
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A ``DashboardStats`` instance; every counter falls back to 0 when
        the query yields a falsy value.
    """
    midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

    def count_rows(column, *criteria):
        # COUNT(column), optionally restricted by the given filter criteria.
        counting = db.query(func.count(column))
        if criteria:
            counting = counting.filter(*criteria)
        return counting.scalar() or 0

    return DashboardStats(
        total_users=count_rows(User.id),
        total_designs=count_rows(Design.id),
        total_categories=count_rows(Category.id),
        today_designs=count_rows(Design.id, Design.created_at >= midnight),
        today_users=count_rows(User.id, User.created_at >= midnight),
    )
# ==================== 系统配置管理 ====================
@router.get("/configs", response_model=SystemConfigResponse)
def get_configs(
    group: str = Query(None, description="按分组筛选"),
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """List system configuration entries with secret values masked.

    Args:
        group: Optional config group; when given, only entries of that
            group are returned.
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A ``SystemConfigResponse`` whose items are ordered by group and
        then by key.
    """
    selection = db.query(SystemConfig)
    if group:
        selection = selection.filter(SystemConfig.config_group == group)
    rows = selection.order_by(SystemConfig.config_group, SystemConfig.config_key).all()

    def to_masked_item(row):
        # Convert the ORM row, hiding the value when it is flagged as secret.
        entry = SystemConfigItem.model_validate(row)
        raw = row.config_value
        if row.is_secret == "Y" and raw:
            # Values longer than 8 chars keep their first and last 4 chars;
            # shorter ones are hidden entirely.
            entry.config_value = raw[:4] + "****" + raw[-4:] if len(raw) > 8 else "****"
        return entry

    return SystemConfigResponse(items=[to_masked_item(row) for row in rows])
@router.put("/configs")
def update_configs(
    data: SystemConfigUpdate,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Apply a batch of configuration values in one transaction.

    Each key in ``data.configs`` either overwrites the value of the
    existing entry with that key or, when no such entry exists, creates a
    new one in the "general" group.

    Args:
        data: Payload whose ``configs`` mapping holds key/value pairs.
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A dict with a ``message`` reporting how many entries were written.
    """
    written = 0
    for config_key, config_value in data.configs.items():
        existing = db.query(SystemConfig).filter(SystemConfig.config_key == config_key).first()
        if not existing:
            # Unknown keys are created on the fly rather than rejected.
            db.add(SystemConfig(
                config_key=config_key,
                config_value=config_value,
                config_group="general"
            ))
        else:
            existing.config_value = config_value
        written += 1
    db.commit()
    return {"message": f"已更新 {written} 项配置"}
@router.post("/configs/init")
def init_default_configs(
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Seed the default configuration entries, only if the table is empty.

    Args:
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A dict with a ``message`` saying either that initialization was
        skipped or how many defaults were inserted.
    """
    existing_rows = db.query(func.count(SystemConfig.id)).scalar()
    if existing_rows > 0:
        return {"message": "配置已存在,跳过初始化"}

    # Each tuple is (key, value, description, group, is_secret flag).
    defaults = [
        ("SILICONFLOW_API_KEY", "", "SiliconFlow API Key", "ai", "Y"),
        ("SILICONFLOW_BASE_URL", "https://api.siliconflow.cn/v1", "SiliconFlow 接口地址", "ai", "N"),
        ("VOLCENGINE_API_KEY", "", "火山引擎 API Key", "ai", "Y"),
        ("VOLCENGINE_BASE_URL", "https://ark.cn-beijing.volces.com/api/v3", "火山引擎接口地址", "ai", "N"),
        ("AI_IMAGE_MODEL", "flux-dev", "默认AI生图模型 (flux-dev / seedream-5.0)", "ai", "N"),
        ("AI_IMAGE_SIZE", "1024", "AI生图默认尺寸", "ai", "N"),
    ]
    db.add_all([
        SystemConfig(
            config_key=entry[0], config_value=entry[1],
            description=entry[2], config_group=entry[3], is_secret=entry[4]
        )
        for entry in defaults
    ])
    db.commit()
    return {"message": f"已初始化 {len(defaults)} 项默认配置"}
class TestConnectionRequest(BaseModel):
    """Request body for the AI API connectivity test endpoint."""

    # Provider identifier; the test endpoint recognizes
    # "siliconflow" and "volcengine".
    provider: str
    # Credential sent as a Bearer token during the test.
    api_key: str
    # Base address of the provider API; "/models" is appended to it.
    base_url: str
@router.post("/configs/test")
async def test_api_connection(
    data: TestConnectionRequest,
    admin: User = Depends(get_admin_user)
):
    """Check whether an AI provider endpoint accepts the given API key.

    Both supported providers are probed the same way: a GET request to
    ``{base_url}/models`` carrying the key as a Bearer token, with a
    15 second timeout.

    Args:
        data: Provider name ("siliconflow" or "volcengine"), API key and
            base URL to test.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A dict with a success ``message``.

    Raises:
        HTTPException: Always with status 400 — for an unknown provider,
            a connection failure or timeout, a 401 from the provider, any
            other non-200 status from SiliconFlow, or any unexpected error
            while sending the request.
    """
    if data.provider not in ("siliconflow", "volcengine"):
        raise HTTPException(status_code=400, detail=f"未知的服务提供商: {data.provider}")

    # Strip trailing slashes so a base URL such as ".../v1/" does not
    # produce ".../v1//models".
    base_url = data.base_url.rstrip("/")
    url = f"{base_url}/models"
    headers = {"Authorization": f"Bearer {data.api_key}"}

    # Only the network call is guarded; status handling happens afterwards
    # so that deliberate HTTPExceptions are never re-wrapped.
    try:
        async with httpx.AsyncClient(timeout=15) as client:
            resp = await client.get(url, headers=headers)
    except httpx.ConnectError as exc:
        raise HTTPException(status_code=400, detail="连接失败,请检查接口地址") from exc
    except httpx.TimeoutException as exc:
        raise HTTPException(status_code=400, detail="连接超时,请检查网络") from exc
    except Exception as exc:
        raise HTTPException(status_code=400, detail=f"测试失败: {str(exc)}") from exc

    if resp.status_code == 200:
        return {"message": "连接成功API Key 有效"}
    if resp.status_code == 401:
        raise HTTPException(status_code=400, detail="API Key 无效,请检查")
    if data.provider == "volcengine":
        # Volcengine may answer with another status code even though the
        # connection itself worked, so this is still reported as success.
        return {"message": f"连接成功(状态码: {resp.status_code})"}
    raise HTTPException(status_code=400, detail=f"请求失败,状态码: {resp.status_code}")
# ==================== 用户管理 ====================
@router.get("/users", response_model=AdminUserListResponse)
def get_users(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    keyword: str = Query(None, description="搜索用户名/昵称"),
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Return one page of users, newest first, with their design counts.

    Args:
        page: 1-based page number.
        page_size: Number of users per page (1-100).
        keyword: Optional substring matched against username or nickname
            with SQL ``LIKE``.
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        An ``AdminUserListResponse`` with the page items, the total number
        of matching users and the paging parameters.
    """
    query = db.query(User)
    if keyword:
        pattern = f"%{keyword}%"
        query = query.filter(
            or_(User.username.like(pattern), User.nickname.like(pattern))
        )
    total = query.count()
    users = query.order_by(User.created_at.desc()).offset((page - 1) * page_size).limit(page_size).all()

    # Fetch the design counts for the whole page with one grouped query
    # instead of issuing a separate COUNT per user.
    design_counts = {}
    user_ids = [u.id for u in users]
    if user_ids:
        rows = (
            db.query(Design.user_id, func.count(Design.id))
            .filter(Design.user_id.in_(user_ids))
            .group_by(Design.user_id)
            .all()
        )
        design_counts = {owner_id: count for owner_id, count in rows}

    items = [
        AdminUserResponse(
            id=u.id, username=u.username, nickname=u.nickname,
            phone=u.phone, is_admin=u.is_admin,
            created_at=u.created_at,
            # Users without any design are absent from the grouped result.
            design_count=design_counts.get(u.id, 0)
        )
        for u in users
    ]
    return AdminUserListResponse(items=items, total=total, page=page, page_size=page_size)
@router.put("/users/{user_id}/admin")
def set_user_admin(
    user_id: int,
    data: AdminSetAdmin,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Grant or revoke the admin flag of another user.

    Args:
        user_id: Id of the user to modify.
        data: Payload carrying the new ``is_admin`` value.
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A dict with a ``message`` describing the change.

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the
            caller targets their own account.
    """
    target = db.query(User).filter(User.id == user_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="用户不存在")
    if target.id == admin.id:
        raise HTTPException(status_code=400, detail="不能修改自己的管理员状态")

    target.is_admin = data.is_admin
    db.commit()

    outcome = "已设为管理员" if data.is_admin else "已取消管理员"
    return {"message": f"用户 {target.username} {outcome}"}
@router.delete("/users/{user_id}")
def delete_user(
    user_id: int,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Delete a non-admin user other than the caller.

    Args:
        user_id: Id of the user to delete.
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the
            target is the caller or another admin.
    """
    target = db.query(User).filter(User.id == user_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="用户不存在")

    # The self-check runs first so an admin deleting themselves gets the
    # more specific message.
    refusal = None
    if target.id == admin.id:
        refusal = "不能删除自己"
    elif target.is_admin:
        refusal = "不能删除其他管理员"
    if refusal is not None:
        raise HTTPException(status_code=400, detail=refusal)

    db.delete(target)
    db.commit()
    return {"message": "用户已删除"}
# ==================== 品类管理 ====================
@router.get("/categories")
def get_categories(
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """List every category together with its sub-types and colors.

    Categories are ordered by ``sort_order`` in the query; the nested
    sub-types and colors are sorted by their own ``sort_order`` in Python.

    Args:
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A list of dicts, one per category, each with ``sub_types`` and
        ``colors`` lists.
    """
    def by_sort_order(entry):
        return entry.sort_order

    def sub_type_payload(sub_type):
        return {"id": sub_type.id, "name": sub_type.name, "description": sub_type.description,
                "preview_image": sub_type.preview_image, "sort_order": sub_type.sort_order}

    def color_payload(color):
        return {"id": color.id, "name": color.name, "hex_code": color.hex_code,
                "sort_order": color.sort_order}

    return [
        {
            "id": category.id,
            "name": category.name,
            "icon": category.icon,
            "sort_order": category.sort_order,
            "flow_type": category.flow_type,
            "sub_types": [sub_type_payload(st) for st in sorted(category.sub_types, key=by_sort_order)],
            "colors": [color_payload(c) for c in sorted(category.colors, key=by_sort_order)],
        }
        for category in db.query(Category).order_by(Category.sort_order).all()
    ]
@router.post("/categories")
def create_category(
    data: CategoryCreate,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Create a category from the submitted fields.

    Args:
        data: Payload with ``name``, ``icon``, ``sort_order`` and
            ``flow_type``.
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A dict with the new category ``id`` and a confirmation ``message``.
    """
    category = Category(
        name=data.name,
        icon=data.icon,
        sort_order=data.sort_order,
        flow_type=data.flow_type,
    )
    db.add(category)
    db.commit()
    # Reload the row so the generated primary key is available.
    db.refresh(category)
    return {"id": category.id, "message": "品类创建成功"}
@router.put("/categories/{cat_id}")
def update_category(
    cat_id: int,
    data: CategoryUpdate,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Update the fields of a category that the request explicitly set.

    Args:
        cat_id: Id of the category to update.
        data: Partial update payload; unset fields are left untouched.
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the category does not exist.
    """
    category = db.query(Category).filter(Category.id == cat_id).first()
    if not category:
        raise HTTPException(status_code=404, detail="品类不存在")

    changes = data.model_dump(exclude_unset=True)
    for field_name in changes:
        setattr(category, field_name, changes[field_name])
    db.commit()
    return {"message": "品类更新成功"}
@router.delete("/categories/{cat_id}")
def delete_category(
    cat_id: int,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Delete a category along with its sub-types and colors.

    Deletion is refused while any design still references the category.

    Args:
        cat_id: Id of the category to delete.
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the category does not exist; 400 when
            designs are still attached to it.
    """
    category = db.query(Category).filter(Category.id == cat_id).first()
    if not category:
        raise HTTPException(status_code=404, detail="品类不存在")

    # Refuse to delete a category that designs still point to.
    attached_designs = db.query(func.count(Design.id)).filter(Design.category_id == cat_id).scalar()
    if attached_designs > 0:
        raise HTTPException(status_code=400, detail=f"品类下有 {attached_designs} 个设计,无法删除")

    # Remove dependent rows first (sub-types, then colors), then the
    # category itself.
    for child_model in (SubType, Color):
        db.query(child_model).filter(child_model.category_id == cat_id).delete()
    db.delete(category)
    db.commit()
    return {"message": "品类已删除"}
# -- 子类型 --
@router.post("/sub-types")
def create_sub_type(
    data: SubTypeCreate,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Create a sub-type under an existing category.

    Args:
        data: Payload with ``category_id``, ``name``, ``description``,
            ``preview_image`` and ``sort_order``.
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A dict with the new sub-type ``id`` and a confirmation ``message``.

    Raises:
        HTTPException: 404 when the parent category does not exist.
    """
    parent = db.query(Category).filter(Category.id == data.category_id).first()
    if not parent:
        raise HTTPException(status_code=404, detail="品类不存在")

    sub_type = SubType(
        category_id=data.category_id,
        name=data.name,
        description=data.description,
        preview_image=data.preview_image,
        sort_order=data.sort_order,
    )
    db.add(sub_type)
    db.commit()
    # Reload the row so the generated primary key is available.
    db.refresh(sub_type)
    return {"id": sub_type.id, "message": "子类型创建成功"}
@router.put("/sub-types/{st_id}")
def update_sub_type(
    st_id: int,
    data: SubTypeUpdate,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Update the fields of a sub-type that the request explicitly set.

    Args:
        st_id: Id of the sub-type to update.
        data: Partial update payload; unset fields are left untouched.
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the sub-type does not exist.
    """
    sub_type = db.query(SubType).filter(SubType.id == st_id).first()
    if not sub_type:
        raise HTTPException(status_code=404, detail="子类型不存在")

    changes = data.model_dump(exclude_unset=True)
    for field_name in changes:
        setattr(sub_type, field_name, changes[field_name])
    db.commit()
    return {"message": "子类型更新成功"}
@router.delete("/sub-types/{st_id}")
def delete_sub_type(
    st_id: int,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Delete a single sub-type.

    Args:
        st_id: Id of the sub-type to delete.
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the sub-type does not exist.
    """
    sub_type = db.query(SubType).filter(SubType.id == st_id).first()
    if sub_type:
        db.delete(sub_type)
        db.commit()
        return {"message": "子类型已删除"}
    raise HTTPException(status_code=404, detail="子类型不存在")
# -- 颜色 --
@router.post("/colors")
def create_color(
    data: ColorCreate,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Create a color under an existing category.

    Args:
        data: Payload with ``category_id``, ``name``, ``hex_code`` and
            ``sort_order``.
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A dict with the new color ``id`` and a confirmation ``message``.

    Raises:
        HTTPException: 404 when the parent category does not exist.
    """
    parent = db.query(Category).filter(Category.id == data.category_id).first()
    if not parent:
        raise HTTPException(status_code=404, detail="品类不存在")

    new_color = Color(
        category_id=data.category_id,
        name=data.name,
        hex_code=data.hex_code,
        sort_order=data.sort_order,
    )
    db.add(new_color)
    db.commit()
    # Reload the row so the generated primary key is available.
    db.refresh(new_color)
    return {"id": new_color.id, "message": "颜色创建成功"}
@router.put("/colors/{color_id}")
def update_color(
    color_id: int,
    data: ColorUpdate,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Update the fields of a color that the request explicitly set.

    Args:
        color_id: Id of the color to update.
        data: Partial update payload; unset fields are left untouched.
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the color does not exist.
    """
    target = db.query(Color).filter(Color.id == color_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="颜色不存在")

    changes = data.model_dump(exclude_unset=True)
    for field_name in changes:
        setattr(target, field_name, changes[field_name])
    db.commit()
    return {"message": "颜色更新成功"}
@router.delete("/colors/{color_id}")
def delete_color(
    color_id: int,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Delete a single color.

    Args:
        color_id: Id of the color to delete.
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the color does not exist.
    """
    target = db.query(Color).filter(Color.id == color_id).first()
    if target:
        db.delete(target)
        db.commit()
        return {"message": "颜色已删除"}
    raise HTTPException(status_code=404, detail="颜色不存在")
# ==================== 设计管理 ====================
@router.get("/designs", response_model=AdminDesignListResponse)
def get_all_designs(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    user_id: int = Query(None, description="按用户筛选"),
    status_filter: str = Query(None, alias="status", description="按状态筛选"),
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Return one page of designs across all users, newest first.

    Args:
        page: 1-based page number.
        page_size: Number of designs per page (1-100).
        user_id: Optional owner id to filter by.
        status_filter: Optional design status to filter by (sent as the
            ``status`` query parameter).
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        An ``AdminDesignListResponse`` with the page items, the total
        number of matching designs and the paging parameters.
    """
    selection = db.query(Design)
    if user_id:
        selection = selection.filter(Design.user_id == user_id)
    if status_filter:
        selection = selection.filter(Design.status == status_filter)

    total = selection.count()
    skip = (page - 1) * page_size
    page_rows = selection.order_by(Design.created_at.desc()).offset(skip).limit(page_size).all()

    def attr_or_none(related, attr_name):
        # Related rows may be missing; report None instead of failing.
        return getattr(related, attr_name) if related else None

    items = [
        {
            "id": design.id,
            "user_id": design.user_id,
            "username": attr_or_none(design.user, "username"),
            "category_name": attr_or_none(design.category, "name"),
            "sub_type_name": attr_or_none(design.sub_type, "name"),
            "color_name": attr_or_none(design.color, "name"),
            "prompt": design.prompt,
            "image_url": design.image_url,
            "status": design.status,
            "created_at": design.created_at.isoformat() if design.created_at else None,
        }
        for design in page_rows
    ]
    return AdminDesignListResponse(items=items, total=total, page=page, page_size=page_size)
@router.delete("/designs/{design_id}")
def admin_delete_design(
    design_id: int,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Delete any design regardless of its owner.

    Args:
        design_id: Id of the design to delete.
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the design does not exist.
    """
    target = db.query(Design).filter(Design.id == design_id).first()
    if target:
        db.delete(target)
        db.commit()
        return {"message": "设计已删除"}
    raise HTTPException(status_code=404, detail="设计不存在")
# ==================== 提示词管理 ====================
@router.get("/prompt-templates")
def get_prompt_templates(
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """List all prompt templates ordered by their template key.

    Args:
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A list of ``PromptTemplateItem`` objects.
    """
    serialized = []
    for row in db.query(PromptTemplate).order_by(PromptTemplate.template_key).all():
        serialized.append(PromptTemplateItem.model_validate(row))
    return serialized
@router.put("/prompt-templates/{template_id}")
def update_prompt_template(
    template_id: int,
    data: PromptTemplateUpdate,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Replace a prompt template's value and, optionally, its description.

    Args:
        template_id: Id of the template to update.
        data: Payload with the new ``template_value``; ``description`` is
            applied only when it is not ``None``.
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A dict with a ``message`` naming the updated template key.

    Raises:
        HTTPException: 404 when the template does not exist.
    """
    template = db.query(PromptTemplate).filter(PromptTemplate.id == template_id).first()
    if not template:
        raise HTTPException(status_code=404, detail="模板不存在")

    template.template_value = data.template_value
    new_description = data.description
    if new_description is not None:
        template.description = new_description
    db.commit()
    return {"message": f"模板 '{template.template_key}' 更新成功"}
@router.get("/prompt-mappings")
def get_prompt_mappings(
    mapping_type: str = Query(None, description="按映射类型筛选"),
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """List prompt mappings, optionally restricted to one mapping type.

    Args:
        mapping_type: Optional mapping type to filter by.
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A list of ``PromptMappingItem`` objects ordered by mapping type
        and then by ``sort_order``.
    """
    selection = db.query(PromptMapping)
    if mapping_type:
        selection = selection.filter(PromptMapping.mapping_type == mapping_type)
    ordered = selection.order_by(PromptMapping.mapping_type, PromptMapping.sort_order)

    serialized = []
    for row in ordered.all():
        serialized.append(PromptMappingItem.model_validate(row))
    return serialized
@router.get("/prompt-mappings/types")
def get_mapping_types(
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """List every mapping type with its row count and display label.

    Args:
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A list of dicts with ``type``, ``count`` and ``label``; types
        without a known label use the type name itself as the label.
    """
    # Display labels for the known mapping types; built once rather than
    # once per result row.
    labels = {
        "category": "品类", "color": "颜色", "view": "视角",
        "carving": "雕刻工艺", "style": "设计风格", "motif": "题材纹样",
        "finish": "表面处理", "scene": "用途场景", "sub_type": "子类型"
    }
    types = db.query(
        PromptMapping.mapping_type,
        func.count(PromptMapping.id)
    ).group_by(PromptMapping.mapping_type).all()
    return [{"type": t, "count": c, "label": labels.get(t, t)} for t, c in types]
@router.post("/prompt-mappings")
def create_prompt_mapping(
    data: PromptMappingCreate,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Create a prompt mapping unless the same type/key pair exists.

    Args:
        data: Payload with ``mapping_type``, ``cn_key``, ``en_value`` and
            ``sort_order``.
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A dict with the new mapping ``id`` and a confirmation ``message``.

    Raises:
        HTTPException: 400 when a mapping with the same ``mapping_type``
            and ``cn_key`` already exists.
    """
    # Reject duplicates of the (mapping_type, cn_key) pair.
    duplicate = (
        db.query(PromptMapping)
        .filter(
            PromptMapping.mapping_type == data.mapping_type,
            PromptMapping.cn_key == data.cn_key
        )
        .first()
    )
    if duplicate:
        raise HTTPException(status_code=400, detail=f"映射 '{data.cn_key}' 已存在")

    created = PromptMapping(
        mapping_type=data.mapping_type,
        cn_key=data.cn_key,
        en_value=data.en_value,
        sort_order=data.sort_order
    )
    db.add(created)
    db.commit()
    # Reload the row so the generated primary key is available.
    db.refresh(created)
    return {"id": created.id, "message": "映射创建成功"}
@router.put("/prompt-mappings/{mapping_id}")
def update_prompt_mapping(
    mapping_id: int,
    data: PromptMappingUpdate,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Update the fields of a prompt mapping that the request explicitly set.

    Args:
        mapping_id: Id of the mapping to update.
        data: Partial update payload; unset fields are left untouched.
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the mapping does not exist.
    """
    target = db.query(PromptMapping).filter(PromptMapping.id == mapping_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="映射不存在")

    changes = data.model_dump(exclude_unset=True)
    for field_name in changes:
        setattr(target, field_name, changes[field_name])
    db.commit()
    return {"message": "映射更新成功"}
@router.delete("/prompt-mappings/{mapping_id}")
def delete_prompt_mapping(
    mapping_id: int,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Delete a single prompt mapping.

    Args:
        mapping_id: Id of the mapping to delete.
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the mapping does not exist.
    """
    target = db.query(PromptMapping).filter(PromptMapping.id == mapping_id).first()
    if target:
        db.delete(target)
        db.commit()
        return {"message": "映射已删除"}
    raise HTTPException(status_code=404, detail="映射不存在")
@router.post("/prompt-preview")
def preview_prompt(
    params: dict,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Build a prompt from the submitted parameters and return it.

    Args:
        params: Free-form request body. ``category_name`` and
            ``view_name`` have defaults; the remaining recognized keys are
            passed as ``None`` when absent.
        db: Database session supplied by the ``get_db`` dependency.
        admin: Caller resolved by the ``get_admin_user`` dependency.

    Returns:
        A dict with the generated ``prompt``.

    Raises:
        HTTPException: 400 when prompt building raises any exception.
    """
    from ..services.prompt_builder import build_prompt

    # Keys forwarded verbatim to build_prompt, defaulting to None.
    optional_keys = (
        "sub_type_name", "color_name", "user_prompt",
        "carving_technique", "design_style", "motif",
        "size_spec", "surface_finish", "usage_scene",
    )
    try:
        builder_kwargs = {
            "category_name": params.get("category_name", "牌子"),
            "view_name": params.get("view_name", "效果图"),
        }
        for key in optional_keys:
            builder_kwargs[key] = params.get(key)
        return {"prompt": build_prompt(**builder_kwargs)}
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"提示词生成失败: {str(e)}")