""" 认证依赖注入 提供用户认证相关的 FastAPI 依赖 """ from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt from sqlalchemy.orm import Session from ..config import settings from ..database import get_db from ..models.user import User # OAuth2 密码认证方案,tokenUrl 指向登录接口 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login") def get_current_user( token: str = Depends(oauth2_scheme), db: Session = Depends(get_db) ) -> User: """ 获取当前登录用户 从 JWT token 中解析用户 ID,查询数据库返回用户对象 Args: token: JWT access token db: 数据库会话 Returns: 当前登录的用户对象 Raises: HTTPException: token 无效或用户不存在时抛出 401 """ credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无法验证凭据", headers={"WWW-Authenticate": "Bearer"}, ) try: # 解码 JWT token payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) user_id: str = payload.get("sub") if user_id is None: raise credentials_exception except JWTError: raise credentials_exception # 从数据库查询用户 user = db.query(User).filter(User.id == int(user_id)).first() if user is None: raise credentials_exception return user def get_admin_user( current_user: User = Depends(get_current_user) ) -> User: """ 获取当前管理员用户 验证当前用户是否为管理员,非管理员抛出 403 """ if not current_user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="权限不足,需要管理员权限" ) return current_user