""" 设计相关路由 提供设计生成、查询、删除、下载、视频生成、3D模型生成接口 """ import os import logging from fastapi import APIRouter, Depends, HTTPException, status, Query from fastapi.responses import FileResponse, RedirectResponse from sqlalchemy.orm import Session from ..database import get_db from ..models import User, Design, DesignImage from ..schemas import DesignCreate, DesignResponse, DesignListResponse, DesignImageResponse from ..utils.deps import get_current_user from ..services import design_service from ..services import ai_video_generator_kling from ..services import ai_3d_generator logger = logging.getLogger(__name__) # uploads 基础目录 UPLOADS_BASE = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "uploads") def _delete_local_file(url_path: str): """删除本地存储的文件(如 /uploads/videos/xxx.mp4)""" if not url_path or not url_path.startswith("/uploads/"): return rel_path = url_path.lstrip("/uploads/") full_path = os.path.join(UPLOADS_BASE, rel_path) if os.path.exists(full_path): try: os.remove(full_path) logger.info(f"已删除旧文件: {full_path}") except Exception as e: logger.warning(f"删除旧文件失败: {full_path}, {e}") router = APIRouter(prefix="/api/designs", tags=["设计"]) def design_to_response(design: Design) -> DesignResponse: """将 Design 模型转换为响应格式""" # 构建多视角图片列表 images = [] if hasattr(design, 'images') and design.images: images = [ DesignImageResponse( id=img.id, view_name=img.view_name, image_url=img.image_url, model_used=img.model_used, prompt_used=img.prompt_used, sort_order=img.sort_order, model_3d_url=img.model_3d_url, ) for img in design.images ] return DesignResponse( id=design.id, user_id=design.user_id, category={ "id": design.category.id, "name": design.category.name, "icon": design.category.icon, "sort_order": design.category.sort_order, "flow_type": design.category.flow_type }, sub_type={ "id": design.sub_type.id, "category_id": design.sub_type.category_id, "name": design.sub_type.name, "description": design.sub_type.description, "preview_image": design.sub_type.preview_image, "sort_order": design.sub_type.sort_order } if design.sub_type else None, color={ "id": design.color.id, "category_id": design.color.category_id, "name": design.color.name, "hex_code": design.color.hex_code, "sort_order": design.color.sort_order } if design.color else None, prompt=design.prompt, carving_technique=design.carving_technique, design_style=design.design_style, motif=design.motif, size_spec=design.size_spec, surface_finish=design.surface_finish, usage_scene=design.usage_scene, image_url=design.image_url, video_url=design.video_url, images=images, status=design.status, created_at=design.created_at, updated_at=design.updated_at ) @router.post("/generate", response_model=DesignResponse) async def generate_design( design_data: DesignCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """ 提交设计生成请求(异步,支持 AI 多视角生图) 需要认证 """ try: design = await design_service.create_design_async( db=db, user_id=current_user.id, design_data=design_data ) return design_to_response(design) except ValueError as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=str(e) ) @router.get("", response_model=DesignListResponse) def get_designs( page: int = Query(1, ge=1, description="页码"), page_size: int = Query(20, ge=1, le=100, description="每页数量"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """ 获取当前用户的设计历史列表(分页) 需要认证 """ designs, total = design_service.get_user_designs( db=db, user_id=current_user.id, page=page, page_size=page_size ) return DesignListResponse( items=[design_to_response(d) for d in designs], total=total, page=page, page_size=page_size ) @router.get("/{design_id}", response_model=DesignResponse) def get_design( design_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """ 获取设计详情 只能查看自己的设计,非本人设计返回 404 """ design = design_service.get_design_by_id( db=db, design_id=design_id, user_id=current_user.id ) if not design: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="设计不存在" ) return design_to_response(design) @router.delete("/{design_id}") def delete_design( design_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """ 删除设计 只能删除自己的设计,非本人设计返回 404 """ success = design_service.delete_design( db=db, design_id=design_id, user_id=current_user.id ) if not success: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="设计不存在" ) return {"message": "删除成功"} @router.get("/{design_id}/download") def download_design( design_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """ 下载设计图 只能下载自己的设计,非本人设计返回 404 支持远程 URL(重定向)和本地文件(兼容历史数据) """ design = design_service.get_design_by_id( db=db, design_id=design_id, user_id=current_user.id ) if not design: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="设计不存在" ) if not design.image_url: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="设计图片不存在" ) # 远程 URL 直接重定向 if design.image_url.startswith("http"): return RedirectResponse(url=design.image_url) # 本地文件(兼容历史数据) file_path = design.image_url.lstrip("/") if not os.path.exists(file_path): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="设计图片文件不存在" ) return FileResponse( path=file_path, filename=f"design_{design_id}.png", media_type="image/png" ) @router.post("/{design_id}/generate-video", response_model=DesignResponse) async def generate_video( design_id: int, force: bool = Query(False, description="强制重新生成"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """ 为设计生成 360 度旋转展示视频 取设计的多视角图片,通过可灵 AI 多图参考生视频 API 生成视频 """ design = design_service.get_design_by_id(db=db, design_id=design_id, user_id=current_user.id) if not design: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="设计不存在") # 已有视频且不强制重生时直接返回 if design.video_url and not force: return design_to_response(design) # 强制重生时删除旧视频文件 if force and design.video_url: _delete_local_file(design.video_url) design.video_url = None # 收集多视角图片 URL image_urls = [] if design.images: for img in sorted(design.images, key=lambda x: x.sort_order): if img.image_url: image_urls.append(img.image_url) if not image_urls and design.image_url: image_urls.append(design.image_url) if not image_urls: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="设计没有图片,无法生成视频") logger.info(f"设计 {design_id} 生成视频,共收集到 {len(image_urls)} 张图片") try: video_url = await ai_video_generator_kling.generate_video(image_urls) design.video_url = video_url db.commit() db.refresh(design) return design_to_response(design) except Exception as e: logger.error(f"视频生成失败: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"视频生成失败: {str(e)}" ) @router.post("/{design_id}/generate-3d", response_model=DesignResponse) async def generate_3d_model( design_id: int, force: bool = Query(False, description="强制重新生成"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """ 为设计生成 3D 模型 收集所有多视角图片,取效果图(45度视角)生成 .glb 格式 3D 模型 """ design = design_service.get_design_by_id(db=db, design_id=design_id, user_id=current_user.id) if not design: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="设计不存在") # 检查是否已有 3D 模型(第一张图片) if design.images and not force: first_img = sorted(design.images, key=lambda x: x.sort_order)[0] if first_img.model_3d_url: return design_to_response(design) # 强制重生时删除旧文件 if force and design.images: first_img = sorted(design.images, key=lambda x: x.sort_order)[0] if first_img.model_3d_url: _delete_local_file(first_img.model_3d_url) first_img.model_3d_url = None # 收集所有多视角图片 URL 和视角名称 image_urls = [] view_names = [] if design.images: for img in sorted(design.images, key=lambda x: x.sort_order): if img.image_url: image_urls.append(img.image_url) view_names.append(img.view_name or "效果图") if not image_urls and design.image_url: image_urls.append(design.image_url) view_names.append("效果图") if not image_urls: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="设计没有图片,无法生成3D模型") try: model_url = await ai_3d_generator.generate_3d_model(image_urls, view_names) # 将 3D 模型 URL 保存到第一张图片 if design.images: first_img = sorted(design.images, key=lambda x: x.sort_order)[0] first_img.model_3d_url = model_url db.commit() db.refresh(design) return design_to_response(design) except Exception as e: logger.error(f"3D 模型生成失败: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"3D 模型生成失败: {str(e)}" )