Files
YuShiSheJiShi/backend/app/services/ai_video_generator.py
1d94ec114a feat(video): 集成可灵AI多图参考生视频生成服务
- 替换视频生成服务为可灵AI多图参考生视频API,支持1-4张多视角图片输入
- 调整图片拼接逻辑,生成横向长图传入即梦API备用
- 实现基于JWT认证的可灵API请求和轮询机制,支持高品质1:1正方形视频生成
- 在设计详情页新增视频展示区域及生成、重新生成和下载视频操作
- 更新后台系统配置,支持配置可灵AI Access Key和Secret Key
- 删除即梦视频相关配置及逻辑,所有视频生成功能切换到可灵AI实现
- 优化视频生成提示词,提升视频质感和展示效果
- 增加视频文件本地存储和路径管理,保证视频可访问和下载
- 前端增加视频生成状态管理和用户界面交互提示
- 后端添加PyJWT依赖,支持JWT认证流程
2026-03-28 00:20:48 +08:00

423 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
AI 视频生成服务
使用火山引擎 即梦3.0 Pro (Jimeng Video 3.0 Pro) 将设计图生成 360 度旋转展示视频
API 文档: https://www.volcengine.com/docs/85621/1777001
认证方式: Volcengine V4 签名 (Access Key + Secret Key)
API 端点: https://visual.volcengineapi.com
"""
import asyncio
import base64
import hashlib
import hmac
import io
import json
import logging
import math
import os
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List
from urllib.parse import quote
import httpx
from PIL import Image
from .config_service import get_config_value
# Local storage directory for generated videos: "uploads/videos" under the
# directory three levels above this module. It is created at import time
# (including missing parents) so later writes can assume it exists.
VIDEO_UPLOAD_DIR = Path(__file__).resolve().parent.parent.parent.joinpath("uploads", "videos")
VIDEO_UPLOAD_DIR.mkdir(parents=True, exist_ok=True)

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Volcengine visual API configuration
VISUAL_API_HOST = "visual.volcengineapi.com"
VISUAL_API_URL = "https://" + VISUAL_API_HOST
REGION = "cn-north-1"
SERVICE = "cv"
API_VERSION = "2022-08-31"

# Jimeng 3.0 Pro req_key (supports image-to-video by passing image_urls)
REQ_KEY_I2V = "jimeng_ti2v_v30_pro"

# Timeouts (seconds) and polling configuration
SUBMIT_TIMEOUT = 30
POLL_TIMEOUT = 15
POLL_INTERVAL = 5
MAX_POLL_ATTEMPTS = 120  # 120 attempts x 5 s interval: about 10 minutes
# ============================================================
# Volcengine V4 签名实现
# ============================================================
def _sign(key: bytes, msg: str) -> bytes:
    """Return the raw HMAC-SHA256 digest of ``msg`` (UTF-8 encoded) keyed with ``key``."""
    mac = hmac.new(key, digestmod=hashlib.sha256)
    mac.update(msg.encode("utf-8"))
    return mac.digest()
def _get_signature_key(secret_key: str, date_stamp: str, region: str, service: str) -> bytes:
    """Derive the signing key by chaining HMAC-SHA256 over the scope parts.

    Starting from the UTF-8 encoded secret key, each step signs the next
    component with the previous result, in this order: date stamp, region,
    service, and the literal string "request".

    Returns:
        The final derived key as raw bytes.
    """
    derived = secret_key.encode("utf-8")
    for component in (date_stamp, region, service, "request"):
        derived = _sign(derived, component)
    return derived
def _build_signed_headers(
    access_key: str,
    secret_key: str,
    action: str,
    body: str,
) -> dict:
    """
    Build request headers carrying a V4 signature.

    Reference: https://www.volcengine.com/docs/6369/67269

    Args:
        access_key: Access key placed in the ``Credential`` field.
        secret_key: Secret key the signing key is derived from.
        action: Value of the ``Action`` query parameter being signed.
        body: Exact request body string; its SHA-256 is part of the signature.

    Returns:
        A dict with ``Content-Type``, ``Host``, ``X-Date`` and ``Authorization``.
    """
    content_type = "application/json"
    signed_headers = "content-type;host;x-date"

    # A single UTC clock reading feeds both the full timestamp and the date stamp.
    moment = datetime.now(timezone.utc)
    x_date = moment.strftime("%Y%m%dT%H%M%SZ")
    short_date = moment.strftime("%Y%m%d")

    # Canonical query string: Action first, then Version, both fully percent-encoded.
    query = "&".join(
        (
            f"Action={quote(action, safe='')}",
            f"Version={quote(API_VERSION, safe='')}",
        )
    )

    # Canonical headers: one "name:value" line each, every line newline-terminated.
    header_lines = "".join(
        (
            f"content-type:{content_type}\n",
            f"host:{VISUAL_API_HOST}\n",
            f"x-date:{x_date}\n",
        )
    )

    body_digest = hashlib.sha256(body.encode("utf-8")).hexdigest()

    # header_lines already ends with "\n", so joining with "\n" leaves the
    # blank line between the headers and the signed-header list.
    canonical_request = "\n".join(
        ("POST", "/", query, header_lines, signed_headers, body_digest)
    )
    request_digest = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()

    # String to sign
    scope = "/".join((short_date, REGION, SERVICE, "request"))
    string_to_sign = "\n".join(("HMAC-SHA256", x_date, scope, request_digest))

    # Signature: hex HMAC-SHA256 of the string to sign under the derived key.
    derived_key = _get_signature_key(secret_key, short_date, REGION, SERVICE)
    signature = hmac.new(
        derived_key, string_to_sign.encode("utf-8"), hashlib.sha256
    ).hexdigest()

    return {
        "Content-Type": content_type,
        "Host": VISUAL_API_HOST,
        "X-Date": x_date,
        "Authorization": (
            f"HMAC-SHA256 "
            f"Credential={access_key}/{scope}, "
            f"SignedHeaders={signed_headers}, "
            f"Signature={signature}"
        ),
    }
# ============================================================
# 视频生成核心逻辑
# ============================================================
def _get_volc_keys() -> tuple:
    """Read the Volcengine Access Key and Secret Key from configuration.

    Returns:
        ``(access_key, secret_key)`` as read from the ``VOLC_ACCESS_KEY`` and
        ``VOLC_SECRET_KEY`` config values.

    Raises:
        RuntimeError: If either value is missing or empty.
    """
    credentials = (
        get_config_value("VOLC_ACCESS_KEY", ""),
        get_config_value("VOLC_SECRET_KEY", ""),
    )
    if all(credentials):
        return credentials
    raise RuntimeError(
        "未配置 VOLC_ACCESS_KEY 或 VOLC_SECRET_KEY无法使用即梦视频生成。"
        "请在管理后台 系统配置 中添加火山引擎 Access Key 和 Secret Key。"
    )
async def _merge_images_to_base64(image_urls: List[str]) -> str:
    """
    Download the given images, stitch them side by side, and return base64 JPEG.

    Stitching strategy:
    - 1 image: used as-is
    - several: scaled to the tallest image's height and laid out left to right

    URLs that fail to download or decode are logged and skipped.

    Args:
        image_urls: URLs of the images to download.

    Returns:
        Base64 text of the resulting JPEG.

    Raises:
        RuntimeError: If none of the images could be downloaded.
    """
    # Download every image; a failure on one URL does not abort the others.
    frames = []
    async with httpx.AsyncClient(timeout=30, follow_redirects=True) as session:
        for src in image_urls:
            try:
                reply = await session.get(src)
                reply.raise_for_status()
                frame = Image.open(io.BytesIO(reply.content)).convert("RGB")
                frames.append(frame)
                logger.info(f"下载图片成功: {src[:60]}... 尺寸={frame.size}")
            except Exception as exc:
                logger.warning(f"下载图片失败: {src[:60]}... {exc}")

    if not frames:
        raise RuntimeError("没有成功下载任何图片")

    if len(frames) > 1:
        # Normalise heights to the tallest image, then paste left to right
        # onto a white canvas.
        tallest = max(frame.height for frame in frames)
        scaled = [
            frame
            if frame.height == tallest
            else frame.resize(
                (int(frame.width * (tallest / frame.height)), tallest),
                Image.LANCZOS,
            )
            for frame in frames
        ]
        canvas = Image.new(
            "RGB", (sum(piece.width for piece in scaled), tallest), (255, 255, 255)
        )
        cursor = 0
        for piece in scaled:
            canvas.paste(piece, (cursor, 0))
            cursor += piece.width
        logger.info(f"图片横向拼接完成: {len(scaled)}张 -> 尺寸={canvas.size}")
    else:
        # A single image needs no stitching.
        canvas = frames[0]

    # Cap the width: the Jimeng API limits the request body size.
    width_cap = 1920
    if canvas.width > width_cap:
        shrink = width_cap / canvas.width
        canvas = canvas.resize((width_cap, int(canvas.height * shrink)), Image.LANCZOS)
        logger.info(f"拼接图已压缩至: {canvas.size}")

    # Encode as JPEG; while the result is over 5 MB, retry at lower quality
    # in steps of 10, stopping once quality reaches 30 or below.
    size_limit = 5 * 1024 * 1024
    quality = 75
    buffer = io.BytesIO()
    canvas.save(buffer, format="JPEG", quality=quality)
    while buffer.tell() > size_limit and quality > 30:
        quality -= 10
        buffer = io.BytesIO()
        canvas.save(buffer, format="JPEG", quality=quality)
        logger.info(f"图片超过 5MB降低质量到 {quality},大小={buffer.tell()} bytes")

    encoded = base64.b64encode(buffer.getvalue()).decode("utf-8")
    logger.info(f"拼接图 base64 长度: {len(encoded)}")
    return encoded
async def generate_video(
    image_urls: List[str],
    prompt: str = "",
    duration_seconds: int = 5,
) -> str:
    """
    Generate a 360-degree rotating showcase video with Jimeng 3.0 Pro.

    Flow:
    1. Stitch the multi-view images side by side into one long image
    2. Submit that image to the Jimeng API as base64; if the submission
       fails, retry once with only the first image URL
    3. Poll until the task finishes, then download the video locally

    Args:
        image_urls: URLs of the multi-view images
        prompt: Generation prompt; when empty, the ``VIDEO_PROMPT`` config
            value is used, and failing that a built-in default prompt
        duration_seconds: Reserved parameter (not used by this function)

    Returns:
        Local URL of the generated video
    """
    ak, sk = _get_volc_keys()
    logger.info(f"传入视频生成的图片数量: {len(image_urls)}")

    # Step 0: stitch the multi-view images into one long image, as base64
    stitched_b64 = await _merge_images_to_base64(image_urls)
    logger.info(f"多视角图片已拼接为长图base64长度: {len(stitched_b64)}")

    # Prompt precedence: explicit argument, then config, then built-in default
    effective_prompt = prompt or get_config_value("VIDEO_PROMPT", "")
    if not effective_prompt:
        effective_prompt = (
            "参考图展示的是同一件精美玉雕工艺品的多个角度,"
            "请生成这一件玉雕作品在专业珠宝摄影棚内的展示视频。"
            "纯白色背景,柔和的珠宝摄影灯光,"
            "这一件玉石作品放在旋转展台上缓慢平稳地旋转360度"
            "展现其温润的质感、细腻的雕刻纹理和通透的光泽,"
            "电影级画质,微距的细节感,平稳流畅的转台旋转,"
            "画面中只有一件玉雕作品"
        )

    # Step 1: submit the task (stitched base64 first; on any failure fall
    # back to the first image URL)
    try:
        task_id = await _submit_video_task(ak, sk, None, effective_prompt, stitched_b64)
        logger.info(f"即梦视频生成任务已提交(base64拼接图): task_id={task_id}")
    except Exception as submit_error:
        logger.warning(f"base64 拼接图提交失败降级为第一张图片URL: {submit_error}")
        fallback_url = image_urls[0]
        task_id = await _submit_video_task(ak, sk, fallback_url, effective_prompt)
        logger.info(f"即梦视频生成任务已提交(单图URL): task_id={task_id}")

    # Step 2: poll until the result is available
    remote_video_url = await _poll_video_result(ak, sk, task_id)
    logger.info(f"即梦视频生成完成: {remote_video_url[:80]}...")

    # Step 3: download the video into local storage
    saved_path = await _download_video_to_local(remote_video_url)
    logger.info(f"视频已保存到本地: {saved_path}")
    return saved_path
async def _submit_video_task(
    access_key: str,
    secret_key: str,
    image_url: Optional[str],
    prompt: str,
    image_base64: Optional[str] = None,
) -> str:
    """Submit an image-to-video task to Jimeng 3.0 Pro (URL or base64 input).

    Args:
        access_key: Access key used to sign the request.
        secret_key: Secret key used to sign the request.
        image_url: Image URL, used only when ``image_base64`` is empty.
        prompt: Text prompt sent with the task.
        image_base64: Base64 image data; takes precedence over ``image_url``.

    Returns:
        The ``task_id`` from the response.

    Raises:
        RuntimeError: If no image input is given, the response is not a JSON
            object, the response code is not 10000, or no task_id is present.
        httpx.HTTPStatusError: Raised by ``raise_for_status()`` for error
            HTTP statuses.
    """
    action = "CVSync2AsyncSubmitTask"
    payload = {
        "req_key": REQ_KEY_I2V,
        "prompt": prompt,
        "seed": -1,
        "frames": int(get_config_value("VIDEO_FRAMES", "121")),
        "aspect_ratio": "1:1",
    }

    # Prefer the base64 stitched image; otherwise fall back to the URL
    if image_base64:
        payload["binary_data_base64"] = [image_base64]
        logger.info(f"使用 base64 拼接长图提交即梦视频任务base64长度={len(image_base64)}")
    elif image_url:
        payload["image_urls"] = [image_url]
        logger.info(f"使用图片URL提交即梦视频任务: {image_url[:80]}...")
    else:
        raise RuntimeError("未提供图片输入")

    # The exact body string is signed, so the same string must be sent.
    body = json.dumps(payload, ensure_ascii=False)
    headers = _build_signed_headers(access_key, secret_key, action, body)
    url = f"{VISUAL_API_URL}?Action={action}&Version={API_VERSION}"

    # base64 payloads are large and need a longer timeout
    timeout = 120 if image_base64 else SUBMIT_TIMEOUT
    async with httpx.AsyncClient(timeout=timeout) as client:
        resp = await client.post(url, content=body, headers=headers)
        if resp.status_code != 200:
            error_body = resp.text[:1000]
            logger.error(f"即梦视频任务提交失败: status={resp.status_code}, body={error_body}")
            resp.raise_for_status()
        data = resp.json()

    # A JSON body that is not an object cannot be inspected with .get();
    # report it as a submit failure instead of crashing with AttributeError.
    if not isinstance(data, dict):
        raise RuntimeError(f"即梦视频任务提交失败: 响应格式异常: {str(data)[:300]}")

    # Check the response code
    code = data.get("code", 0)
    if code != 10000:
        msg = data.get("message", "未知错误")
        raise RuntimeError(f"即梦视频任务提交失败 (code={code}): {msg}")

    # dict.get("data", {}) returns None when the key is present with a null
    # value, so check the type explicitly before reading task_id.
    task_data = data.get("data")
    task_id = task_data.get("task_id") if isinstance(task_data, dict) else None
    if not task_id:
        raise RuntimeError(f"即梦响应中未找到 task_id: {data}")
    return task_id
async def _poll_video_result(
    access_key: str,
    secret_key: str,
    task_id: str,
) -> str:
    """Poll the video generation result until it finishes or times out.

    Sleeps ``POLL_INTERVAL`` seconds before each of up to
    ``MAX_POLL_ATTEMPTS`` requests. Non-200 responses, request exceptions and
    malformed bodies are logged and treated as transient; polling continues.

    Args:
        access_key: Access key used to sign each request.
        secret_key: Secret key used to sign each request.
        task_id: Task identifier returned by the submit call.

    Returns:
        The ``video_url`` reported for the finished task.

    Raises:
        RuntimeError: If the task finishes without a video_url, finishes with
            a code other than 10000, is reported as "not_found" or "expired",
            or is still unfinished after the last attempt.
    """
    action = "CVSync2AsyncGetResult"
    payload = {
        "req_key": REQ_KEY_I2V,
        "task_id": task_id,
    }
    body = json.dumps(payload, ensure_ascii=False)
    # The URL does not change between attempts, so build it once.
    url = f"{VISUAL_API_URL}?Action={action}&Version={API_VERSION}"

    # One client serves the whole polling loop instead of building a new one
    # (and a new connection pool) on every attempt.
    async with httpx.AsyncClient(timeout=POLL_TIMEOUT) as client:
        for attempt in range(1, MAX_POLL_ATTEMPTS + 1):
            await asyncio.sleep(POLL_INTERVAL)

            # Re-sign on every attempt: the signature embeds the timestamp
            headers = _build_signed_headers(access_key, secret_key, action, body)

            try:
                resp = await client.post(url, content=body, headers=headers)
                if resp.status_code != 200:
                    logger.warning(f"轮询即梦视频结果失败 (attempt={attempt}): status={resp.status_code}, body={resp.text[:300]}")
                    continue
                data = resp.json()
            except Exception as e:
                logger.warning(f"轮询即梦视频异常 (attempt={attempt}): {e}")
                continue

            # A non-object JSON body cannot be inspected; treat it like any
            # other transient poll failure rather than crashing.
            if not isinstance(data, dict):
                logger.warning(f"轮询即梦视频结果格式异常 (attempt={attempt}): {str(data)[:300]}")
                continue

            code = data.get("code", 0)
            # dict.get("data", {}) returns None for an explicit null value,
            # so normalise anything that is not a dict to an empty dict.
            task_data = data.get("data")
            if not isinstance(task_data, dict):
                task_data = {}
            status = task_data.get("status", "")

            if status == "done" and code == 10000:
                video_url = task_data.get("video_url", "")
                if video_url:
                    return video_url
                raise RuntimeError(f"即梦视频生成完成但未找到 video_url: {data}")
            elif status == "done" and code != 10000:
                msg = data.get("message", "未知错误")
                raise RuntimeError(f"即梦视频生成失败 (code={code}): {msg}")
            elif status in ("not_found", "expired"):
                raise RuntimeError(f"即梦视频任务状态异常: {status}")
            else:
                # in_queue / generating: log progress every 6th attempt
                if attempt % 6 == 0:
                    logger.info(f"即梦视频生成中... (attempt={attempt}, status={status})")

    raise RuntimeError(f"即梦视频生成超时: 轮询 {MAX_POLL_ATTEMPTS} 次后仍未完成")
async def _download_video_to_local(remote_url: str) -> str:
    """
    Download a remote video into the local uploads/videos/ directory.

    The file is saved under a random UUID-based name with an ``.mp4`` suffix.

    Args:
        remote_url: URL of the video to download.

    Returns:
        URL path of the local video, e.g. /uploads/videos/xxx.mp4

    Raises:
        RuntimeError: If the download or the write fails; the original
            exception is attached as the cause.
    """
    filename = f"{uuid.uuid4().hex}.mp4"
    local_file = VIDEO_UPLOAD_DIR / filename

    try:
        async with httpx.AsyncClient(timeout=120, follow_redirects=True) as client:
            resp = await client.get(remote_url)
            resp.raise_for_status()
            local_file.write_bytes(resp.content)
            logger.info(f"视频下载完成: {len(resp.content)} 字节 -> {local_file}")
    except Exception as e:
        logger.error(f"视频下载失败: {e}")
        # Best-effort cleanup: a failed write must not leave a truncated file
        # behind. The file usually does not exist at this point, so a failure
        # to remove it is ignored.
        try:
            local_file.unlink()
        except OSError:
            pass
        # Chain explicitly so the original error is kept as the cause.
        raise RuntimeError(f"视频下载失败: {e}") from e

    # Return a relative URL path (served through /uploads/ like the images)
    return f"/uploads/videos/{filename}"