"""
Admin console routes.

Provides endpoints for the dashboard, system configuration, user management,
category management, design management and prompt management.
Every endpoint depends on ``get_admin_user`` and therefore requires
administrator privileges.
"""
from datetime import datetime, timedelta

import httpx
from fastapi import APIRouter, Depends, HTTPException, status, Query
from pydantic import BaseModel
from sqlalchemy.orm import Session
from sqlalchemy import func, or_

from ..database import get_db
from ..models import User, Design, Category, SubType, Color, SystemConfig, PromptTemplate, PromptMapping
from ..schemas.admin import (
    SystemConfigItem, SystemConfigUpdate, SystemConfigResponse,
    AdminUserResponse, AdminUserListResponse, AdminSetAdmin,
    CategoryCreate, CategoryUpdate,
    SubTypeCreate, SubTypeUpdate,
    ColorCreate, ColorUpdate,
    AdminDesignListResponse, DashboardStats,
    PromptTemplateItem, PromptTemplateUpdate,
    PromptMappingItem, PromptMappingCreate, PromptMappingUpdate
)
from ..utils.deps import get_admin_user

router = APIRouter(prefix="/api/admin", tags=["管理后台"])

# Providers accepted by the connection test endpoint.
_KNOWN_PROVIDERS = ("siliconflow", "volcengine")

# Display labels for the known prompt-mapping types. Types that are not
# listed here fall back to the raw type string.
_MAPPING_TYPE_LABELS = {
    "category": "品类",
    "color": "颜色",
    "view": "视角",
    "carving": "雕刻工艺",
    "style": "设计风格",
    "motif": "题材纹样",
    "finish": "表面处理",
    "scene": "用途场景",
    "sub_type": "子类型",
}


# ==================== Dashboard ====================

