feat: implement full RBAC role-based access control
Backend: - deps.py: add require_admin, require_manager, require_labeler, require_guest_or_above - user.py: all write endpoints require admin - datasource.py: write/sync endpoints require admin - metadata.py: sync endpoint requires admin - classification.py: category/rule write requires admin; results query requires guest+ with data isolation - project.py: GET requires manager with created_by filtering; DELETE checks ownership - task.py: my-tasks requires labeler with assignee_id filtering; create-task requires manager - dashboard.py: requires guest_or_above - report.py: requires guest_or_above - project_service: list_projects adds created_by filter; list_results adds project_ids filter Frontend: - stores/user.ts: add hasRole, hasAnyRole, isAdmin, isManager, isLabeler, isSuperadmin - router/index.ts: add roles to route meta; beforeEach checks role permissions - Layout.vue: filter menu routes by user roles - System.vue: hide add/edit/delete buttons for non-admins - DataSource.vue: hide add/edit/delete/sync buttons for non-admins - Project.vue: hide add/delete buttons for non-admins
This commit is contained in:
+83
-1
@@ -4,7 +4,7 @@ from jose import JWTError
|
||||
|
||||
from app.core.database import get_db
|
||||
from app.core.security import decode_token
|
||||
from app.models.user import User
|
||||
from app.models.user import User, Role
|
||||
from app.services import user_service
|
||||
|
||||
|
||||
@@ -47,3 +47,85 @@ def get_current_active_user(
|
||||
current_user: User = Depends(get_current_user),
|
||||
) -> User:
|
||||
return current_user
|
||||
|
||||
|
||||
# ===================== RBAC Dependencies =====================
|
||||
|
||||
# Role code constants
|
||||
ROLE_SUPERADMIN = "superadmin"
|
||||
ROLE_ADMIN = "admin"
|
||||
ROLE_PROJECT_MANAGER = "project_manager"
|
||||
ROLE_LABELER = "labeler"
|
||||
ROLE_REVIEWER = "reviewer"
|
||||
ROLE_GUEST = "guest"
|
||||
|
||||
# Role hierarchy (higher index = more permissions)
|
||||
ROLE_LEVELS = {
|
||||
ROLE_GUEST: 0,
|
||||
ROLE_LABELER: 1,
|
||||
ROLE_REVIEWER: 1,
|
||||
ROLE_PROJECT_MANAGER: 2,
|
||||
ROLE_ADMIN: 3,
|
||||
ROLE_SUPERADMIN: 4,
|
||||
}
|
||||
|
||||
|
||||
def _get_user_role_codes(user: User) -> list[str]:
|
||||
"""Get list of role codes for a user."""
|
||||
return [r.code for r in user.roles]
|
||||
|
||||
|
||||
def _has_role(user: User, role_code: str) -> bool:
|
||||
"""Check if user has a specific role."""
|
||||
return role_code in _get_user_role_codes(user)
|
||||
|
||||
|
||||
def _has_any_role(user: User, role_codes: list[str]) -> bool:
|
||||
"""Check if user has any of the specified roles."""
|
||||
user_roles = _get_user_role_codes(user)
|
||||
return any(r in user_roles for r in role_codes)
|
||||
|
||||
|
||||
def _is_admin(user: User) -> bool:
|
||||
"""Check if user is admin or superadmin."""
|
||||
return user.is_superuser or _has_any_role(user, [ROLE_SUPERADMIN, ROLE_ADMIN])
|
||||
|
||||
|
||||
def require_admin(current_user: User = Depends(get_current_user)) -> User:
|
||||
"""Require admin or superadmin role."""
|
||||
if not _is_admin(current_user):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="需要管理员权限",
|
||||
)
|
||||
return current_user
|
||||
|
||||
|
||||
def require_manager(current_user: User = Depends(get_current_user)) -> User:
|
||||
"""Require project manager or above."""
|
||||
if _is_admin(current_user):
|
||||
return current_user
|
||||
if _has_role(current_user, ROLE_PROJECT_MANAGER):
|
||||
return current_user
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="需要项目负责人及以上权限",
|
||||
)
|
||||
|
||||
|
||||
def require_labeler(current_user: User = Depends(get_current_user)) -> User:
|
||||
"""Require labeler or above (excluding guest)."""
|
||||
if _is_admin(current_user):
|
||||
return current_user
|
||||
allowed = [ROLE_PROJECT_MANAGER, ROLE_LABELER, ROLE_REVIEWER]
|
||||
if _has_any_role(current_user, allowed):
|
||||
return current_user
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="权限不足",
|
||||
)
|
||||
|
||||
|
||||
def require_guest_or_above(current_user: User = Depends(get_current_user)) -> User:
|
||||
"""Any authenticated user (including guest)."""
|
||||
return current_user
|
||||
|
||||
Reference in New Issue
Block a user