"""通用文件上传路由 - 腾讯云COS""" import uuid import os from datetime import datetime from fastapi import APIRouter, UploadFile, File, Depends, HTTPException from sqlalchemy.orm import Session from routers.auth import get_current_user from models.user import User from models.system_config import SystemConfig from database import get_db from config import MAX_UPLOAD_SIZE, MAX_ATTACHMENT_SIZE, UPLOAD_DIR from models.attachment import Attachment router = APIRouter() ALLOWED_IMAGE_TYPES = ["image/jpeg", "image/png", "image/gif", "image/webp"] ALLOWED_ATTACHMENT_TYPES = [ "application/pdf", "application/msword", "application/vnd.openxmlformats-officedocument.wordprocessingml.document", "application/vnd.ms-excel", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "application/vnd.ms-powerpoint", "application/vnd.openxmlformats-officedocument.presentationml.presentation", "application/zip", "application/x-zip-compressed", "application/x-rar-compressed", "application/vnd.rar", ] ALLOWED_ATTACHMENT_EXTS = { "pdf", "doc", "docx", "xls", "xlsx", "ppt", "pptx", "zip", "rar", } def _get_cos_config(db: Session) -> dict: """从数据库读取COS配置""" keys = ["cos_secret_id", "cos_secret_key", "cos_bucket", "cos_region", "cos_custom_domain"] config = {} for k in keys: row = db.query(SystemConfig).filter(SystemConfig.key == k).first() config[k] = row.value if row else "" return config def _get_cos_client(db: Session): """获取COS客户端实例,未配置则返回None""" config = _get_cos_config(db) secret_id = config.get("cos_secret_id", "") secret_key = config.get("cos_secret_key", "") bucket = config.get("cos_bucket", "") region = config.get("cos_region", "") if not all([secret_id, secret_key, bucket, region]): return None, config try: from qcloud_cos import CosConfig, CosS3Client cos_config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key) client = CosS3Client(cos_config) return client, config except ImportError: return None, config def _build_cos_url(config: dict, object_key: str) -> str: """构建COS访问URL""" custom_domain = config.get("cos_custom_domain", "") if custom_domain: return f"https://{custom_domain}/{object_key}" bucket = config.get("cos_bucket", "") region = config.get("cos_region", "") return f"https://{bucket}.cos.{region}.myqcloud.com/{object_key}" @router.post("/image") async def upload_image( file: UploadFile = File(...), current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """上传图片,优先OSS,未配置则本地存储""" # 验证文件类型 if file.content_type not in ALLOWED_IMAGE_TYPES: raise HTTPException(status_code=400, detail="不支持的文件类型,仅支持 JPG/PNG/GIF/WEBP") # 读取文件内容 content = await file.read() # 验证文件大小 if len(content) > MAX_UPLOAD_SIZE: raise HTTPException(status_code=400, detail=f"文件过大,最大支持 {MAX_UPLOAD_SIZE // (1024*1024)}MB") # 生成唯一文件名 ext = file.filename.split(".")[-1].lower() if "." in file.filename else "png" date_prefix = datetime.now().strftime("%Y/%m") filename = f"{uuid.uuid4().hex}.{ext}" object_key = f"images/{date_prefix}/{filename}" # 尝试COS上传(从数据库读取配置) client, config = _get_cos_client(db) if client: try: client.put_object( Bucket=config.get("cos_bucket", ""), Body=content, Key=object_key, ContentType=file.content_type, ) url = _build_cos_url(config, object_key) return {"url": url, "storage": "cos"} except Exception as e: raise HTTPException(status_code=500, detail=f"COS上传失败: {str(e)}") else: # 降级到本地存储 os.makedirs(UPLOAD_DIR, exist_ok=True) filepath = os.path.join(UPLOAD_DIR, filename) with open(filepath, "wb") as f: f.write(content) return {"url": f"/uploads/{filename}", "storage": "local"} @router.post("/attachment") async def upload_attachment( file: UploadFile = File(...), post_id: int = 0, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """上传附件到COS,支持 PDF/Word/Excel/PPT/ZIP/RAR""" # 验证文件扩展名 ext = file.filename.split(".")[-1].lower() if "." in file.filename else "" if ext not in ALLOWED_ATTACHMENT_EXTS: raise HTTPException( status_code=400, detail=f"不支持的文件类型,仅支持: {', '.join(sorted(ALLOWED_ATTACHMENT_EXTS))}", ) # 读取文件内容 content = await file.read() if len(content) > MAX_ATTACHMENT_SIZE: raise HTTPException( status_code=400, detail=f"文件过大,最大支持 {MAX_ATTACHMENT_SIZE // (1024*1024)}MB", ) # 生成唯一文件名 date_prefix = datetime.now().strftime("%Y/%m") unique_name = f"{uuid.uuid4().hex}.{ext}" object_key = f"attachments/{date_prefix}/{unique_name}" # 上传到COS client, config = _get_cos_client(db) if not client: raise HTTPException(status_code=500, detail="对象存储未配置,无法上传附件") try: client.put_object( Bucket=config.get("cos_bucket", ""), Body=content, Key=object_key, ContentType=file.content_type or "application/octet-stream", ) except Exception as e: raise HTTPException(status_code=500, detail=f"COS上传失败: {str(e)}") url = _build_cos_url(config, object_key) # 写入数据库 attachment = Attachment( post_id=post_id if post_id else None, user_id=current_user.id, filename=file.filename, storage_key=object_key, url=url, file_size=len(content), file_type=file.content_type or "application/octet-stream", ) db.add(attachment) db.commit() db.refresh(attachment) return { "id": attachment.id, "filename": attachment.filename, "url": attachment.url, "file_size": attachment.file_size, "file_type": attachment.file_type, } @router.put("/attachment/{attachment_id}/post") async def update_attachment_post( attachment_id: int, post_id: int = 0, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """将附件关联到帖子(新建帖子后回填 post_id)""" attachment = db.query(Attachment).filter( Attachment.id == attachment_id, Attachment.user_id == current_user.id, ).first() if not attachment: raise HTTPException(status_code=404, detail="附件不存在") attachment.post_id = post_id db.commit() return {"message": "ok"}