feat(design): 添加360视频和3D模型生成功能支持

- 在Design模型中新增video_url字段用于存储360度展示视频URL
- 在DesignImage模型中新增model_3d_url字段用于存储3D模型URL
- 设计路由新增生成视频接口,调用火山引擎即梦3.0 Pro API生成展示视频
- 设计路由新增生成3D模型接口,调用腾讯混元3D服务生成.glb格式3D模型
- 新增本地文件删除工具,支持强制重新生成时清理旧文件
- 设计响应Schema中添加video_url和model_3d_url字段支持前后端数据传递
- 前端设计详情页新增360度旋转3D模型展示区,支持生成、重新生成和下载3D模型
- 实现录制3D模型展示视频功能,支持捕获model-viewer旋转画面逐帧合成WebM文件下载
- 引入@google/model-viewer库作为3D模型Web组件展示支持
- 管理后台新增即梦视频生成和腾讯混元3D模型生成配置界面,方便服务密钥管理
- 前端API增加生成视频和生成3D模型接口请求方法,超时设置为10分钟以支持长时间处理
- 优化UI交互提示,新增生成中状态显示和错误提示,提升用户体验和操作反馈
This commit is contained in:
2026-03-27 23:26:56 +08:00
parent a1f56b1f8e
commit 8f5a86418e
17 changed files with 1517 additions and 6 deletions

View File

@@ -25,6 +25,7 @@ class Design(Base):
surface_finish = Column(String(50), nullable=True, comment="表面处理")
usage_scene = Column(String(50), nullable=True, comment="用途场景")
image_url = Column(Text, nullable=True, comment="设计图URL")
video_url = Column(Text, nullable=True, comment="360度展示视频URL")
status = Column(String(20), default="generating", comment="状态")
created_at = Column(DateTime, server_default=func.now(), comment="创建时间")
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment="更新时间")

View File

@@ -20,6 +20,7 @@ class DesignImage(Base):
model_used = Column(String(50), nullable=True, comment="使用的AI模型: flux-dev/seedream-4.5")
prompt_used = Column(Text, nullable=True, comment="实际使用的英文prompt")
sort_order = Column(Integer, default=0, comment="排序")
model_3d_url = Column(Text, nullable=True, comment="3D模型URL(.glb)")
created_at = Column(DateTime, server_default=func.now(), comment="创建时间")
# 关联关系

View File

