Files
bianchengshequ/backend/routers/notifications.py

90 lines
2.5 KiB
Python

"""消息通知路由"""
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from sqlalchemy import func
from database import get_db
from models.user import User
from models.notification import Notification
from routers.auth import get_current_user
# Router on which the notification endpoints declared in this module register.
router = APIRouter()
@router.get("")
def get_notifications(
    page: int = 1,
    page_size: int = 30,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return one page of the current user's notifications, newest first.

    Args:
        page: 1-based page number. Values below 1 are treated as 1.
        page_size: Maximum number of notifications to return. Negative
            values are treated as 0, which yields an empty list.
        current_user: Authenticated user, resolved by ``get_current_user``.
        db: SQLAlchemy session, resolved by ``get_db``.

    Returns:
        A list of dicts with the keys ``id``, ``type``, ``content``,
        ``related_id``, ``is_read``, ``created_at`` and ``from_user``.
        ``from_user`` is a dict with ``id``, ``username`` and ``avatar``,
        or ``None`` when the notification has no triggering user or that
        user was not found.
    """
    # A non-positive page or a negative page size would produce a negative
    # OFFSET/LIMIT, which several databases reject outright. Clamp instead of
    # letting the request fail inside the query.
    # NOTE(review): page_size has no upper bound; consider capping it if
    # clients are not trusted to request reasonable sizes.
    page = max(page, 1)
    page_size = max(page_size, 0)

    notifs = (
        db.query(Notification)
        .filter(Notification.user_id == current_user.id)
        .order_by(Notification.created_at.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )

    # Load every triggering user in a single query rather than one per row.
    from_ids = list({n.from_user_id for n in notifs if n.from_user_id})
    users = db.query(User).filter(User.id.in_(from_ids)).all() if from_ids else []
    user_map = {u.id: u for u in users}

    result = []
    for n in notifs:
        # user_map only holds truthy ids that were found, so a single .get()
        # covers both "no sender" and "sender not found".
        sender = user_map.get(n.from_user_id)
        result.append(
            {
                "id": n.id,
                "type": n.type,
                "content": n.content,
                "related_id": n.related_id,
                "is_read": n.is_read,
                "created_at": n.created_at,
                "from_user": {
                    "id": n.from_user_id,
                    "username": sender.username,
                    "avatar": sender.avatar,
                }
                if sender is not None
                else None,
            }
        )
    return result
@router.get("/unread-count")
def get_unread_count(
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return the number of unread notifications of the current user.

    Returns:
        A dict ``{"count": <int>}`` holding the number of notifications that
        belong to ``current_user`` and have ``is_read`` set to false.
    """
    unread_query = db.query(func.count(Notification.id)).filter(
        Notification.user_id == current_user.id,
        # Builds a SQL comparison, not a Python truth test, hence "== False".
        Notification.is_read == False,  # noqa: E712
    )
    unread_total = unread_query.scalar()
    return {"count": unread_total}
@router.put("/read-all")
def read_all(
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Mark every unread notification of the current user as read.

    Issues a single bulk UPDATE restricted to the current user's unread
    rows, commits it, and returns a confirmation message.
    """
    unread_rows = db.query(Notification).filter(
        Notification.user_id == current_user.id,
        # Builds a SQL comparison, not a Python truth test, hence "== False".
        Notification.is_read == False,  # noqa: E712
    )
    unread_rows.update({"is_read": True})
    db.commit()
    return {"message": "已全部标为已读"}
@router.put("/{notif_id}/read")
def read_one(
    notif_id: int,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Mark a single notification of the current user as read.

    Args:
        notif_id: Primary key of the notification to mark as read.
        current_user: Authenticated user, resolved by ``get_current_user``.
        db: SQLAlchemy session, resolved by ``get_db``.

    Returns:
        A confirmation message. The number of updated rows is not inspected,
        so the same response is returned when no notification matches.
    """
    # Filtering on user_id as well keeps users from touching notifications
    # that belong to someone else.
    owned_match = db.query(Notification).filter(
        Notification.id == notif_id,
        Notification.user_id == current_user.id,
    )
    owned_match.update({"is_read": True})
    db.commit()
    return {"message": "已标为已读"}