Files
YuShiSheJiShi/backend/app/services/ai_3d_generator.py
2ef126e445 feat: 强化多视角图片一致性 + 修复下载逻辑 + 技术文档
- 新增品类专属背面/侧面描述(BACK_VIEW_HINTS/SIDE_VIEW_HINTS)
- 强化一致性前缀策略,按视角定制相机位置描述
- 更新视角映射提示词为纯摄影术语
- 修复前端下载逻辑:改用fetch直接下载当前视角图片
- HTTPS改HTTP修复外网URL访问
- 新增多视角一致性与3D视频生成技术文档
2026-03-28 19:51:08 +08:00

325 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
AI 3D 模型生成服务
使用腾讯混元3D (Hunyuan3D) API 将设计图片生成 3D 模型(.glb 格式)
"""
import asyncio
import hashlib
import hmac
import io
import json
import logging
import os
import time
import uuid
import zipfile
from datetime import datetime, timezone
from typing import Optional
import httpx
from .config_service import get_config_value
logger = logging.getLogger(__name__)
# 腾讯混元3D API 配置
HUNYUAN3D_HOST = "ai3d.tencentcloudapi.com"
HUNYUAN3D_SERVICE = "ai3d"
HUNYUAN3D_VERSION = "2025-05-13"
HUNYUAN3D_REGION = "ap-guangzhou"
SUBMIT_TIMEOUT = 30
POLL_TIMEOUT = 15
MAX_POLL_ATTEMPTS = 120 # 约 10 分钟
POLL_INTERVAL = 5
def _get_tencent_credentials() -> tuple:
"""获取腾讯云 SecretId 和 SecretKey"""
secret_id = get_config_value("TENCENT_SECRET_ID", "")
secret_key = get_config_value("TENCENT_SECRET_KEY", "")
if not secret_id or not secret_key:
raise RuntimeError("未配置 TENCENT_SECRET_ID 或 TENCENT_SECRET_KEY无法生成 3D 模型")
return secret_id, secret_key
def _sign_tc3(secret_key: str, date: str, service: str, string_to_sign: str) -> str:
"""TC3-HMAC-SHA256 签名"""
def _hmac_sha256(key: bytes, msg: str) -> bytes:
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
secret_date = _hmac_sha256(("TC3" + secret_key).encode("utf-8"), date)
secret_service = _hmac_sha256(secret_date, service)
secret_signing = _hmac_sha256(secret_service, "tc3_request")
return hmac.new(secret_signing, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
def _build_auth_headers(secret_id: str, secret_key: str, action: str, payload: str) -> dict:
"""构造腾讯云 API TC3-HMAC-SHA256 签名请求头"""
timestamp = int(time.time())
date = datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime("%Y-%m-%d")
# 1. 拼接规范请求串
http_request_method = "POST"
canonical_uri = "/"
canonical_querystring = ""
ct = "application/json; charset=utf-8"
canonical_headers = f"content-type:{ct}\nhost:{HUNYUAN3D_HOST}\nx-tc-action:{action.lower()}\n"
signed_headers = "content-type;host;x-tc-action"
hashed_payload = hashlib.sha256(payload.encode("utf-8")).hexdigest()
canonical_request = (
f"{http_request_method}\n{canonical_uri}\n{canonical_querystring}\n"
f"{canonical_headers}\n{signed_headers}\n{hashed_payload}"
)
# 2. 拼接待签名字符串
algorithm = "TC3-HMAC-SHA256"
credential_scope = f"{date}/{HUNYUAN3D_SERVICE}/tc3_request"
hashed_canonical = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()
string_to_sign = f"{algorithm}\n{timestamp}\n{credential_scope}\n{hashed_canonical}"
# 3. 计算签名
signature = _sign_tc3(secret_key, date, HUNYUAN3D_SERVICE, string_to_sign)
# 4. 拼接 Authorization
authorization = (
f"{algorithm} Credential={secret_id}/{credential_scope}, "
f"SignedHeaders={signed_headers}, Signature={signature}"
)
return {
"Authorization": authorization,
"Content-Type": ct,
"Host": HUNYUAN3D_HOST,
"X-TC-Action": action,
"X-TC-Version": HUNYUAN3D_VERSION,
"X-TC-Region": HUNYUAN3D_REGION,
"X-TC-Timestamp": str(timestamp),
}
async def _call_hunyuan3d_api(action: str, params: dict) -> dict:
"""调用腾讯混元3D API 通用方法"""
secret_id, secret_key = _get_tencent_credentials()
payload = json.dumps(params)
headers = _build_auth_headers(secret_id, secret_key, action, payload)
url = f"https://{HUNYUAN3D_HOST}"
async with httpx.AsyncClient(timeout=SUBMIT_TIMEOUT) as client:
resp = await client.post(url, content=payload, headers=headers)
data = resp.json()
# 检查腾讯云 API 错误
response = data.get("Response", {})
error = response.get("Error")
if error:
code = error.get("Code", "Unknown")
message = error.get("Message", "未知错误")
raise RuntimeError(f"混元3D API 错误 [{code}]: {message}")
return response
# 视角名称 → 混元3D MultiViewImages 视角映射
# 默认3.0版本只支持 back, left, right
_VIEW_NAME_MAP = {
"侧面图": "left",
"背面图": "back",
}
def _to_public_url(url: str) -> str:
"""将本地路径转换为外网可访问的完整 URL
第三方API如混元3D、可灵AI需要外网可访问的图片URL
本地存储路径(/uploads/xxx需要拼接域名。
"""
if url and url.startswith("/uploads/"):
base_domain = get_config_value("SITE_DOMAIN", "http://c02.wsg.plus")
return f"{base_domain}{url}"
return url
async def generate_3d_model(image_urls: list, view_names: Optional[list] = None) -> str:
"""
调用腾讯混元3D 专业版 API 将图片生成 3D 模型
Args:
image_urls: 多视角图片 URL 列表
view_names: 对应的视角名称列表(效果图/正面图/侧面图/背面图)
Returns:
.glb 3D 模型文件的远程 URL
Raises:
RuntimeError: 3D 模型生成失败
"""
if not image_urls:
raise RuntimeError("没有可用的图片,无法生成 3D 模型")
if not view_names:
view_names = ["效果图"] + ["未知"] * (len(image_urls) - 1)
# 将本地路径转换为外网可访问URL第三方API需要完整URL
image_urls = [_to_public_url(u) for u in image_urls]
# 选择主图(正面图优先,其次效果图,否则第一张)
main_url = None
multi_views = []
for url, name in zip(image_urls, view_names):
if name == "正面图" and not main_url:
main_url = url
elif name in _VIEW_NAME_MAP:
multi_views.append({"ViewType": _VIEW_NAME_MAP[name], "ViewImageUrl": url})
elif not main_url:
main_url = url
# 其他未知视角跳过
# 如果没有找到主图,用第一张
if not main_url:
main_url = image_urls[0]
logger.info(f"3D 模型生成: 主图=1张, 多视角={len(multi_views)}")
# Step 1: 提交图生3D任务统一使用专业版
job_id = await _submit_3d_task(main_url, multi_views)
logger.info(f"3D 模型生成任务已提交(专业版): job_id={job_id}")
# Step 2: 轮询等待结果
remote_url = await _poll_3d_result(job_id)
logger.info(f"3D 模型生成完成: {remote_url[:80]}...")
# Step 3: 下载模型文件到本地(解决 CORS 跨域问题)
model_url = await _download_model_to_local(remote_url)
logger.info(f"模型文件已保存到本地: {model_url}")
return model_url
async def _submit_3d_task(image_url: str, multi_views: Optional[list] = None) -> str:
"""
提交图生3D任务到腾讯混元3D 专业版
Returns:
job_id
"""
params = {
"ImageUrl": image_url,
}
# 有多视角图片则附带
if multi_views:
params["MultiViewImages"] = multi_views
response = await _call_hunyuan3d_api("SubmitHunyuanTo3DProJob", params)
job_id = response.get("JobId")
if not job_id:
raise RuntimeError(f"混元3D 响应中未找到 JobId: {response}")
return job_id
async def _poll_3d_result(job_id: str) -> str:
"""
轮询 3D 模型生成结果(专业版)
Returns:
远程模型文件 URL
"""
query_action = "QueryHunyuanTo3DProJob"
for attempt in range(1, MAX_POLL_ATTEMPTS + 1):
await asyncio.sleep(POLL_INTERVAL)
try:
response = await _call_hunyuan3d_api(query_action, {"JobId": job_id})
except RuntimeError as e:
logger.warning(f"轮询 3D 结果失败 (attempt={attempt}): {e}")
continue
status = response.get("Status", "")
if status == "DONE":
result_files = response.get("ResultFile3Ds", [])
logger.info(f"3D 结果文件列表: {result_files}")
# 优先找 ZIP 格式包含所有格式GLB/OBJ/FBX/STL等
for f in result_files:
if f.get("Type", "").upper() == "ZIP":
return f.get("Url", "")
# 没有 ZIP找 GLB
for f in result_files:
if f.get("Type", "").upper() == "GLB":
return f.get("Url", "")
# 都没有,取第一个有 URL 的文件
for f in result_files:
file_url = f.get("Url", "")
if file_url:
return file_url
raise RuntimeError(f"3D 模型生成成功但未找到模型 URL: {response}")
elif status == "FAIL":
error_code = response.get("ErrorCode", "")
error_msg = response.get("ErrorMessage", "未知错误")
raise RuntimeError(f"3D 模型生成失败 [{error_code}]: {error_msg}")
else:
if attempt % 6 == 0:
logger.info(f"3D 模型生成中... (attempt={attempt}, status={status})")
raise RuntimeError(f"3D 模型生成超时: 轮询 {MAX_POLL_ATTEMPTS} 次后仍未完成")
async def _download_model_to_local(remote_url: str) -> str:
"""
下载远程模型文件到本地 uploads/models/ 目录
支持 .zip解压提取 .glb 并同时保留 zip和直接的 .glb 文件
Returns:
本地路径(/uploads/models/xxx.glb
"""
# 确保目录存在
models_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "uploads", "models")
os.makedirs(models_dir, exist_ok=True)
# 下载远程文件
async with httpx.AsyncClient(timeout=120, follow_redirects=True) as client:
resp = await client.get(remote_url)
if resp.status_code != 200:
raise RuntimeError(f"下载 3D 模型文件失败: HTTP {resp.status_code}")
file_data = resp.content
url_path = remote_url.split('?')[0].lower()
file_id = uuid.uuid4().hex
# 判断是否为 zip 文件(检查 URL 后缀 或 文件头 PK
is_zip = url_path.endswith('.zip') or file_data[:2] == b'PK'
if is_zip:
# 保存原始 zip 包(供用户下载完整模型)
zip_path = os.path.join(models_dir, f"{file_id}.zip")
with open(zip_path, "wb") as f:
f.write(file_data)
logger.info(f"ZIP 包已保存: {zip_path} ({len(file_data)} bytes)")
# 解压提取 glb
glb_content = None
with zipfile.ZipFile(io.BytesIO(file_data)) as zf:
logger.info(f"ZIP 文件内容: {zf.namelist()}")
for name in zf.namelist():
if name.lower().endswith('.glb'):
glb_content = zf.read(name)
logger.info(f"从 zip 中提取 GLB: {name} ({len(glb_content)} bytes)")
break
if glb_content is None:
raise RuntimeError("zip 中未找到 .glb 文件")
file_data = glb_content
# 保存 glb 文件
glb_path = os.path.join(models_dir, f"{file_id}.glb")
with open(glb_path, "wb") as f:
f.write(file_data)
logger.info(f"GLB 文件已保存: {glb_path} ({len(file_data)} bytes)")
return f"/uploads/models/{file_id}.glb"