@@ -1,17 +1,40 @@
"""
设计相关路由
提供设计生成、查询、删除、下载接口
提供设计生成、查询、删除、下载、视频生成、3D模型生成接口
"""
import os
import logging
from fastapi import APIRouter, Depends, HTTPException, status, Query
from fastapi.responses import FileResponse, RedirectResponse
from sqlalchemy.orm import Session
from ..database import get_db
from ..models import User, Design
from ..models import User, Design, DesignImage
from ..schemas import DesignCreate, DesignResponse, DesignListResponse, DesignImageResponse
from ..utils.deps import get_current_user
from ..services import design_service
from ..services import ai_video_generator
from ..services import ai_3d_generator
logger = logging.getLogger(__name__)
# uploads 基础目录
UPLOADS_BASE = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "uploads")
def _delete_local_file(url_path: str):
"""删除本地存储的文件(如 /uploads/videos/xxx.mp4"""
if not url_path or not url_path.startswith("/uploads/"):
return
rel_path = url_path.lstrip("/uploads/")
full_path = os.path.join(UPLOADS_BASE, rel_path)
if os.path.exists(full_path):
try:
os.remove(full_path)
logger.info(f"已删除旧文件: {full_path}")
except Exception as e:
logger.warning(f"删除旧文件失败: {full_path}, {e}")
router = APIRouter(prefix="/api/designs", tags=["设计"])
@@ -29,6 +52,7 @@ def design_to_response(design: Design) -> DesignResponse:
model_used=img.model_used,
prompt_used=img.prompt_used,
sort_order=img.sort_order,
model_3d_url=img.model_3d_url,
)
for img in design.images
]
@@ -66,6 +90,7 @@ def design_to_response(design: Design) -> DesignResponse:
surface_finish=design.surface_finish,
usage_scene=design.usage_scene,
image_url=design.image_url,
video_url=design.video_url,
images=images,
status=design.status,
created_at=design.created_at,
@@ -219,3 +244,115 @@ def download_design(
filename=f"design_{design_id}.png",
media_type="image/png"
)
@router.post("/{design_id}/generate-video", response_model=DesignResponse)
async def generate_video(
design_id: int,
force: bool = Query(False, description="强制重新生成"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
为设计生成 360 度旋转展示视频
取设计的多视角图片,通过火山引擎即梦 3.0 Pro 生成视频
"""
design = design_service.get_design_by_id(db=db, design_id=design_id, user_id=current_user.id)
if not design:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="设计不存在")
# 已有视频且不强制重生时直接返回
if design.video_url and not force:
return design_to_response(design)
# 强制重生时删除旧视频文件
if force and design.video_url:
_delete_local_file(design.video_url)
design.video_url = None
# 收集多视角图片 URL
image_urls = []
if design.images:
for img in sorted(design.images, key=lambda x: x.sort_order):
if img.image_url:
image_urls.append(img.image_url)
if not image_urls and design.image_url:
image_urls.append(design.image_url)
if not image_urls:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="设计没有图片,无法生成视频")
logger.info(f"设计 {design_id} 生成视频,共收集到 {len(image_urls)} 张图片")
try:
video_url = await ai_video_generator.generate_video(image_urls)
design.video_url = video_url
db.commit()
db.refresh(design)
return design_to_response(design)
except Exception as e:
logger.error(f"视频生成失败: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"视频生成失败: {str(e)}"
)
@router.post("/{design_id}/generate-3d", response_model=DesignResponse)
async def generate_3d_model(
design_id: int,
force: bool = Query(False, description="强制重新生成"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""
为设计生成 3D 模型
收集所有多视角图片取效果图45度视角生成 .glb 格式 3D 模型
"""
design = design_service.get_design_by_id(db=db, design_id=design_id, user_id=current_user.id)
if not design:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="设计不存在")
# 检查是否已有 3D 模型(第一张图片)
if design.images and not force:
first_img = sorted(design.images, key=lambda x: x.sort_order)[0]
if first_img.model_3d_url:
return design_to_response(design)
# 强制重生时删除旧文件
if force and design.images:
first_img = sorted(design.images, key=lambda x: x.sort_order)[0]
if first_img.model_3d_url:
_delete_local_file(first_img.model_3d_url)
first_img.model_3d_url = None
# 收集所有多视角图片 URL 和视角名称
image_urls = []
view_names = []
if design.images:
for img in sorted(design.images, key=lambda x: x.sort_order):
if img.image_url:
image_urls.append(img.image_url)
view_names.append(img.view_name or "效果图")
if not image_urls and design.image_url:
image_urls.append(design.image_url)
view_names.append("效果图")
if not image_urls:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="设计没有图片无法生成3D模型")
try:
model_url = await ai_3d_generator.generate_3d_model(image_urls, view_names)
# 将 3D 模型 URL 保存到第一张图片
if design.images:
first_img = sorted(design.images, key=lambda x: x.sort_order)[0]
first_img.model_3d_url = model_url
db.commit()
db.refresh(design)
return design_to_response(design)
except Exception as e:
logger.error(f"3D 模型生成失败: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"3D 模型生成失败: {str(e)}"
)

View File

