# Files
#
# 215 lines
# 6.8 KiB
# Python

"""用户主页和关注系统路由"""
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from sqlalchemy import func
from database import get_db
from models.user import User
from models.post import Post
from models.follow import Follow
from models.like import Collect
from models.notification import Notification
from routers.auth import get_current_user
# Router on which the user-profile and follow endpoints below are registered.
router = APIRouter()
@router.get("/{user_id}")
def get_user_profile(
    user_id: int,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return the profile summary for ``user_id``.

    The payload holds the user's basic fields, the number of public posts,
    follower and following counts, and two flags relative to the caller:
    ``is_following`` (caller follows this user) and ``is_self``.

    Raises:
        HTTPException: 404 when no user has the given id.
    """
    profile_owner = db.query(User).filter(User.id == user_id).first()
    if not profile_owner:
        raise HTTPException(status_code=404, detail="用户不存在")

    def count_follows(condition):
        # Count Follow rows matching a single filter condition.
        return db.query(func.count(Follow.id)).filter(condition).scalar()

    # `== True` is deliberate: SQLAlchemy turns it into a SQL comparison.
    public_posts = (
        db.query(func.count(Post.id))
        .filter(Post.user_id == user_id, Post.is_public == True)  # noqa: E712
        .scalar()
    )
    followers = count_follows(Follow.following_id == user_id)
    followings = count_follows(Follow.follower_id == user_id)

    caller_follow = (
        db.query(Follow)
        .filter(
            Follow.follower_id == current_user.id,
            Follow.following_id == user_id,
        )
        .first()
    )

    # NOTE(review): email is returned to every caller that passes
    # get_current_user, not only to the profile owner; confirm this is intended.
    profile = {
        "id": profile_owner.id,
        "username": profile_owner.username,
        "email": profile_owner.email,
        "avatar": profile_owner.avatar,
        "created_at": profile_owner.created_at,
    }
    profile["post_count"] = public_posts
    profile["follower_count"] = followers
    profile["following_count"] = followings
    profile["is_following"] = caller_follow is not None
    profile["is_self"] = current_user.id == user_id
    return profile
@router.post("/{user_id}/follow")
def toggle_follow(
    user_id: int,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Follow ``user_id`` if the caller does not follow them yet, else unfollow.

    A new follow also stores a "follow" notification addressed to the
    followed user. Returns ``{"followed": bool}`` with the resulting state.

    Raises:
        HTTPException: 400 when the caller targets themselves, 404 when the
            target user does not exist.
    """
    me = current_user.id
    if me == user_id:
        raise HTTPException(status_code=400, detail="不能关注自己")

    if not db.query(User).filter(User.id == user_id).first():
        raise HTTPException(status_code=404, detail="用户不存在")

    current_follow = (
        db.query(Follow)
        .filter(Follow.follower_id == me, Follow.following_id == user_id)
        .first()
    )

    # Already following: remove the relationship and stop here.
    if current_follow:
        db.delete(current_follow)
        db.commit()
        return {"followed": False}

    db.add(Follow(follower_id=me, following_id=user_id))
    # Notify the followed user; committed together with the follow row.
    db.add(
        Notification(
            user_id=user_id,
            type="follow",
            content=f"{current_user.username} 关注了你",
            from_user_id=me,
            related_id=me,
        )
    )
    db.commit()
    return {"followed": True}
@router.get("/{user_id}/posts")
def get_user_posts(
    user_id: int,
    page: int = 1,
    page_size: int = 20,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return one page of posts written by ``user_id``, newest first.

    Non-public posts are included only when the caller is the author.
    Each item carries at most the first 200 characters of the content and an
    ``author`` object; when the user row is missing the author falls back to
    the placeholder username and an empty avatar.

    Args:
        user_id: Author whose posts are listed.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Items per page; negative values are treated as 0.
    """
    # Clamp client-supplied pagination so it can never become a negative
    # OFFSET/LIMIT (an error on some backends, "no limit" on others).
    page = max(page, 1)
    page_size = max(page_size, 0)

    query = db.query(Post).filter(Post.user_id == user_id)
    if user_id != current_user.id:
        # `== True` is deliberate: SQLAlchemy turns it into a SQL comparison.
        query = query.filter(Post.is_public == True)  # noqa: E712
    posts = (
        query.order_by(Post.created_at.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )

    user = db.query(User).filter(User.id == user_id).first()
    username = user.username if user else "未知"
    avatar = user.avatar if user else ""

    return [
        {
            "id": p.id,
            "title": p.title,
            # Guard against a NULL content value before slicing the preview.
            "content": (p.content or "")[:200],
            "category": p.category,
            "tags": p.tags,
            "view_count": p.view_count,
            "like_count": p.like_count,
            "comment_count": p.comment_count,
            "collect_count": p.collect_count,
            "created_at": p.created_at,
            "updated_at": p.updated_at,
            "author": {"id": user_id, "username": username, "avatar": avatar},
        }
        for p in posts
    ]
@router.get("/{user_id}/followers")
def get_followers(
    user_id: int,
    page: int = 1,
    page_size: int = 20,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return one page of the users who follow ``user_id``, newest follow first.

    Each item is ``{"id", "username", "avatar"}``. A follower whose user row
    cannot be found is still listed, with empty username and avatar.

    Args:
        user_id: User whose followers are listed.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Items per page; negative values are treated as 0.
    """
    # Clamp client-supplied pagination so it can never become a negative
    # OFFSET/LIMIT (an error on some backends, "no limit" on others).
    page = max(page, 1)
    page_size = max(page_size, 0)

    follows = (
        db.query(Follow)
        .filter(Follow.following_id == user_id)
        .order_by(Follow.created_at.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )
    follower_ids = [f.follower_id for f in follows]

    # One bulk lookup instead of a query per follower; skipped when empty.
    users = (
        db.query(User).filter(User.id.in_(follower_ids)).all()
        if follower_ids
        else []
    )
    users_by_id = {u.id: u for u in users}

    # Iterate follower_ids (not users) to keep the follow-time ordering.
    return [
        {
            "id": uid,
            "username": users_by_id[uid].username if uid in users_by_id else "",
            "avatar": users_by_id[uid].avatar if uid in users_by_id else "",
        }
        for uid in follower_ids
    ]
@router.get("/{user_id}/following")
def get_following(
    user_id: int,
    page: int = 1,
    page_size: int = 20,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return one page of the users that ``user_id`` follows, newest follow first.

    Each item is ``{"id", "username", "avatar"}``. A followed account whose
    user row cannot be found is still listed, with empty username and avatar.

    Args:
        user_id: User whose followed accounts are listed.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Items per page; negative values are treated as 0.
    """
    # Clamp client-supplied pagination so it can never become a negative
    # OFFSET/LIMIT (an error on some backends, "no limit" on others).
    page = max(page, 1)
    page_size = max(page_size, 0)

    follows = (
        db.query(Follow)
        .filter(Follow.follower_id == user_id)
        .order_by(Follow.created_at.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )
    followed_ids = [f.following_id for f in follows]

    # One bulk lookup instead of a query per followed user; skipped when empty.
    users = (
        db.query(User).filter(User.id.in_(followed_ids)).all()
        if followed_ids
        else []
    )
    users_by_id = {u.id: u for u in users}

    # Iterate followed_ids (not users) to keep the follow-time ordering.
    return [
        {
            "id": uid,
            "username": users_by_id[uid].username if uid in users_by_id else "",
            "avatar": users_by_id[uid].avatar if uid in users_by_id else "",
        }
        for uid in followed_ids
    ]
@router.get("/{user_id}/collects")
def get_user_collects(
    user_id: int,
    page: int = 1,
    page_size: int = 20,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return one page of the posts collected by ``user_id``, newest collect first.

    Collected posts that no longer exist are skipped, and so are non-public
    posts unless the caller is their author (the same visibility rule the
    user-posts listing applies). A page can therefore hold fewer than
    ``page_size`` items. Each item carries at most the first 200 characters
    of the content and an ``author`` object with empty username/avatar when
    the author row is missing.

    Args:
        user_id: User whose collection is listed.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Items per page; negative values are treated as 0.
    """
    # Clamp client-supplied pagination so it can never become a negative
    # OFFSET/LIMIT (an error on some backends, "no limit" on others).
    page = max(page, 1)
    page_size = max(page_size, 0)

    collects = (
        db.query(Collect)
        .filter(Collect.user_id == user_id)
        .order_by(Collect.created_at.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )
    post_ids = [c.post_id for c in collects]
    if not post_ids:
        return []

    posts = db.query(Post).filter(Post.id.in_(post_ids)).all()
    post_map = {p.id: p for p in posts}

    author_ids = list({p.user_id for p in posts})
    authors = db.query(User).filter(User.id.in_(author_ids)).all()
    author_map = {a.id: a for a in authors}

    result = []
    # Walk post_ids (not posts) to keep the collect-time ordering.
    for pid in post_ids:
        p = post_map.get(pid)
        if not p:
            continue
        # Do not leak another author's private post through a collection.
        if not p.is_public and p.user_id != current_user.id:
            continue
        a = author_map.get(p.user_id)
        result.append({
            "id": p.id,
            "title": p.title,
            # Guard against a NULL content value before slicing the preview.
            "content": (p.content or "")[:200],
            "category": p.category,
            "tags": p.tags,
            "view_count": p.view_count,
            "like_count": p.like_count,
            "comment_count": p.comment_count,
            "collect_count": p.collect_count,
            "created_at": p.created_at,
            "author": {
                "id": p.user_id,
                "username": a.username if a else "",
                "avatar": a.avatar if a else "",
            },
        })
    return result