from fastapi import Depends, HTTPException, status, Request
from sqlalchemy.orm import Session
from jose import JWTError

from app.core.database import get_db
from app.core.security import decode_token
from app.models.user import User, Role
from app.services import user_service


def get_token_from_request(request: Request) -> str:
    """Extract the raw auth token from an incoming request.

    The ``Authorization`` header is checked first; when it starts with
    ``"Bearer "`` everything after that prefix is returned. Otherwise the
    ``token`` query parameter is used if it is present and non-empty.

    Raises:
        HTTPException: 401 when neither source provides a token.
    """
    auth = request.headers.get("Authorization", "")
    if auth.startswith("Bearer "):
        return auth[len("Bearer "):]

    # Fallback to query param for some special cases.
    # NOTE(review): tokens carried in URLs can end up in access logs and
    # browser history -- confirm this fallback is still required.
    token = request.query_params.get("token")
    if token:
        return token

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="缺少认证令牌",
        headers={"WWW-Authenticate": "Bearer"},
    )


def get_current_user(
    request: Request,
    db: Session = Depends(get_db),
) -> User:
    """Resolve the user identified by the request's token.

    Raises:
        HTTPException: 401 when the token is missing, when ``decode_token``
            returns a falsy payload, or when the payload's ``sub`` claim is
            absent or not an integer; 404 when no user matches that id;
            403 when the matched user is not active.
    """
    token = get_token_from_request(request)
    payload = decode_token(token)
    if not payload:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的认证令牌",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # A token without a usable ``sub`` claim is an invalid credential, not a
    # server fault: int(None) raises TypeError and int("abc") raises
    # ValueError, either of which would otherwise surface as a 500.
    try:
        user_id = int(payload.get("sub"))
    except (TypeError, ValueError):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的认证令牌",
            headers={"WWW-Authenticate": "Bearer"},
        ) from None

    user = user_service.get_user_by_id(db, user_id)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="用户不存在",
        )
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="用户已被禁用",
        )
    return user


def get_current_active_user(
    current_user: User = Depends(get_current_user),
) -> User:
    """Return the authenticated user.

    ``get_current_user`` already rejects inactive users, so this dependency
    passes its result through unchanged.
    """
    return current_user


# ===================== RBAC Dependencies =====================

# Role code constants
ROLE_SUPERADMIN = "superadmin"
ROLE_ADMIN = "admin"
ROLE_PROJECT_MANAGER = "project_manager"
ROLE_LABELER = "labeler"
ROLE_REVIEWER = "reviewer"
ROLE_GUEST = "guest"

# Role hierarchy (higher value = more permissions)
ROLE_LEVELS = {
    ROLE_GUEST: 0,
    ROLE_LABELER: 1,
    ROLE_REVIEWER: 1,
    ROLE_PROJECT_MANAGER: 2,
    ROLE_ADMIN: 3,
    ROLE_SUPERADMIN: 4,
}


def _get_user_role_codes(user: User) -> list[str]:
    """Return the ``code`` of every role attached to ``user``."""
    return [r.code for r in user.roles]


def _has_role(user: User, role_code: str) -> bool:
    """Return True when ``role_code`` is among the user's role codes."""
    return role_code in _get_user_role_codes(user)


def _has_any_role(user: User, role_codes: list[str]) -> bool:
    """Return True when the user holds at least one of ``role_codes``."""
    # Build the lookup set once instead of scanning a list per candidate.
    user_roles = set(_get_user_role_codes(user))
    return any(code in user_roles for code in role_codes)


def _is_admin(user: User) -> bool:
    """Return True for superusers and holders of the admin/superadmin role."""
    return user.is_superuser or _has_any_role(user, [ROLE_SUPERADMIN, ROLE_ADMIN])


def require_admin(current_user: User = Depends(get_current_user)) -> User:
    """Dependency that only admits admins and superadmins.

    Raises:
        HTTPException: 403 when the user is not an admin.
    """
    if not _is_admin(current_user):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="需要管理员权限",
        )
    return current_user


def require_manager(current_user: User = Depends(get_current_user)) -> User:
    """Dependency that admits project managers and admins.

    Raises:
        HTTPException: 403 when the user is neither an admin nor a project
            manager.
    """
    if _is_admin(current_user) or _has_role(current_user, ROLE_PROJECT_MANAGER):
        return current_user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="需要项目负责人及以上权限",
    )


def require_labeler(current_user: User = Depends(get_current_user)) -> User:
    """Dependency that admits every role except guest.

    Admins pass unconditionally; otherwise the user needs the project
    manager, labeler, or reviewer role.

    Raises:
        HTTPException: 403 when the user holds none of those roles.
    """
    if _is_admin(current_user):
        return current_user

    allowed = [ROLE_PROJECT_MANAGER, ROLE_LABELER, ROLE_REVIEWER]
    if _has_any_role(current_user, allowed):
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="权限不足",
    )


def require_guest_or_above(current_user: User = Depends(get_current_user)) -> User:
    """Dependency that admits any authenticated user, guests included."""
    return current_user