@@ -16,6 +16,7 @@ class DesignImageResponse(BaseModel):
model_used: Optional[str] = None
prompt_used: Optional[str] = None
sort_order: int = 0
model_3d_url: Optional[str] = None
class Config:
from_attributes = True
@@ -51,6 +52,7 @@ class DesignResponse(BaseModel):
surface_finish: Optional[str] = None
usage_scene: Optional[str] = None
image_url: Optional[str] = None
video_url: Optional[str] = None
images: List[DesignImageResponse] = []
status: str
created_at: datetime

View File

@@ -0,0 +1,309 @@
"""
AI 3D 模型生成服务
使用腾讯混元3D (Hunyuan3D) API 将设计图片生成 3D 模型(.glb 格式)
"""
import asyncio
import hashlib
import hmac
import io
import json
import logging
import os
import time
import uuid
import zipfile
from datetime import datetime, timezone
from typing import Optional
import httpx
from .config_service import get_config_value
logger = logging.getLogger(__name__)
# 腾讯混元3D API 配置
HUNYUAN3D_HOST = "ai3d.tencentcloudapi.com"
HUNYUAN3D_SERVICE = "ai3d"
HUNYUAN3D_VERSION = "2025-05-13"
HUNYUAN3D_REGION = "ap-guangzhou"
SUBMIT_TIMEOUT = 30
POLL_TIMEOUT = 15
MAX_POLL_ATTEMPTS = 120 # 约 10 分钟
POLL_INTERVAL = 5
def _get_tencent_credentials() -> tuple:
"""获取腾讯云 SecretId 和 SecretKey"""
secret_id = get_config_value("TENCENT_SECRET_ID", "")
secret_key = get_config_value("TENCENT_SECRET_KEY", "")
if not secret_id or not secret_key:
raise RuntimeError("未配置 TENCENT_SECRET_ID 或 TENCENT_SECRET_KEY无法生成 3D 模型")
return secret_id, secret_key
def _sign_tc3(secret_key: str, date: str, service: str, string_to_sign: str) -> str:
"""TC3-HMAC-SHA256 签名"""
def _hmac_sha256(key: bytes, msg: str) -> bytes:
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
secret_date = _hmac_sha256(("TC3" + secret_key).encode("utf-8"), date)
secret_service = _hmac_sha256(secret_date, service)
secret_signing = _hmac_sha256(secret_service, "tc3_request")
return hmac.new(secret_signing, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
def _build_auth_headers(secret_id: str, secret_key: str, action: str, payload: str) -> dict:
"""构造腾讯云 API TC3-HMAC-SHA256 签名请求头"""
timestamp = int(time.time())
date = datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime("%Y-%m-%d")
# 1. 拼接规范请求串
http_request_method = "POST"
canonical_uri = "/"
canonical_querystring = ""
ct = "application/json; charset=utf-8"
canonical_headers = f"content-type:{ct}\nhost:{HUNYUAN3D_HOST}\nx-tc-action:{action.lower()}\n"
signed_headers = "content-type;host;x-tc-action"
hashed_payload = hashlib.sha256(payload.encode("utf-8")).hexdigest()
canonical_request = (
f"{http_request_method}\n{canonical_uri}\n{canonical_querystring}\n"
f"{canonical_headers}\n{signed_headers}\n{hashed_payload}"
)
# 2. 拼接待签名字符串
algorithm = "TC3-HMAC-SHA256"
credential_scope = f"{date}/{HUNYUAN3D_SERVICE}/tc3_request"
hashed_canonical = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()
string_to_sign = f"{algorithm}\n{timestamp}\n{credential_scope}\n{hashed_canonical}"
# 3. 计算签名
signature = _sign_tc3(secret_key, date, HUNYUAN3D_SERVICE, string_to_sign)
# 4. 拼接 Authorization
authorization = (
f"{algorithm} Credential={secret_id}/{credential_scope}, "
f"SignedHeaders={signed_headers}, Signature={signature}"
)
return {
"Authorization": authorization,
"Content-Type": ct,
"Host": HUNYUAN3D_HOST,
"X-TC-Action": action,
"X-TC-Version": HUNYUAN3D_VERSION,
"X-TC-Region": HUNYUAN3D_REGION,
"X-TC-Timestamp": str(timestamp),
}
async def _call_hunyuan3d_api(action: str, params: dict) -> dict:
"""调用腾讯混元3D API 通用方法"""
secret_id, secret_key = _get_tencent_credentials()
payload = json.dumps(params)
headers = _build_auth_headers(secret_id, secret_key, action, payload)
url = f"https://{HUNYUAN3D_HOST}"
async with httpx.AsyncClient(timeout=SUBMIT_TIMEOUT) as client:
resp = await client.post(url, content=payload, headers=headers)
data = resp.json()
# 检查腾讯云 API 错误
response = data.get("Response", {})
error = response.get("Error")
if error:
code = error.get("Code", "Unknown")
message = error.get("Message", "未知错误")
raise RuntimeError(f"混元3D API 错误 [{code}]: {message}")
return response
# 视角名称 → 混元3D MultiViewImages 视角映射
# 默认3.0版本只支持 back, left, right
_VIEW_NAME_MAP = {
"侧面图": "left",
"背面图": "back",
}
async def generate_3d_model(image_urls: list, view_names: Optional[list] = None) -> str:
"""
调用腾讯混元3D 专业版 API 将图片生成 3D 模型
Args:
image_urls: 多视角图片 URL 列表
view_names: 对应的视角名称列表(效果图/正面图/侧面图/背面图)
Returns:
.glb 3D 模型文件的远程 URL
Raises:
RuntimeError: 3D 模型生成失败
"""
if not image_urls:
raise RuntimeError("没有可用的图片,无法生成 3D 模型")
if not view_names:
view_names = ["效果图"] + ["未知"] * (len(image_urls) - 1)
# 选择主图(正面图优先,其次效果图,否则第一张)
main_url = None
multi_views = []
for url, name in zip(image_urls, view_names):
if name == "正面图" and not main_url:
main_url = url
elif name in _VIEW_NAME_MAP:
multi_views.append({"ViewType": _VIEW_NAME_MAP[name], "ViewImageUrl": url})
elif not main_url:
main_url = url
# 其他未知视角跳过
# 如果没有找到主图,用第一张
if not main_url:
main_url = image_urls[0]
logger.info(f"3D 模型生成: 主图=1张, 多视角={len(multi_views)}")
# Step 1: 提交图生3D任务统一使用专业版
job_id = await _submit_3d_task(main_url, multi_views)
logger.info(f"3D 模型生成任务已提交(专业版): job_id={job_id}")
# Step 2: 轮询等待结果
remote_url = await _poll_3d_result(job_id)
logger.info(f"3D 模型生成完成: {remote_url[:80]}...")
# Step 3: 下载模型文件到本地(解决 CORS 跨域问题)
model_url = await _download_model_to_local(remote_url)
logger.info(f"模型文件已保存到本地: {model_url}")
return model_url
async def _submit_3d_task(image_url: str, multi_views: Optional[list] = None) -> str:
"""
提交图生3D任务到腾讯混元3D 专业版
Returns:
job_id
"""
params = {
"ImageUrl": image_url,
}
# 有多视角图片则附带
if multi_views:
params["MultiViewImages"] = multi_views
response = await _call_hunyuan3d_api("SubmitHunyuanTo3DProJob", params)
job_id = response.get("JobId")
if not job_id:
raise RuntimeError(f"混元3D 响应中未找到 JobId: {response}")
return job_id
async def _poll_3d_result(job_id: str) -> str:
"""
轮询 3D 模型生成结果(专业版)
Returns:
远程模型文件 URL
"""
query_action = "QueryHunyuanTo3DProJob"
for attempt in range(1, MAX_POLL_ATTEMPTS + 1):
await asyncio.sleep(POLL_INTERVAL)
try:
response = await _call_hunyuan3d_api(query_action, {"JobId": job_id})
except RuntimeError as e:
logger.warning(f"轮询 3D 结果失败 (attempt={attempt}): {e}")
continue
status = response.get("Status", "")
if status == "DONE":
result_files = response.get("ResultFile3Ds", [])
logger.info(f"3D 结果文件列表: {result_files}")
# 优先找 ZIP 格式包含所有格式GLB/OBJ/FBX/STL等
for f in result_files:
if f.get("Type", "").upper() == "ZIP":
return f.get("Url", "")
# 没有 ZIP找 GLB
for f in result_files:
if f.get("Type", "").upper() == "GLB":
return f.get("Url", "")
# 都没有,取第一个有 URL 的文件
for f in result_files:
file_url = f.get("Url", "")
if file_url:
return file_url
raise RuntimeError(f"3D 模型生成成功但未找到模型 URL: {response}")
elif status == "FAIL":
error_code = response.get("ErrorCode", "")
error_msg = response.get("ErrorMessage", "未知错误")
raise RuntimeError(f"3D 模型生成失败 [{error_code}]: {error_msg}")
else:
if attempt % 6 == 0:
logger.info(f"3D 模型生成中... (attempt={attempt}, status={status})")
raise RuntimeError(f"3D 模型生成超时: 轮询 {MAX_POLL_ATTEMPTS} 次后仍未完成")
async def _download_model_to_local(remote_url: str) -> str:
"""
下载远程模型文件到本地 uploads/models/ 目录
支持 .zip解压提取 .glb 并同时保留 zip和直接的 .glb 文件
Returns:
本地路径(/uploads/models/xxx.glb
"""
# 确保目录存在
models_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "uploads", "models")
os.makedirs(models_dir, exist_ok=True)
# 下载远程文件
async with httpx.AsyncClient(timeout=120, follow_redirects=True) as client:
resp = await client.get(remote_url)
if resp.status_code != 200:
raise RuntimeError(f"下载 3D 模型文件失败: HTTP {resp.status_code}")
file_data = resp.content
url_path = remote_url.split('?')[0].lower()
file_id = uuid.uuid4().hex
# 判断是否为 zip 文件(检查 URL 后缀 或 文件头 PK
is_zip = url_path.endswith('.zip') or file_data[:2] == b'PK'
if is_zip:
# 保存原始 zip 包(供用户下载完整模型)
zip_path = os.path.join(models_dir, f"{file_id}.zip")
with open(zip_path, "wb") as f:
f.write(file_data)
logger.info(f"ZIP 包已保存: {zip_path} ({len(file_data)} bytes)")
# 解压提取 glb
glb_content = None
with zipfile.ZipFile(io.BytesIO(file_data)) as zf:
logger.info(f"ZIP 文件内容: {zf.namelist()}")
for name in zf.namelist():
if name.lower().endswith('.glb'):
glb_content = zf.read(name)
logger.info(f"从 zip 中提取 GLB: {name} ({len(glb_content)} bytes)")
break
if glb_content is None:
raise RuntimeError("zip 中未找到 .glb 文件")
file_data = glb_content
# 保存 glb 文件
glb_path = os.path.join(models_dir, f"{file_id}.glb")
with open(glb_path, "wb") as f:
f.write(file_data)
logger.info(f"GLB 文件已保存: {glb_path} ({len(file_data)} bytes)")
return f"/uploads/models/{file_id}.glb"