@router.get("/dashboard", response_model=DashboardStats)
def get_dashboard(
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Return the dashboard counters.

    Totals cover users, designs and categories; the "today" counters count
    rows whose ``created_at`` is at or after today's midnight.
    """
    # Midnight of the current day, taken from the server's naive local clock.
    today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

    return DashboardStats(
        total_users=db.query(func.count(User.id)).scalar() or 0,
        total_designs=db.query(func.count(Design.id)).scalar() or 0,
        total_categories=db.query(func.count(Category.id)).scalar() or 0,
        today_designs=db.query(func.count(Design.id)).filter(Design.created_at >= today).scalar() or 0,
        today_users=db.query(func.count(User.id)).filter(User.created_at >= today).scalar() or 0,
    )


# ==================== System configuration ====================

@router.get("/configs", response_model=SystemConfigResponse)
def get_configs(
    group: str = Query(None, description="按分组筛选"),
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Return the system configuration items, optionally filtered by group.

    Items are ordered by group, then key. Secret items (``is_secret == "Y"``)
    with a non-empty value are masked before being returned.
    """
    query = db.query(SystemConfig)
    if group:
        query = query.filter(SystemConfig.config_group == group)
    items = query.order_by(SystemConfig.config_group, SystemConfig.config_key).all()

    result = []
    for item in items:
        cfg = SystemConfigItem.model_validate(item)
        if item.is_secret == "Y" and item.config_value:
            val = item.config_value
            if len(val) > 8:
                # Show only the first 4 and last 4 characters.
                cfg.config_value = val[:4] + "****" + val[-4:]
            else:
                # Too short to reveal any part of it.
                cfg.config_value = "****"
        result.append(cfg)

    return SystemConfigResponse(items=result)


@router.put("/configs")
def update_configs(
    data: SystemConfigUpdate,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Bulk-update configuration values keyed by ``config_key``.

    Existing keys get their value replaced; unknown keys are created in the
    "general" group. All changes are committed together.

    NOTE(review): GET /configs returns masked values for secret items. If a
    client echoes a masked value back here it will overwrite the real secret;
    confirm the frontend omits secrets the user did not edit.
    """
    updated = 0
    for key, value in data.configs.items():
        config = db.query(SystemConfig).filter(SystemConfig.config_key == key).first()
        if config:
            config.config_value = value
            updated += 1
        else:
            # Automatically create configuration items that do not exist yet.
            new_config = SystemConfig(
                config_key=key,
                config_value=value,
                config_group="general"
            )
            db.add(new_config)
            updated += 1

    db.commit()
    return {"message": f"已更新 {updated} 项配置"}


@router.post("/configs/init")
def init_default_configs(
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Insert the default configuration items, only when the table is empty."""
    # `or 0` keeps the comparison safe and matches the other count queries.
    count = db.query(func.count(SystemConfig.id)).scalar() or 0
    if count > 0:
        return {"message": "配置已存在,跳过初始化"}

    # (config_key, config_value, description, config_group, is_secret)
    defaults = [
        ("SILICONFLOW_API_KEY", "", "SiliconFlow API Key", "ai", "Y"),
        ("SILICONFLOW_BASE_URL", "https://api.siliconflow.cn/v1", "SiliconFlow 接口地址", "ai", "N"),
        ("VOLCENGINE_API_KEY", "", "火山引擎 API Key", "ai", "Y"),
        ("VOLCENGINE_BASE_URL", "https://ark.cn-beijing.volces.com/api/v3", "火山引擎接口地址", "ai", "N"),
        ("AI_IMAGE_MODEL", "flux-dev", "默认AI生图模型 (flux-dev / seedream-5.0)", "ai", "N"),
        ("AI_IMAGE_SIZE", "1024", "AI生图默认尺寸", "ai", "N"),
    ]

    for key, val, desc, group, secret in defaults:
        db.add(SystemConfig(
            config_key=key,
            config_value=val,
            description=desc,
            config_group=group,
            is_secret=secret
        ))

    db.commit()
    return {"message": f"已初始化 {len(defaults)} 项默认配置"}


class TestConnectionRequest(BaseModel):
    """Request body for the API connection test."""
    provider: str  # siliconflow / volcengine
    api_key: str
    base_url: str


@router.post("/configs/test")
async def test_api_connection(
    data: TestConnectionRequest,
    admin: User = Depends(get_admin_user)
):
    """Check that an AI provider endpoint is reachable with the given key.

    Sends ``GET {base_url}/models`` with a Bearer token. Both providers use
    the same request; they differ only in how an unexpected status code is
    reported.

    Raises:
        HTTPException: always with status 400, for an unknown provider, a
            connection failure, a timeout, a rejected key (upstream 401),
            an unexpected upstream status (siliconflow only), or any other
            error raised while sending the request.
    """
    if data.provider not in _KNOWN_PROVIDERS:
        raise HTTPException(status_code=400, detail=f"未知的服务提供商: {data.provider}")

    # Strip trailing slashes so a base URL ending in "/" does not yield "//models".
    base_url = data.base_url.rstrip("/")
    url = f"{base_url}/models"
    headers = {"Authorization": f"Bearer {data.api_key}"}

    try:
        async with httpx.AsyncClient(timeout=15) as client:
            resp = await client.get(url, headers=headers)
    except httpx.ConnectError as e:
        raise HTTPException(status_code=400, detail="连接失败,请检查接口地址") from e
    except httpx.TimeoutException as e:
        raise HTTPException(status_code=400, detail="连接超时,请检查网络") from e
    except Exception as e:
        # Boundary handler: anything else (e.g. a malformed URL) is reported
        # to the admin as a failed test instead of a server error.
        raise HTTPException(status_code=400, detail=f"测试失败: {str(e)}") from e

    if resp.status_code == 200:
        return {"message": "连接成功,API Key 有效"}
    if resp.status_code == 401:
        raise HTTPException(status_code=400, detail="API Key 无效,请检查")
    if data.provider == "volcengine":
        # Volcengine may answer with another status code even though the
        # connection itself succeeded.
        return {"message": f"连接成功(状态码: {resp.status_code})"}
    raise HTTPException(status_code=400, detail=f"请求失败,状态码: {resp.status_code}")


# ==================== User management ====================

@router.get("/users", response_model=AdminUserListResponse)
def get_users(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    keyword: str = Query(None, description="搜索用户名/昵称"),
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Return one page of users, newest first, with each user's design count.

    When ``keyword`` is given, only users whose username or nickname
    contains it (SQL ``LIKE``) are returned. ``total`` is the number of
    matching users across all pages.
    """
    query = db.query(User)
    if keyword:
        query = query.filter(
            or_(User.username.like(f"%{keyword}%"), User.nickname.like(f"%{keyword}%"))
        )

    total = query.count()
    users = (
        query.order_by(User.created_at.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )

    # Fetch the design counts for the whole page in one grouped query
    # instead of issuing one COUNT per user.
    user_ids = [u.id for u in users]
    design_counts = {}
    if user_ids:
        rows = (
            db.query(Design.user_id, func.count(Design.id))
            .filter(Design.user_id.in_(user_ids))
            .group_by(Design.user_id)
            .all()
        )
        design_counts = {uid: cnt for uid, cnt in rows}

    items = [
        AdminUserResponse(
            id=u.id,
            username=u.username,
            nickname=u.nickname,
            phone=u.phone,
            is_admin=u.is_admin,
            created_at=u.created_at,
            # Users without designs are absent from the grouped result.
            design_count=design_counts.get(u.id, 0)
        )
        for u in users
    ]

    return AdminUserListResponse(items=items, total=total, page=page, page_size=page_size)


@router.put("/users/{user_id}/admin")
def set_user_admin(
    user_id: int,
    data: AdminSetAdmin,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Grant or revoke administrator status for another user.

    Raises:
        HTTPException: 404 if the user does not exist; 400 if the caller
            targets their own account.
    """
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")
    if user.id == admin.id:
        raise HTTPException(status_code=400, detail="不能修改自己的管理员状态")

    user.is_admin = data.is_admin
    db.commit()
    return {"message": f"用户 {user.username} {'已设为管理员' if data.is_admin else '已取消管理员'}"}


@router.delete("/users/{user_id}")
def delete_user(
    user_id: int,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Delete a non-admin user other than the caller.

    Raises:
        HTTPException: 404 if the user does not exist; 400 if the target is
            the caller or another administrator.
    """
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")
    if user.id == admin.id:
        raise HTTPException(status_code=400, detail="不能删除自己")
    if user.is_admin:
        raise HTTPException(status_code=400, detail="不能删除其他管理员")

    # NOTE(review): what happens to the user's designs depends on the
    # relationship/cascade settings of the models, which are not visible here.
    db.delete(user)
    db.commit()
    return {"message": "用户已删除"}


# ==================== Category management ====================

@router.get("/categories")
def get_categories(
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Return every category with its sub-types and colors.

    Categories, sub-types and colors are each ordered by ``sort_order``.
    """
    categories = db.query(Category).order_by(Category.sort_order).all()

    result = []
    for cat in categories:
        sub_types = [
            {
                "id": st.id,
                "name": st.name,
                "description": st.description,
                "preview_image": st.preview_image,
                "sort_order": st.sort_order,
            }
            for st in sorted(cat.sub_types, key=lambda x: x.sort_order)
        ]
        colors = [
            {
                "id": c.id,
                "name": c.name,
                "hex_code": c.hex_code,
                "sort_order": c.sort_order,
            }
            for c in sorted(cat.colors, key=lambda x: x.sort_order)
        ]
        result.append({
            "id": cat.id,
            "name": cat.name,
            "icon": cat.icon,
            "sort_order": cat.sort_order,
            "flow_type": cat.flow_type,
            "sub_types": sub_types,
            "colors": colors
        })

    return result


@router.post("/categories")
def create_category(
    data: CategoryCreate,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Create a category and return its new id."""
    cat = Category(name=data.name, icon=data.icon, sort_order=data.sort_order, flow_type=data.flow_type)
    db.add(cat)
    db.commit()
    db.refresh(cat)
    return {"id": cat.id, "message": "品类创建成功"}


@router.put("/categories/{cat_id}")
def update_category(
    cat_id: int,
    data: CategoryUpdate,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Update the fields of a category that the request explicitly set.

    Raises:
        HTTPException: 404 if the category does not exist.
    """
    cat = db.query(Category).filter(Category.id == cat_id).first()
    if not cat:
        raise HTTPException(status_code=404, detail="品类不存在")

    # exclude_unset: fields the client did not send are left untouched.
    for field, value in data.model_dump(exclude_unset=True).items():
        setattr(cat, field, value)

    db.commit()
    return {"message": "品类更新成功"}


@router.delete("/categories/{cat_id}")
def delete_category(
    cat_id: int,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Delete a category together with its sub-types and colors.

    Raises:
        HTTPException: 404 if the category does not exist; 400 if any design
            still references it.
    """
    cat = db.query(Category).filter(Category.id == cat_id).first()
    if not cat:
        raise HTTPException(status_code=404, detail="品类不存在")

    # Refuse to delete while designs still reference this category.
    design_count = db.query(func.count(Design.id)).filter(Design.category_id == cat_id).scalar() or 0
    if design_count > 0:
        raise HTTPException(status_code=400, detail=f"品类下有 {design_count} 个设计,无法删除")

    # Remove the dependent sub-types and colors before the category itself.
    db.query(SubType).filter(SubType.category_id == cat_id).delete()
    db.query(Color).filter(Color.category_id == cat_id).delete()
    db.delete(cat)
    db.commit()
    return {"message": "品类已删除"}


# -- Sub-types --

@router.post("/sub-types")
def create_sub_type(
    data: SubTypeCreate,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Create a sub-type under an existing category and return its new id.

    Raises:
        HTTPException: 404 if the parent category does not exist.
    """
    cat = db.query(Category).filter(Category.id == data.category_id).first()
    if not cat:
        raise HTTPException(status_code=404, detail="品类不存在")

    st = SubType(
        category_id=data.category_id,
        name=data.name,
        description=data.description,
        preview_image=data.preview_image,
        sort_order=data.sort_order
    )
    db.add(st)
    db.commit()
    db.refresh(st)
    return {"id": st.id, "message": "子类型创建成功"}


@router.put("/sub-types/{st_id}")
def update_sub_type(
    st_id: int,
    data: SubTypeUpdate,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Update the fields of a sub-type that the request explicitly set.

    Raises:
        HTTPException: 404 if the sub-type does not exist.
    """
    st = db.query(SubType).filter(SubType.id == st_id).first()
    if not st:
        raise HTTPException(status_code=404, detail="子类型不存在")

    for field, value in data.model_dump(exclude_unset=True).items():
        setattr(st, field, value)

    db.commit()
    return {"message": "子类型更新成功"}


@router.delete("/sub-types/{st_id}")
def delete_sub_type(
    st_id: int,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Delete a sub-type.

    Raises:
        HTTPException: 404 if the sub-type does not exist.
    """
    st = db.query(SubType).filter(SubType.id == st_id).first()
    if not st:
        raise HTTPException(status_code=404, detail="子类型不存在")

    db.delete(st)
    db.commit()
    return {"message": "子类型已删除"}


# -- Colors --

@router.post("/colors")
def create_color(
    data: ColorCreate,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Create a color under an existing category and return its new id.

    Raises:
        HTTPException: 404 if the parent category does not exist.
    """
    cat = db.query(Category).filter(Category.id == data.category_id).first()
    if not cat:
        raise HTTPException(status_code=404, detail="品类不存在")

    color = Color(
        category_id=data.category_id,
        name=data.name,
        hex_code=data.hex_code,
        sort_order=data.sort_order
    )
    db.add(color)
    db.commit()
    db.refresh(color)
    return {"id": color.id, "message": "颜色创建成功"}


@router.put("/colors/{color_id}")
def update_color(
    color_id: int,
    data: ColorUpdate,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Update the fields of a color that the request explicitly set.

    Raises:
        HTTPException: 404 if the color does not exist.
    """
    color = db.query(Color).filter(Color.id == color_id).first()
    if not color:
        raise HTTPException(status_code=404, detail="颜色不存在")

    for field, value in data.model_dump(exclude_unset=True).items():
        setattr(color, field, value)

    db.commit()
    return {"message": "颜色更新成功"}


@router.delete("/colors/{color_id}")
def delete_color(
    color_id: int,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Delete a color.

    Raises:
        HTTPException: 404 if the color does not exist.
    """
    color = db.query(Color).filter(Color.id == color_id).first()
    if not color:
        raise HTTPException(status_code=404, detail="颜色不存在")

    db.delete(color)
    db.commit()
    return {"message": "颜色已删除"}


# ==================== Design management ====================

@router.get("/designs", response_model=AdminDesignListResponse)
def get_all_designs(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    user_id: int = Query(None, description="按用户筛选"),
    status_filter: str = Query(None, alias="status", description="按状态筛选"),
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Return one page of designs from all users, newest first.

    Optional filters: ``user_id`` and ``status`` (the query-string alias of
    ``status_filter``). ``total`` is the number of matching designs across
    all pages.
    """
    query = db.query(Design)
    if user_id:
        query = query.filter(Design.user_id == user_id)
    if status_filter:
        query = query.filter(Design.status == status_filter)

    total = query.count()
    designs = (
        query.order_by(Design.created_at.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )

    # Related objects may be missing, so every related name falls back to None.
    items = [
        {
            "id": d.id,
            "user_id": d.user_id,
            "username": d.user.username if d.user else None,
            "category_name": d.category.name if d.category else None,
            "sub_type_name": d.sub_type.name if d.sub_type else None,
            "color_name": d.color.name if d.color else None,
            "prompt": d.prompt,
            "image_url": d.image_url,
            "status": d.status,
            "created_at": d.created_at.isoformat() if d.created_at else None,
        }
        for d in designs
    ]

    return AdminDesignListResponse(items=items, total=total, page=page, page_size=page_size)


@router.delete("/designs/{design_id}")
def admin_delete_design(
    design_id: int,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Delete any design, regardless of which user owns it.

    Raises:
        HTTPException: 404 if the design does not exist.
    """
    design = db.query(Design).filter(Design.id == design_id).first()
    if not design:
        raise HTTPException(status_code=404, detail="设计不存在")

    db.delete(design)
    db.commit()
    return {"message": "设计已删除"}


# ==================== Prompt management ====================

@router.get("/prompt-templates")
def get_prompt_templates(
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Return all prompt templates ordered by ``template_key``."""
    templates = db.query(PromptTemplate).order_by(PromptTemplate.template_key).all()
    return [PromptTemplateItem.model_validate(t) for t in templates]


@router.put("/prompt-templates/{template_id}")
def update_prompt_template(
    template_id: int,
    data: PromptTemplateUpdate,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Replace a template's value and, when provided, its description.

    Raises:
        HTTPException: 404 if the template does not exist.
    """
    tpl = db.query(PromptTemplate).filter(PromptTemplate.id == template_id).first()
    if not tpl:
        raise HTTPException(status_code=404, detail="模板不存在")

    tpl.template_value = data.template_value
    # The description is only overwritten when the request carries one.
    if data.description is not None:
        tpl.description = data.description

    db.commit()
    return {"message": f"模板 '{tpl.template_key}' 更新成功"}


@router.get("/prompt-mappings")
def get_prompt_mappings(
    mapping_type: str = Query(None, description="按映射类型筛选"),
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Return prompt mappings, optionally restricted to one mapping type.

    Results are ordered by mapping type, then ``sort_order``.
    """
    query = db.query(PromptMapping)
    if mapping_type:
        query = query.filter(PromptMapping.mapping_type == mapping_type)

    mappings = query.order_by(PromptMapping.mapping_type, PromptMapping.sort_order).all()
    return [PromptMappingItem.model_validate(m) for m in mappings]


@router.get("/prompt-mappings/types")
def get_mapping_types(
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Return each mapping type with its row count and display label.

    Types without a known label use the raw type string as the label.
    """
    types = (
        db.query(PromptMapping.mapping_type, func.count(PromptMapping.id))
        .group_by(PromptMapping.mapping_type)
        .all()
    )
    return [
        {"type": t, "count": c, "label": _MAPPING_TYPE_LABELS.get(t, t)}
        for t, c in types
    ]


@router.post("/prompt-mappings")
def create_prompt_mapping(
    data: PromptMappingCreate,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Create a prompt mapping and return its new id.

    Raises:
        HTTPException: 400 if a mapping with the same ``mapping_type`` and
            ``cn_key`` already exists.
    """
    # Reject duplicates of the (mapping_type, cn_key) pair.
    existing = db.query(PromptMapping).filter(
        PromptMapping.mapping_type == data.mapping_type,
        PromptMapping.cn_key == data.cn_key
    ).first()
    if existing:
        raise HTTPException(status_code=400, detail=f"映射 '{data.cn_key}' 已存在")

    mapping = PromptMapping(
        mapping_type=data.mapping_type,
        cn_key=data.cn_key,
        en_value=data.en_value,
        sort_order=data.sort_order
    )
    db.add(mapping)
    db.commit()
    db.refresh(mapping)
    return {"id": mapping.id, "message": "映射创建成功"}


@router.put("/prompt-mappings/{mapping_id}")
def update_prompt_mapping(
    mapping_id: int,
    data: PromptMappingUpdate,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Update the fields of a prompt mapping that the request explicitly set.

    Raises:
        HTTPException: 404 if the mapping does not exist.
    """
    mapping = db.query(PromptMapping).filter(PromptMapping.id == mapping_id).first()
    if not mapping:
        raise HTTPException(status_code=404, detail="映射不存在")

    for field, value in data.model_dump(exclude_unset=True).items():
        setattr(mapping, field, value)

    db.commit()
    return {"message": "映射更新成功"}


@router.delete("/prompt-mappings/{mapping_id}")
def delete_prompt_mapping(
    mapping_id: int,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Delete a prompt mapping.

    Raises:
        HTTPException: 404 if the mapping does not exist.
    """
    mapping = db.query(PromptMapping).filter(PromptMapping.id == mapping_id).first()
    if not mapping:
        raise HTTPException(status_code=404, detail="映射不存在")

    db.delete(mapping)
    db.commit()
    return {"message": "映射已删除"}


@router.post("/prompt-preview")
def preview_prompt(
    params: dict,
    db: Session = Depends(get_db),
    admin: User = Depends(get_admin_user)
):
    """Build a prompt from the given parameters and return it for preview.

    ``category_name`` and ``view_name`` have defaults; every other key is
    optional and passed through as ``None`` when absent.

    Raises:
        HTTPException: 400 if prompt building raises any exception.
    """
    # Imported inside the function, as in the original code.
    from ..services.prompt_builder import build_prompt

    try:
        prompt = build_prompt(
            category_name=params.get("category_name", "牌子"),
            view_name=params.get("view_name", "效果图"),
            sub_type_name=params.get("sub_type_name"),
            color_name=params.get("color_name"),
            user_prompt=params.get("user_prompt"),
            carving_technique=params.get("carving_technique"),
            design_style=params.get("design_style"),
            motif=params.get("motif"),
            size_spec=params.get("size_spec"),
            surface_finish=params.get("surface_finish"),
            usage_scene=params.get("usage_scene"),
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"提示词生成失败: {str(e)}") from e

    return {"prompt": prompt}