""" AI 视频生成服务 使用火山引擎 即梦3.0 Pro (Jimeng Video 3.0 Pro) 将设计图生成 360 度旋转展示视频 API 文档: https://www.volcengine.com/docs/85621/1777001 认证方式: Volcengine V4 签名 (Access Key + Secret Key) API 端点: https://visual.volcengineapi.com """ import asyncio import base64 import hashlib import hmac import io import json import logging import math import os import uuid from datetime import datetime, timezone from pathlib import Path from typing import Optional, List from urllib.parse import quote import httpx from PIL import Image from .config_service import get_config_value # 视频本地存储目录 VIDEO_UPLOAD_DIR = Path(__file__).resolve().parent.parent.parent / "uploads" / "videos" VIDEO_UPLOAD_DIR.mkdir(parents=True, exist_ok=True) logger = logging.getLogger(__name__) # 火山引擎视觉 API 配置 VISUAL_API_HOST = "visual.volcengineapi.com" VISUAL_API_URL = f"https://{VISUAL_API_HOST}" REGION = "cn-north-1" SERVICE = "cv" API_VERSION = "2022-08-31" # 即梦3.0 Pro req_key REQ_KEY_I2V = "jimeng_ti2v_v30_pro" # 支持传 image_urls 做图生视频 # 超时与轮询配置 SUBMIT_TIMEOUT = 30 POLL_TIMEOUT = 15 MAX_POLL_ATTEMPTS = 120 # 约 10 分钟 POLL_INTERVAL = 5 # ============================================================ # Volcengine V4 签名实现 # ============================================================ def _sign(key: bytes, msg: str) -> bytes: return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest() def _get_signature_key(secret_key: str, date_stamp: str, region: str, service: str) -> bytes: k_date = _sign(secret_key.encode("utf-8"), date_stamp) k_region = _sign(k_date, region) k_service = _sign(k_region, service) k_signing = _sign(k_service, "request") return k_signing def _build_signed_headers( access_key: str, secret_key: str, action: str, body: str, ) -> dict: """ 构建带 V4 签名的请求头 参考: https://www.volcengine.com/docs/6369/67269 """ now = datetime.now(timezone.utc) date_stamp = now.strftime("%Y%m%d") amz_date = now.strftime("%Y%m%dT%H%M%SZ") # 请求参数 canonical_querystring = f"Action={quote(action, safe='')}&Version={quote(API_VERSION, safe='')}" # 规范请求头 content_type = "application/json" canonical_headers = ( f"content-type:{content_type}\n" f"host:{VISUAL_API_HOST}\n" f"x-date:{amz_date}\n" ) signed_headers = "content-type;host;x-date" # Payload hash payload_hash = hashlib.sha256(body.encode("utf-8")).hexdigest() # 规范请求 canonical_request = ( f"POST\n" f"/\n" f"{canonical_querystring}\n" f"{canonical_headers}\n" f"{signed_headers}\n" f"{payload_hash}" ) # 待签字符串 credential_scope = f"{date_stamp}/{REGION}/{SERVICE}/request" string_to_sign = ( f"HMAC-SHA256\n" f"{amz_date}\n" f"{credential_scope}\n" f"{hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()}" ) # 签名 signing_key = _get_signature_key(secret_key, date_stamp, REGION, SERVICE) signature = hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest() # Authorization 头 authorization = ( f"HMAC-SHA256 " f"Credential={access_key}/{credential_scope}, " f"SignedHeaders={signed_headers}, " f"Signature={signature}" ) return { "Content-Type": content_type, "Host": VISUAL_API_HOST, "X-Date": amz_date, "Authorization": authorization, } # ============================================================ # 视频生成核心逻辑 # ============================================================ def _get_volc_keys() -> tuple: """获取火山引擎 Access Key 和 Secret Key""" access_key = get_config_value("VOLC_ACCESS_KEY", "") secret_key = get_config_value("VOLC_SECRET_KEY", "") if not access_key or not secret_key: raise RuntimeError( "未配置 VOLC_ACCESS_KEY 或 VOLC_SECRET_KEY,无法使用即梦视频生成。" "请在管理后台 系统配置 中添加火山引擎 Access Key 和 Secret Key。" ) return access_key, secret_key async def _merge_images_to_base64(image_urls: List[str]) -> str: """ 下载多张图片并横向拼接为一张长图,返回 base64 编码 拼接策略: - 1张: 直接使用 - 多张: 横向一字排开拼接(展示各角度全貌) """ # 下载所有图片 images = [] async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client: for url in image_urls: try: resp = await client.get(url) resp.raise_for_status() img = Image.open(io.BytesIO(resp.content)).convert("RGB") images.append(img) logger.info(f"下载图片成功: {url[:60]}... 尺寸={img.size}") except Exception as e: logger.warning(f"下载图片失败: {url[:60]}... {e}") if not images: raise RuntimeError("没有成功下载任何图片") # 只有1张时直接使用 if len(images) == 1: merged = images[0] else: # 统一高度,横向拼接 target_h = max(img.height for img in images) resized = [] for img in images: if img.height != target_h: ratio = target_h / img.height new_w = int(img.width * ratio) img = img.resize((new_w, target_h), Image.LANCZOS) resized.append(img) total_w = sum(img.width for img in resized) merged = Image.new("RGB", (total_w, target_h), (255, 255, 255)) x_offset = 0 for img in resized: merged.paste(img, (x_offset, 0)) x_offset += img.width logger.info(f"图片横向拼接完成: {len(resized)}张 -> 尺寸={merged.size}") # 压缩图片尺寸(即梦API对请求体大小有限制) max_width = 1920 if merged.width > max_width: ratio = max_width / merged.width new_h = int(merged.height * ratio) merged = merged.resize((max_width, new_h), Image.LANCZOS) logger.info(f"拼接图已压缩至: {merged.size}") # 转 base64,控制质量确保 base64 不会过大 buf = io.BytesIO() quality = 75 merged.save(buf, format="JPEG", quality=quality) # 如果超过 5MB,进一步压缩 while buf.tell() > 5 * 1024 * 1024 and quality > 30: buf = io.BytesIO() quality -= 10 merged.save(buf, format="JPEG", quality=quality) logger.info(f"图片超过 5MB,降低质量到 {quality},大小={buf.tell()} bytes") b64 = base64.b64encode(buf.getvalue()).decode("utf-8") logger.info(f"拼接图 base64 长度: {len(b64)}") return b64 async def generate_video( image_urls: List[str], prompt: str = "", duration_seconds: int = 5, ) -> str: """ 调用即梦3.0 Pro 生成 360 度旋转展示视频 流程: 1. 将多视角图片横向拼接成一张长图 2. 以 base64 方式传入即梦API 3. 使用强化提示词描述单品展示 Args: image_urls: 多视角图片 URL 列表 prompt: 视频生成提示词 duration_seconds: 预留参数 Returns: 生成的视频本地 URL """ access_key, secret_key = _get_volc_keys() logger.info(f"传入视频生成的图片数量: {len(image_urls)}") # Step 0: 拼接多视角图片为横向长图,转 base64 merged_b64 = await _merge_images_to_base64(image_urls) logger.info(f"多视角图片已拼接为长图,base64长度: {len(merged_b64)}") # 从配置读取默认 prompt,如果没有则使用强化提示词 if not prompt: prompt = get_config_value("VIDEO_PROMPT", "") if not prompt: prompt = ( "参考图展示的是同一件精美玉雕工艺品的多个角度," "请生成这一件玉雕作品在专业珠宝摄影棚内的展示视频。" "纯白色背景,柔和的珠宝摄影灯光," "这一件玉石作品放在旋转展台上缓慢平稳地旋转360度," "展现其温润的质感、细腻的雕刻纹理和通透的光泽," "电影级画质,微距的细节感,平稳流畅的转台旋转," "画面中只有一件玉雕作品" ) # Step 1: 提交任务(用 base64 拼接长图,失败则降级为第一张图片 URL) try: task_id = await _submit_video_task(access_key, secret_key, None, prompt, merged_b64) logger.info(f"即梦视频生成任务已提交(base64拼接图): task_id={task_id}") except Exception as e: logger.warning(f"base64 拼接图提交失败,降级为第一张图片URL: {e}") first_url = image_urls[0] task_id = await _submit_video_task(access_key, secret_key, first_url, prompt) logger.info(f"即梦视频生成任务已提交(单图URL): task_id={task_id}") # Step 2: 轮询等待结果 remote_video_url = await _poll_video_result(access_key, secret_key, task_id) logger.info(f"即梦视频生成完成: {remote_video_url[:80]}...") # Step 3: 下载视频到本地存储 local_path = await _download_video_to_local(remote_video_url) logger.info(f"视频已保存到本地: {local_path}") return local_path async def _submit_video_task( access_key: str, secret_key: str, image_url: Optional[str], prompt: str, image_base64: Optional[str] = None, ) -> str: """提交图生视频任务到即梦3.0 Pro,支持 URL 或 base64 输入""" action = "CVSync2AsyncSubmitTask" payload = { "req_key": REQ_KEY_I2V, "prompt": prompt, "seed": -1, "frames": int(get_config_value("VIDEO_FRAMES", "121")), "aspect_ratio": "1:1", } # 优先使用 base64(拼接长图),否则用 URL if image_base64: payload["binary_data_base64"] = [image_base64] logger.info(f"使用 base64 拼接长图提交即梦视频任务,base64长度={len(image_base64)}") elif image_url: payload["image_urls"] = [image_url] logger.info(f"使用图片URL提交即梦视频任务: {image_url[:80]}...") else: raise RuntimeError("未提供图片输入") body = json.dumps(payload, ensure_ascii=False) headers = _build_signed_headers(access_key, secret_key, action, body) url = f"{VISUAL_API_URL}?Action={action}&Version={API_VERSION}" # base64 数据量大,需要更长的超时时间 timeout = 120 if image_base64 else SUBMIT_TIMEOUT async with httpx.AsyncClient(timeout=timeout) as client: resp = await client.post(url, content=body, headers=headers) if resp.status_code != 200: error_body = resp.text[:1000] logger.error(f"即梦视频任务提交失败: status={resp.status_code}, body={error_body}") resp.raise_for_status() data = resp.json() # 检查响应 code = data.get("code", 0) if code != 10000: msg = data.get("message", "未知错误") raise RuntimeError(f"即梦视频任务提交失败 (code={code}): {msg}") task_id = data.get("data", {}).get("task_id") if not task_id: raise RuntimeError(f"即梦响应中未找到 task_id: {data}") return task_id async def _poll_video_result( access_key: str, secret_key: str, task_id: str, ) -> str: """轮询视频生成结果""" action = "CVSync2AsyncGetResult" payload = { "req_key": REQ_KEY_I2V, "task_id": task_id, } body = json.dumps(payload, ensure_ascii=False) for attempt in range(1, MAX_POLL_ATTEMPTS + 1): await asyncio.sleep(POLL_INTERVAL) # 每次轮询需要重新签名(时间戳不同) headers = _build_signed_headers(access_key, secret_key, action, body) url = f"{VISUAL_API_URL}?Action={action}&Version={API_VERSION}" try: async with httpx.AsyncClient(timeout=POLL_TIMEOUT) as client: resp = await client.post(url, content=body, headers=headers) if resp.status_code != 200: logger.warning(f"轮询即梦视频结果失败 (attempt={attempt}): status={resp.status_code}, body={resp.text[:300]}") continue data = resp.json() except Exception as e: logger.warning(f"轮询即梦视频异常 (attempt={attempt}): {e}") continue code = data.get("code", 0) task_data = data.get("data", {}) status = task_data.get("status", "") if status == "done" and code == 10000: video_url = task_data.get("video_url", "") if video_url: return video_url raise RuntimeError(f"即梦视频生成完成但未找到 video_url: {data}") elif status == "done" and code != 10000: msg = data.get("message", "未知错误") raise RuntimeError(f"即梦视频生成失败 (code={code}): {msg}") elif status in ("not_found", "expired"): raise RuntimeError(f"即梦视频任务状态异常: {status}") else: # in_queue / generating if attempt % 6 == 0: logger.info(f"即梦视频生成中... (attempt={attempt}, status={status})") raise RuntimeError(f"即梦视频生成超时: 轮询 {MAX_POLL_ATTEMPTS} 次后仍未完成") async def _download_video_to_local(remote_url: str) -> str: """ 下载远程视频到本地 uploads/videos/ 目录 Returns: 本地视频的 URL 路径,如 /uploads/videos/xxx.mp4 """ filename = f"{uuid.uuid4().hex}.mp4" local_file = VIDEO_UPLOAD_DIR / filename try: async with httpx.AsyncClient(timeout=120, follow_redirects=True) as client: resp = await client.get(remote_url) resp.raise_for_status() local_file.write_bytes(resp.content) logger.info(f"视频下载完成: {len(resp.content)} 字节 -> {local_file}") except Exception as e: logger.error(f"视频下载失败: {e}") raise RuntimeError(f"视频下载失败: {e}") # 返回相对 URL 路径(和图片一样通过 /uploads/ 静态服务访问) return f"/uploads/videos/{filename}"