View File

@@ -0,0 +1,392 @@
"""
AI 视频生成服务
使用火山引擎 即梦3.0 Pro (Jimeng Video 3.0 Pro) 将设计图生成 360 度旋转展示视频
API 文档: https://www.volcengine.com/docs/85621/1777001
认证方式: Volcengine V4 签名 (Access Key + Secret Key)
API 端点: https://visual.volcengineapi.com
"""
import asyncio
import base64
import hashlib
import hmac
import io
import json
import logging
import math
import os
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List
from urllib.parse import quote
import httpx
from PIL import Image
from .config_service import get_config_value
# 视频本地存储目录
VIDEO_UPLOAD_DIR = Path(__file__).resolve().parent.parent.parent / "uploads" / "videos"
VIDEO_UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
logger = logging.getLogger(__name__)
# 火山引擎视觉 API 配置
VISUAL_API_HOST = "visual.volcengineapi.com"
VISUAL_API_URL = f"https://{VISUAL_API_HOST}"
REGION = "cn-north-1"
SERVICE = "cv"
API_VERSION = "2022-08-31"
# 即梦3.0 Pro req_key
REQ_KEY_I2V = "jimeng_ti2v_v30_pro" # 支持传 image_urls 做图生视频
# 超时与轮询配置
SUBMIT_TIMEOUT = 30
POLL_TIMEOUT = 15
MAX_POLL_ATTEMPTS = 120 # 约 10 分钟
POLL_INTERVAL = 5
# ============================================================
# Volcengine V4 签名实现
# ============================================================
def _sign(key: bytes, msg: str) -> bytes:
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
def _get_signature_key(secret_key: str, date_stamp: str, region: str, service: str) -> bytes:
k_date = _sign(secret_key.encode("utf-8"), date_stamp)
k_region = _sign(k_date, region)
k_service = _sign(k_region, service)
k_signing = _sign(k_service, "request")
return k_signing
def _build_signed_headers(
access_key: str,
secret_key: str,
action: str,
body: str,
) -> dict:
"""
构建带 V4 签名的请求头
参考: https://www.volcengine.com/docs/6369/67269
"""
now = datetime.now(timezone.utc)
date_stamp = now.strftime("%Y%m%d")
amz_date = now.strftime("%Y%m%dT%H%M%SZ")
# 请求参数
canonical_querystring = f"Action={quote(action, safe='')}&Version={quote(API_VERSION, safe='')}"
# 规范请求头
content_type = "application/json"
canonical_headers = (
f"content-type:{content_type}\n"
f"host:{VISUAL_API_HOST}\n"
f"x-date:{amz_date}\n"
)
signed_headers = "content-type;host;x-date"
# Payload hash
payload_hash = hashlib.sha256(body.encode("utf-8")).hexdigest()
# 规范请求
canonical_request = (
f"POST\n"
f"/\n"
f"{canonical_querystring}\n"
f"{canonical_headers}\n"
f"{signed_headers}\n"
f"{payload_hash}"
)
# 待签字符串
credential_scope = f"{date_stamp}/{REGION}/{SERVICE}/request"
string_to_sign = (
f"HMAC-SHA256\n"
f"{amz_date}\n"
f"{credential_scope}\n"
f"{hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()}"
)
# 签名
signing_key = _get_signature_key(secret_key, date_stamp, REGION, SERVICE)
signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
# Authorization 头
authorization = (
f"HMAC-SHA256 "
f"Credential={access_key}/{credential_scope}, "
f"SignedHeaders={signed_headers}, "
f"Signature={signature}"
)
return {
"Content-Type": content_type,
"Host": VISUAL_API_HOST,
"X-Date": amz_date,
"Authorization": authorization,
}
# ============================================================
# 视频生成核心逻辑
# ============================================================
def _get_volc_keys() -> tuple:
"""获取火山引擎 Access Key 和 Secret Key"""
access_key = get_config_value("VOLC_ACCESS_KEY", "")
secret_key = get_config_value("VOLC_SECRET_KEY", "")
if not access_key or not secret_key:
raise RuntimeError(
"未配置 VOLC_ACCESS_KEY 或 VOLC_SECRET_KEY无法使用即梦视频生成。"
"请在管理后台 系统配置 中添加火山引擎 Access Key 和 Secret Key。"
)
return access_key, secret_key
async def _merge_images_to_base64(image_urls: List[str]) -> str:
"""
下载多张图片并拼接为一张网格图,返回 base64 编码
拼接策略:
- 1张: 直接使用
- 2张: 1×2 横向拼接
- 3~4张: 2×2 网格拼接
"""
# 下载所有图片
images = []
async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
for url in image_urls:
try:
resp = await client.get(url)
resp.raise_for_status()
img = Image.open(io.BytesIO(resp.content)).convert("RGB")
images.append(img)
logger.info(f"下载图片成功: {url[:60]}... 尺寸={img.size}")
except Exception as e:
logger.warning(f"下载图片失败: {url[:60]}... {e}")
if not images:
raise RuntimeError("没有成功下载任何图片")
# 只有1张时直接使用
if len(images) == 1:
merged = images[0]
else:
# 统一尺寸到最大尺寸
max_w = max(img.width for img in images)
max_h = max(img.height for img in images)
# 计算网格布局
n = len(images)
if n == 2:
cols, rows = 2, 1
else:
cols = 2
rows = math.ceil(n / cols)
# 创建拼接画布(白色背景)
canvas_w = cols * max_w
canvas_h = rows * max_h
merged = Image.new("RGB", (canvas_w, canvas_h), (255, 255, 255))
for idx, img in enumerate(images):
r = idx // cols
c = idx % cols
# 居中放置
x = c * max_w + (max_w - img.width) // 2
y = r * max_h + (max_h - img.height) // 2
merged.paste(img, (x, y))
logger.info(f"图片拼接完成: {n}张 -> {cols}x{rows}网格, 尺寸={merged.size}")
# 转 base64
buf = io.BytesIO()
merged.save(buf, format="JPEG", quality=90)
b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
logger.info(f"拼接图 base64 长度: {len(b64)}")
return b64
async def generate_video(
image_urls: List[str],
prompt: str = "",
duration_seconds: int = 5,
) -> str:
"""
调用即梦3.0 Pro 生成 360 度旋转展示视频
Args:
image_urls: 多视角图片 URL 列表(取第一张作为首帧)
prompt: 视频生成提示词
duration_seconds: 预留参数(即梦目前固定帧数)
Returns:
生成的视频远程 URL
Raises:
RuntimeError: 视频生成失败
"""
access_key, secret_key = _get_volc_keys()
logger.info(f"传入视频生成的图片数量: {len(image_urls)}")
# 即梦API只支持单张图片输入取第一张正面效果图作为基准
first_url = image_urls[0]
logger.info(f"使用第一张图片生成视频: {first_url[:80]}...")
# 从配置读取默认 prompt
if not prompt:
prompt = get_config_value("VIDEO_PROMPT", "")
if not prompt:
prompt = (
"玉雕作品在摄影棚内缓慢旋转360度展示全貌"
"专业珠宝摄影灯光,纯白色背景,平稳旋转,"
"展示正面、侧面、背面各个角度,电影级画质"
)
# Step 1: 提交任务只传第一张图片URL
task_id = await _submit_video_task(access_key, secret_key, first_url, prompt)
logger.info(f"即梦视频生成任务已提交: task_id={task_id}")
# Step 2: 轮询等待结果
remote_video_url = await _poll_video_result(access_key, secret_key, task_id)
logger.info(f"即梦视频生成完成: {remote_video_url[:80]}...")
# Step 3: 下载视频到本地存储即梦URL有效期约 1 小时,必须保存到本地)
local_path = await _download_video_to_local(remote_video_url)
logger.info(f"视频已保存到本地: {local_path}")
return local_path
async def _submit_video_task(
access_key: str,
secret_key: str,
image_url: str,
prompt: str,
) -> str:
"""提交图生视频任务到即梦3.0 Pro使用单张图片URL"""
action = "CVSync2AsyncSubmitTask"
logger.info(f"提交即梦视频任务图片URL: {image_url[:80]}...")
payload = {
"req_key": REQ_KEY_I2V,
"prompt": prompt,
"image_urls": [image_url],
"seed": -1,
"frames": int(get_config_value("VIDEO_FRAMES", "121")),
"aspect_ratio": "1:1", # 玉雕展示用正方形
}
body = json.dumps(payload, ensure_ascii=False)
headers = _build_signed_headers(access_key, secret_key, action, body)
url = f"{VISUAL_API_URL}?Action={action}&Version={API_VERSION}"
async with httpx.AsyncClient(timeout=SUBMIT_TIMEOUT) as client:
resp = await client.post(url, content=body, headers=headers)
if resp.status_code != 200:
logger.error(f"即梦视频任务提交失败: status={resp.status_code}, body={resp.text[:500]}")
resp.raise_for_status()
data = resp.json()
# 检查响应
code = data.get("code", 0)
if code != 10000:
msg = data.get("message", "未知错误")
raise RuntimeError(f"即梦视频任务提交失败 (code={code}): {msg}")
task_id = data.get("data", {}).get("task_id")
if not task_id:
raise RuntimeError(f"即梦响应中未找到 task_id: {data}")
return task_id
async def _poll_video_result(
access_key: str,
secret_key: str,
task_id: str,
) -> str:
"""轮询视频生成结果"""
action = "CVSync2AsyncGetResult"
payload = {
"req_key": REQ_KEY_I2V,
"task_id": task_id,
}
body = json.dumps(payload, ensure_ascii=False)
for attempt in range(1, MAX_POLL_ATTEMPTS + 1):
await asyncio.sleep(POLL_INTERVAL)
# 每次轮询需要重新签名(时间戳不同)
headers = _build_signed_headers(access_key, secret_key, action, body)
url = f"{VISUAL_API_URL}?Action={action}&Version={API_VERSION}"
try:
async with httpx.AsyncClient(timeout=POLL_TIMEOUT) as client:
resp = await client.post(url, content=body, headers=headers)
if resp.status_code != 200:
logger.warning(f"轮询即梦视频结果失败 (attempt={attempt}): status={resp.status_code}, body={resp.text[:300]}")
continue
data = resp.json()
except Exception as e:
logger.warning(f"轮询即梦视频异常 (attempt={attempt}): {e}")
continue
code = data.get("code", 0)
task_data = data.get("data", {})
status = task_data.get("status", "")
if status == "done" and code == 10000:
video_url = task_data.get("video_url", "")
if video_url:
return video_url
raise RuntimeError(f"即梦视频生成完成但未找到 video_url: {data}")
elif status == "done" and code != 10000:
msg = data.get("message", "未知错误")
raise RuntimeError(f"即梦视频生成失败 (code={code}): {msg}")
elif status in ("not_found", "expired"):
raise RuntimeError(f"即梦视频任务状态异常: {status}")
else:
# in_queue / generating
if attempt % 6 == 0:
logger.info(f"即梦视频生成中... (attempt={attempt}, status={status})")
raise RuntimeError(f"即梦视频生成超时: 轮询 {MAX_POLL_ATTEMPTS} 次后仍未完成")
async def _download_video_to_local(remote_url: str) -> str:
"""
下载远程视频到本地 uploads/videos/ 目录
Returns:
本地视频的 URL 路径,如 /uploads/videos/xxx.mp4
"""
filename = f"{uuid.uuid4().hex}.mp4"
local_file = VIDEO_UPLOAD_DIR / filename
try:
async with httpx.AsyncClient(timeout=120, follow_redirects=True) as client:
resp = await client.get(remote_url)
resp.raise_for_status()
local_file.write_bytes(resp.content)
logger.info(f"视频下载完成: {len(resp.content)} 字节 -> {local_file}")
except Exception as e:
logger.error(f"视频下载失败: {e}")
raise RuntimeError(f"视频下载失败: {e}")
# 返回相对 URL 路径(和图片一样通过 /uploads/ 静态服务访问)
return f"/uploads/videos/{filename}"