- 替换视频生成服务为可灵AI多图参考生视频API,支持1-4张多视角图片输入 - 调整图片拼接逻辑,生成横向长图传入即梦API备用 - 实现基于JWT认证的可灵API请求和轮询机制,支持高品质1:1正方形视频生成 - 在设计详情页新增视频展示区域及生成、重新生成和下载视频操作 - 更新后台系统配置,支持配置可灵AI Access Key和Secret Key - 删除即梦视频相关配置及逻辑,所有视频生成功能切换到可灵AI实现 - 优化视频生成提示词,提升视频质感和展示效果 - 增加视频文件本地存储和路径管理,保证视频可访问和下载 - 前端增加视频生成状态管理和用户界面交互提示 - 后端添加PyJWT依赖,支持JWT认证流程
423 lines
14 KiB
Python
423 lines
14 KiB
Python
"""
|
||
AI 视频生成服务
|
||
使用火山引擎 即梦3.0 Pro (Jimeng Video 3.0 Pro) 将设计图生成 360 度旋转展示视频
|
||
|
||
API 文档: https://www.volcengine.com/docs/85621/1777001
|
||
认证方式: Volcengine V4 签名 (Access Key + Secret Key)
|
||
API 端点: https://visual.volcengineapi.com
|
||
"""
|
||
import asyncio
|
||
import base64
|
||
import hashlib
|
||
import hmac
|
||
import io
|
||
import json
|
||
import logging
|
||
import math
|
||
import os
|
||
import uuid
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Optional, List
|
||
from urllib.parse import quote
|
||
|
||
import httpx
|
||
from PIL import Image
|
||
|
||
from .config_service import get_config_value
|
||
|
||
# 视频本地存储目录
|
||
VIDEO_UPLOAD_DIR = Path(__file__).resolve().parent.parent.parent / "uploads" / "videos"
|
||
VIDEO_UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 火山引擎视觉 API 配置
|
||
VISUAL_API_HOST = "visual.volcengineapi.com"
|
||
VISUAL_API_URL = f"https://{VISUAL_API_HOST}"
|
||
REGION = "cn-north-1"
|
||
SERVICE = "cv"
|
||
API_VERSION = "2022-08-31"
|
||
|
||
# 即梦3.0 Pro req_key
|
||
REQ_KEY_I2V = "jimeng_ti2v_v30_pro" # 支持传 image_urls 做图生视频
|
||
|
||
# 超时与轮询配置
|
||
SUBMIT_TIMEOUT = 30
|
||
POLL_TIMEOUT = 15
|
||
MAX_POLL_ATTEMPTS = 120 # 约 10 分钟
|
||
POLL_INTERVAL = 5
|
||
|
||
|
||
# ============================================================
|
||
# Volcengine V4 签名实现
|
||
# ============================================================
|
||
|
||
def _sign(key: bytes, msg: str) -> bytes:
|
||
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
|
||
|
||
|
||
def _get_signature_key(secret_key: str, date_stamp: str, region: str, service: str) -> bytes:
|
||
k_date = _sign(secret_key.encode("utf-8"), date_stamp)
|
||
k_region = _sign(k_date, region)
|
||
k_service = _sign(k_region, service)
|
||
k_signing = _sign(k_service, "request")
|
||
return k_signing
|
||
|
||
|
||
def _build_signed_headers(
|
||
access_key: str,
|
||
secret_key: str,
|
||
action: str,
|
||
body: str,
|
||
) -> dict:
|
||
"""
|
||
构建带 V4 签名的请求头
|
||
|
||
参考: https://www.volcengine.com/docs/6369/67269
|
||
"""
|
||
now = datetime.now(timezone.utc)
|
||
date_stamp = now.strftime("%Y%m%d")
|
||
amz_date = now.strftime("%Y%m%dT%H%M%SZ")
|
||
|
||
# 请求参数
|
||
canonical_querystring = f"Action={quote(action, safe='')}&Version={quote(API_VERSION, safe='')}"
|
||
|
||
# 规范请求头
|
||
content_type = "application/json"
|
||
canonical_headers = (
|
||
f"content-type:{content_type}\n"
|
||
f"host:{VISUAL_API_HOST}\n"
|
||
f"x-date:{amz_date}\n"
|
||
)
|
||
signed_headers = "content-type;host;x-date"
|
||
|
||
# Payload hash
|
||
payload_hash = hashlib.sha256(body.encode("utf-8")).hexdigest()
|
||
|
||
# 规范请求
|
||
canonical_request = (
|
||
f"POST\n"
|
||
f"/\n"
|
||
f"{canonical_querystring}\n"
|
||
f"{canonical_headers}\n"
|
||
f"{signed_headers}\n"
|
||
f"{payload_hash}"
|
||
)
|
||
|
||
# 待签字符串
|
||
credential_scope = f"{date_stamp}/{REGION}/{SERVICE}/request"
|
||
string_to_sign = (
|
||
f"HMAC-SHA256\n"
|
||
f"{amz_date}\n"
|
||
f"{credential_scope}\n"
|
||
f"{hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()}"
|
||
)
|
||
|
||
# 签名
|
||
signing_key = _get_signature_key(secret_key, date_stamp, REGION, SERVICE)
|
||
signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
|
||
|
||
# Authorization 头
|
||
authorization = (
|
||
f"HMAC-SHA256 "
|
||
f"Credential={access_key}/{credential_scope}, "
|
||
f"SignedHeaders={signed_headers}, "
|
||
f"Signature={signature}"
|
||
)
|
||
|
||
return {
|
||
"Content-Type": content_type,
|
||
"Host": VISUAL_API_HOST,
|
||
"X-Date": amz_date,
|
||
"Authorization": authorization,
|
||
}
|
||
|
||
|
||
# ============================================================
|
||
# 视频生成核心逻辑
|
||
# ============================================================
|
||
|
||
def _get_volc_keys() -> tuple:
|
||
"""获取火山引擎 Access Key 和 Secret Key"""
|
||
access_key = get_config_value("VOLC_ACCESS_KEY", "")
|
||
secret_key = get_config_value("VOLC_SECRET_KEY", "")
|
||
if not access_key or not secret_key:
|
||
raise RuntimeError(
|
||
"未配置 VOLC_ACCESS_KEY 或 VOLC_SECRET_KEY,无法使用即梦视频生成。"
|
||
"请在管理后台 系统配置 中添加火山引擎 Access Key 和 Secret Key。"
|
||
)
|
||
return access_key, secret_key
|
||
|
||
|
||
async def _merge_images_to_base64(image_urls: List[str]) -> str:
|
||
"""
|
||
下载多张图片并横向拼接为一张长图,返回 base64 编码
|
||
|
||
拼接策略:
|
||
- 1张: 直接使用
|
||
- 多张: 横向一字排开拼接(展示各角度全貌)
|
||
"""
|
||
# 下载所有图片
|
||
images = []
|
||
async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
|
||
for url in image_urls:
|
||
try:
|
||
resp = await client.get(url)
|
||
resp.raise_for_status()
|
||
img = Image.open(io.BytesIO(resp.content)).convert("RGB")
|
||
images.append(img)
|
||
logger.info(f"下载图片成功: {url[:60]}... 尺寸={img.size}")
|
||
except Exception as e:
|
||
logger.warning(f"下载图片失败: {url[:60]}... {e}")
|
||
|
||
if not images:
|
||
raise RuntimeError("没有成功下载任何图片")
|
||
|
||
# 只有1张时直接使用
|
||
if len(images) == 1:
|
||
merged = images[0]
|
||
else:
|
||
# 统一高度,横向拼接
|
||
target_h = max(img.height for img in images)
|
||
resized = []
|
||
for img in images:
|
||
if img.height != target_h:
|
||
ratio = target_h / img.height
|
||
new_w = int(img.width * ratio)
|
||
img = img.resize((new_w, target_h), Image.LANCZOS)
|
||
resized.append(img)
|
||
|
||
total_w = sum(img.width for img in resized)
|
||
merged = Image.new("RGB", (total_w, target_h), (255, 255, 255))
|
||
x_offset = 0
|
||
for img in resized:
|
||
merged.paste(img, (x_offset, 0))
|
||
x_offset += img.width
|
||
|
||
logger.info(f"图片横向拼接完成: {len(resized)}张 -> 尺寸={merged.size}")
|
||
|
||
# 压缩图片尺寸(即梦API对请求体大小有限制)
|
||
max_width = 1920
|
||
if merged.width > max_width:
|
||
ratio = max_width / merged.width
|
||
new_h = int(merged.height * ratio)
|
||
merged = merged.resize((max_width, new_h), Image.LANCZOS)
|
||
logger.info(f"拼接图已压缩至: {merged.size}")
|
||
|
||
# 转 base64,控制质量确保 base64 不会过大
|
||
buf = io.BytesIO()
|
||
quality = 75
|
||
merged.save(buf, format="JPEG", quality=quality)
|
||
# 如果超过 5MB,进一步压缩
|
||
while buf.tell() > 5 * 1024 * 1024 and quality > 30:
|
||
buf = io.BytesIO()
|
||
quality -= 10
|
||
merged.save(buf, format="JPEG", quality=quality)
|
||
logger.info(f"图片超过 5MB,降低质量到 {quality},大小={buf.tell()} bytes")
|
||
b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
|
||
logger.info(f"拼接图 base64 长度: {len(b64)}")
|
||
return b64
|
||
|
||
|
||
async def generate_video(
|
||
image_urls: List[str],
|
||
prompt: str = "",
|
||
duration_seconds: int = 5,
|
||
) -> str:
|
||
"""
|
||
调用即梦3.0 Pro 生成 360 度旋转展示视频
|
||
|
||
流程:
|
||
1. 将多视角图片横向拼接成一张长图
|
||
2. 以 base64 方式传入即梦API
|
||
3. 使用强化提示词描述单品展示
|
||
|
||
Args:
|
||
image_urls: 多视角图片 URL 列表
|
||
prompt: 视频生成提示词
|
||
duration_seconds: 预留参数
|
||
|
||
Returns:
|
||
生成的视频本地 URL
|
||
"""
|
||
access_key, secret_key = _get_volc_keys()
|
||
|
||
logger.info(f"传入视频生成的图片数量: {len(image_urls)}")
|
||
|
||
# Step 0: 拼接多视角图片为横向长图,转 base64
|
||
merged_b64 = await _merge_images_to_base64(image_urls)
|
||
logger.info(f"多视角图片已拼接为长图,base64长度: {len(merged_b64)}")
|
||
|
||
# 从配置读取默认 prompt,如果没有则使用强化提示词
|
||
if not prompt:
|
||
prompt = get_config_value("VIDEO_PROMPT", "")
|
||
if not prompt:
|
||
prompt = (
|
||
"参考图展示的是同一件精美玉雕工艺品的多个角度,"
|
||
"请生成这一件玉雕作品在专业珠宝摄影棚内的展示视频。"
|
||
"纯白色背景,柔和的珠宝摄影灯光,"
|
||
"这一件玉石作品放在旋转展台上缓慢平稳地旋转360度,"
|
||
"展现其温润的质感、细腻的雕刻纹理和通透的光泽,"
|
||
"电影级画质,微距的细节感,平稳流畅的转台旋转,"
|
||
"画面中只有一件玉雕作品"
|
||
)
|
||
|
||
# Step 1: 提交任务(用 base64 拼接长图,失败则降级为第一张图片 URL)
|
||
try:
|
||
task_id = await _submit_video_task(access_key, secret_key, None, prompt, merged_b64)
|
||
logger.info(f"即梦视频生成任务已提交(base64拼接图): task_id={task_id}")
|
||
except Exception as e:
|
||
logger.warning(f"base64 拼接图提交失败,降级为第一张图片URL: {e}")
|
||
first_url = image_urls[0]
|
||
task_id = await _submit_video_task(access_key, secret_key, first_url, prompt)
|
||
logger.info(f"即梦视频生成任务已提交(单图URL): task_id={task_id}")
|
||
|
||
# Step 2: 轮询等待结果
|
||
remote_video_url = await _poll_video_result(access_key, secret_key, task_id)
|
||
logger.info(f"即梦视频生成完成: {remote_video_url[:80]}...")
|
||
|
||
# Step 3: 下载视频到本地存储
|
||
local_path = await _download_video_to_local(remote_video_url)
|
||
logger.info(f"视频已保存到本地: {local_path}")
|
||
|
||
return local_path
|
||
|
||
|
||
async def _submit_video_task(
|
||
access_key: str,
|
||
secret_key: str,
|
||
image_url: Optional[str],
|
||
prompt: str,
|
||
image_base64: Optional[str] = None,
|
||
) -> str:
|
||
"""提交图生视频任务到即梦3.0 Pro,支持 URL 或 base64 输入"""
|
||
action = "CVSync2AsyncSubmitTask"
|
||
|
||
payload = {
|
||
"req_key": REQ_KEY_I2V,
|
||
"prompt": prompt,
|
||
"seed": -1,
|
||
"frames": int(get_config_value("VIDEO_FRAMES", "121")),
|
||
"aspect_ratio": "1:1",
|
||
}
|
||
|
||
# 优先使用 base64(拼接长图),否则用 URL
|
||
if image_base64:
|
||
payload["binary_data_base64"] = [image_base64]
|
||
logger.info(f"使用 base64 拼接长图提交即梦视频任务,base64长度={len(image_base64)}")
|
||
elif image_url:
|
||
payload["image_urls"] = [image_url]
|
||
logger.info(f"使用图片URL提交即梦视频任务: {image_url[:80]}...")
|
||
else:
|
||
raise RuntimeError("未提供图片输入")
|
||
|
||
body = json.dumps(payload, ensure_ascii=False)
|
||
headers = _build_signed_headers(access_key, secret_key, action, body)
|
||
url = f"{VISUAL_API_URL}?Action={action}&Version={API_VERSION}"
|
||
|
||
# base64 数据量大,需要更长的超时时间
|
||
timeout = 120 if image_base64 else SUBMIT_TIMEOUT
|
||
|
||
async with httpx.AsyncClient(timeout=timeout) as client:
|
||
resp = await client.post(url, content=body, headers=headers)
|
||
if resp.status_code != 200:
|
||
error_body = resp.text[:1000]
|
||
logger.error(f"即梦视频任务提交失败: status={resp.status_code}, body={error_body}")
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
|
||
# 检查响应
|
||
code = data.get("code", 0)
|
||
if code != 10000:
|
||
msg = data.get("message", "未知错误")
|
||
raise RuntimeError(f"即梦视频任务提交失败 (code={code}): {msg}")
|
||
|
||
task_id = data.get("data", {}).get("task_id")
|
||
if not task_id:
|
||
raise RuntimeError(f"即梦响应中未找到 task_id: {data}")
|
||
|
||
return task_id
|
||
|
||
|
||
async def _poll_video_result(
|
||
access_key: str,
|
||
secret_key: str,
|
||
task_id: str,
|
||
) -> str:
|
||
"""轮询视频生成结果"""
|
||
action = "CVSync2AsyncGetResult"
|
||
|
||
payload = {
|
||
"req_key": REQ_KEY_I2V,
|
||
"task_id": task_id,
|
||
}
|
||
|
||
body = json.dumps(payload, ensure_ascii=False)
|
||
|
||
for attempt in range(1, MAX_POLL_ATTEMPTS + 1):
|
||
await asyncio.sleep(POLL_INTERVAL)
|
||
|
||
# 每次轮询需要重新签名(时间戳不同)
|
||
headers = _build_signed_headers(access_key, secret_key, action, body)
|
||
url = f"{VISUAL_API_URL}?Action={action}&Version={API_VERSION}"
|
||
|
||
try:
|
||
async with httpx.AsyncClient(timeout=POLL_TIMEOUT) as client:
|
||
resp = await client.post(url, content=body, headers=headers)
|
||
if resp.status_code != 200:
|
||
logger.warning(f"轮询即梦视频结果失败 (attempt={attempt}): status={resp.status_code}, body={resp.text[:300]}")
|
||
continue
|
||
data = resp.json()
|
||
except Exception as e:
|
||
logger.warning(f"轮询即梦视频异常 (attempt={attempt}): {e}")
|
||
continue
|
||
|
||
code = data.get("code", 0)
|
||
task_data = data.get("data", {})
|
||
status = task_data.get("status", "")
|
||
|
||
if status == "done" and code == 10000:
|
||
video_url = task_data.get("video_url", "")
|
||
if video_url:
|
||
return video_url
|
||
raise RuntimeError(f"即梦视频生成完成但未找到 video_url: {data}")
|
||
|
||
elif status == "done" and code != 10000:
|
||
msg = data.get("message", "未知错误")
|
||
raise RuntimeError(f"即梦视频生成失败 (code={code}): {msg}")
|
||
|
||
elif status in ("not_found", "expired"):
|
||
raise RuntimeError(f"即梦视频任务状态异常: {status}")
|
||
|
||
else:
|
||
# in_queue / generating
|
||
if attempt % 6 == 0:
|
||
logger.info(f"即梦视频生成中... (attempt={attempt}, status={status})")
|
||
|
||
raise RuntimeError(f"即梦视频生成超时: 轮询 {MAX_POLL_ATTEMPTS} 次后仍未完成")
|
||
|
||
|
||
async def _download_video_to_local(remote_url: str) -> str:
|
||
"""
|
||
下载远程视频到本地 uploads/videos/ 目录
|
||
|
||
Returns:
|
||
本地视频的 URL 路径,如 /uploads/videos/xxx.mp4
|
||
"""
|
||
filename = f"{uuid.uuid4().hex}.mp4"
|
||
local_file = VIDEO_UPLOAD_DIR / filename
|
||
|
||
try:
|
||
async with httpx.AsyncClient(timeout=120, follow_redirects=True) as client:
|
||
resp = await client.get(remote_url)
|
||
resp.raise_for_status()
|
||
local_file.write_bytes(resp.content)
|
||
logger.info(f"视频下载完成: {len(resp.content)} 字节 -> {local_file}")
|
||
except Exception as e:
|
||
logger.error(f"视频下载失败: {e}")
|
||
raise RuntimeError(f"视频下载失败: {e}")
|
||
|
||
# 返回相对 URL 路径(和图片一样通过 /uploads/ 静态服务访问)
|
||
return f"/uploads/videos/{filename}"
|