from typing import Optional, List from sqlalchemy.orm import Session from fastapi import HTTPException, status from app.models.user import User, Role, Dept, UserRole from app.schemas.user import UserCreate, UserUpdate from app.core.security import get_password_hash def get_user_by_id(db: Session, user_id: int) -> Optional[User]: return db.query(User).filter(User.id == user_id).first() def get_user_by_username(db: Session, username: str) -> Optional[User]: return db.query(User).filter(User.username == username).first() def create_user(db: Session, obj_in: UserCreate) -> User: if get_user_by_username(db, obj_in.username): raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="用户名已存在") db_obj = User( username=obj_in.username, email=obj_in.email, hashed_password=get_password_hash(obj_in.password), real_name=obj_in.real_name, phone=obj_in.phone, dept_id=obj_in.dept_id, is_active=obj_in.is_active, ) db.add(db_obj) db.commit() db.refresh(db_obj) if obj_in.role_ids: for rid in obj_in.role_ids: role = db.query(Role).filter(Role.id == rid).first() if role: db.add(UserRole(user_id=db_obj.id, role_id=rid)) db.commit() db.refresh(db_obj) return db_obj def update_user(db: Session, db_obj: User, obj_in: UserUpdate) -> User: update_data = obj_in.model_dump(exclude_unset=True) role_ids = update_data.pop("role_ids", None) for field, value in update_data.items(): setattr(db_obj, field, value) if role_ids is not None: db.query(UserRole).filter(UserRole.user_id == db_obj.id).delete() for rid in role_ids: role = db.query(Role).filter(Role.id == rid).first() if role: db.add(UserRole(user_id=db_obj.id, role_id=rid)) db.commit() db.refresh(db_obj) return db_obj def delete_user(db: Session, user_id: int) -> None: user = get_user_by_id(db, user_id) if not user: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="用户不存在") if user.is_superuser: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="不能删除超级管理员") db.delete(user) db.commit() def list_users(db: Session, keyword: Optional[str] = None, page: int = 1, page_size: int = 20): query = db.query(User) if keyword: query = query.filter( (User.username.contains(keyword)) | (User.real_name.contains(keyword)) | (User.email.contains(keyword)) ) total = query.count() items = query.offset((page - 1) * page_size).limit(page_size).all() return items, total def create_initial_data(db: Session): # Create default roles default_roles = [ {"name": "超级管理员", "code": "superadmin", "description": "系统超级管理员"}, {"name": "管理员", "code": "admin", "description": "系统管理员"}, {"name": "项目负责人", "code": "project_manager", "description": "分类分级项目负责人"}, {"name": "打标员", "code": "labeler", "description": "数据打标人员"}, {"name": "审核员", "code": "reviewer", "description": "结果审核人员"}, {"name": "访客", "code": "guest", "description": "只读访客"}, ] for r in default_roles: if not db.query(Role).filter(Role.code == r["code"]).first(): db.add(Role(**r)) # Create root dept if not db.query(Dept).filter(Dept.id == 1).first(): db.add(Dept(id=1, name="根部门", parent_id=None, sort_order=0)) db.commit() # Create superuser from app.core.config import settings if not get_user_by_username(db, settings.FIRST_SUPERUSER_USERNAME): superuser = User( username=settings.FIRST_SUPERUSER_USERNAME, email=settings.FIRST_SUPERUSER_EMAIL, hashed_password=get_password_hash(settings.FIRST_SUPERUSER_PASSWORD), real_name="超级管理员", is_active=True, is_superuser=True, dept_id=1, ) db.add(superuser) db.commit() db.refresh(superuser) superadmin_role = db.query(Role).filter(Role.code == "superadmin").first() if superadmin_role: db.add(UserRole(user_id=superuser.id, role_id=superadmin_role.id)) db.commit()