ad2f49de11
- request.ts: 优化401处理,避免登录接口误判过期 - router/index.ts: 路由守卫增加用户信息获取 - stores/user.ts: fetchUserInfo增强错误处理,login前先清理旧状态 - Login.vue: 使用await router.push,避免重复报错 - user_service.py: bootstrap superuser密码同步
150 lines
5.3 KiB
Python
150 lines
5.3 KiB
Python
from typing import Optional, List
|
|
from sqlalchemy.orm import Session
|
|
from fastapi import HTTPException, status
|
|
|
|
from app.models.user import User, Role, Dept, UserRole
|
|
from app.schemas.user import UserCreate, UserUpdate
|
|
from app.core.security import get_password_hash, verify_password
|
|
|
|
|
|
def get_user_by_id(db: Session, user_id: int) -> Optional[User]:
    """Fetch a single user by its ``id``.

    Args:
        db: SQLAlchemy session used to run the query.
        user_id: Value compared against ``User.id``.

    Returns:
        The first matching ``User``, or ``None`` when no row matches.
    """
    id_matches = User.id == user_id
    return db.query(User).filter(id_matches).first()
|
|
|
|
|
|
def get_user_by_username(db: Session, username: str) -> Optional[User]:
    """Fetch a single user by its ``username``.

    Args:
        db: SQLAlchemy session used to run the query.
        username: Value compared (exact equality) against ``User.username``.

    Returns:
        The first matching ``User``, or ``None`` when no row matches.
    """
    name_matches = User.username == username
    return db.query(User).filter(name_matches).first()
|
|
|
|
|
|
def create_user(db: Session, obj_in: UserCreate) -> User:
    """Create a user and attach the requested roles in a single transaction.

    The user row and its ``UserRole`` links are committed together, so a
    failure while assigning roles no longer leaves a half-created account
    behind. Role ids that do not match an existing ``Role`` are skipped, and
    repeated ids are applied only once.

    Args:
        db: SQLAlchemy session used for all reads and writes.
        obj_in: Creation payload; ``obj_in.password`` is hashed before storage.

    Returns:
        The persisted ``User``, refreshed from the database.

    Raises:
        HTTPException: 400 when ``obj_in.username`` is already taken.
    """
    if get_user_by_username(db, obj_in.username):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="用户名已存在")

    db_obj = User(
        username=obj_in.username,
        email=obj_in.email,
        hashed_password=get_password_hash(obj_in.password),
        real_name=obj_in.real_name,
        phone=obj_in.phone,
        dept_id=obj_in.dept_id,
        is_active=obj_in.is_active,
    )
    db.add(db_obj)

    try:
        if obj_in.role_ids:
            # Flush (not commit) so db_obj.id is populated while the user and
            # its role links still belong to the same transaction.
            db.flush()

            # dict.fromkeys removes duplicates but keeps the caller's order;
            # a repeated id would otherwise insert the same link twice.
            wanted_ids = list(dict.fromkeys(obj_in.role_ids))

            # One IN query instead of one lookup per role id.
            known_ids = {
                role_id
                for (role_id,) in db.query(Role.id).filter(Role.id.in_(wanted_ids)).all()
            }
            for rid in wanted_ids:
                if rid in known_ids:
                    db.add(UserRole(user_id=db_obj.id, role_id=rid))

        db.commit()
    except Exception:
        # Leave the session usable for the caller, then surface the error.
        db.rollback()
        raise

    db.refresh(db_obj)
    return db_obj
|
|
|
|
|
|
def update_user(db: Session, db_obj: User, obj_in: UserUpdate) -> User:
    """Apply a partial update to a user and optionally replace its roles.

    Only fields explicitly set on ``obj_in`` are applied
    (``model_dump(exclude_unset=True)``). When ``role_ids`` is provided, even
    as an empty list, the user's existing ``UserRole`` rows are deleted and
    replaced by the given ids. Ids that match no ``Role`` are skipped and
    repeated ids are applied once.

    Args:
        db: SQLAlchemy session used for all reads and writes.
        db_obj: The already-loaded user to modify.
        obj_in: Update payload.

    Returns:
        The same ``User`` instance, refreshed after commit.
    """
    update_data = obj_in.model_dump(exclude_unset=True)
    role_ids = update_data.pop("role_ids", None)

    # NOTE(review): UserUpdate is not visible from this module. If it carries
    # a plaintext ``password`` field, a blind setattr would never reach
    # ``hashed_password``; hash it here instead. This is a no-op when the
    # schema has no such field — confirm against the schema.
    new_password = update_data.pop("password", None)

    for field, value in update_data.items():
        setattr(db_obj, field, value)

    if new_password:
        db_obj.hashed_password = get_password_hash(new_password)

    try:
        if role_ids is not None:
            db.query(UserRole).filter(UserRole.user_id == db_obj.id).delete()

            # Drop duplicates (keeping order) so the same link is not
            # inserted twice.
            wanted_ids = list(dict.fromkeys(role_ids))
            if wanted_ids:
                # One IN query instead of one lookup per role id.
                known_ids = {
                    role_id
                    for (role_id,) in db.query(Role.id).filter(Role.id.in_(wanted_ids)).all()
                }
                for rid in wanted_ids:
                    if rid in known_ids:
                        db.add(UserRole(user_id=db_obj.id, role_id=rid))

        db.commit()
    except Exception:
        # Undo the role deletion and field changes, then surface the error.
        db.rollback()
        raise

    db.refresh(db_obj)
    return db_obj
|
|
|
|
|
|
def delete_user(db: Session, user_id: int) -> None:
    """Delete the user with the given id.

    Args:
        db: SQLAlchemy session used for the lookup and the delete.
        user_id: Value compared against ``User.id``.

    Raises:
        HTTPException: 404 when no such user exists; 400 when the user has
            ``is_superuser`` set, since superusers may not be deleted.
    """
    target = db.query(User).filter(User.id == user_id).first()

    if not target:
        not_found = HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="用户不存在",
        )
        raise not_found

    if target.is_superuser:
        forbidden = HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="不能删除超级管理员",
        )
        raise forbidden

    db.delete(target)
    db.commit()
|
|
|
|
|
|
def list_users(db: Session, keyword: Optional[str] = None, page: int = 1, page_size: int = 20):
    """Return one page of users plus the total number of matches.

    Args:
        db: SQLAlchemy session used to run the queries.
        keyword: Optional substring matched against ``username``,
            ``real_name`` and ``email``. ``%`` and ``_`` in the keyword are
            matched literally rather than as LIKE wildcards.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Maximum rows per page; negative values are treated as 0.

    Returns:
        A ``(items, total)`` tuple: the users on the requested page (ordered
        by ``User.id``) and the count of all users matching the filter.
    """
    # A negative OFFSET/LIMIT is rejected by some databases; clamp instead.
    page = max(page, 1)
    page_size = max(page_size, 0)

    query = db.query(User)
    if keyword:
        query = query.filter(
            User.username.contains(keyword, autoescape=True)
            | User.real_name.contains(keyword, autoescape=True)
            | User.email.contains(keyword, autoescape=True)
        )

    total = query.count()

    # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so pages
    # could overlap or skip rows; order by id for stable pagination.
    items = (
        query.order_by(User.id)
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )
    return items, total
|
|
|
|
|
|
def create_initial_data(db: Session):
    """Seed default roles, the root department and the bootstrap superuser.

    Steps, in order:
      1. Insert each default role whose ``code`` is not present yet.
      2. Insert the department with ``id == 1`` if it is missing.
      3. Create the superuser named by ``settings.FIRST_SUPERUSER_USERNAME``,
         or, if it already exists, bring its password, email, active flag,
         superuser flag and (when unset) department back in line with the
         configured values.
      4. Link the superuser to the ``superadmin`` role if not linked already.

    Args:
        db: SQLAlchemy session used for all reads and writes.
    """
    # (name, code, description) for every built-in role.
    role_specs = (
        ("超级管理员", "superadmin", "系统超级管理员"),
        ("管理员", "admin", "系统管理员"),
        ("项目负责人", "project_manager", "分类分级项目负责人"),
        ("打标员", "labeler", "数据打标人员"),
        ("审核员", "reviewer", "结果审核人员"),
        ("访客", "guest", "只读访客"),
    )
    for role_name, role_code, role_desc in role_specs:
        already_there = db.query(Role).filter(Role.code == role_code).first()
        if already_there:
            continue
        db.add(Role(name=role_name, code=role_code, description=role_desc))

    # Root department is pinned to id 1; the superuser below points at it.
    root_dept = db.query(Dept).filter(Dept.id == 1).first()
    if not root_dept:
        db.add(Dept(id=1, name="根部门", parent_id=None, sort_order=0))

    db.commit()

    # Settings are imported at call time rather than at module import time.
    from app.core.config import settings

    superuser = get_user_by_username(db, settings.FIRST_SUPERUSER_USERNAME)

    if superuser:
        # Collect every attribute that drifted from the configured value, then
        # apply and commit them together (and only if something changed).
        pending = {}
        if not verify_password(settings.FIRST_SUPERUSER_PASSWORD, superuser.hashed_password):
            pending["hashed_password"] = get_password_hash(settings.FIRST_SUPERUSER_PASSWORD)
        if superuser.email != settings.FIRST_SUPERUSER_EMAIL:
            pending["email"] = settings.FIRST_SUPERUSER_EMAIL
        if not superuser.is_active:
            pending["is_active"] = True
        if not superuser.is_superuser:
            pending["is_superuser"] = True
        if superuser.dept_id is None:
            pending["dept_id"] = 1

        if pending:
            for attr_name, attr_value in pending.items():
                setattr(superuser, attr_name, attr_value)
            db.commit()
            db.refresh(superuser)
    else:
        superuser = User(
            username=settings.FIRST_SUPERUSER_USERNAME,
            email=settings.FIRST_SUPERUSER_EMAIL,
            hashed_password=get_password_hash(settings.FIRST_SUPERUSER_PASSWORD),
            real_name="超级管理员",
            is_active=True,
            is_superuser=True,
            dept_id=1,
        )
        db.add(superuser)
        db.commit()
        db.refresh(superuser)

    superadmin_role = db.query(Role).filter(Role.code == "superadmin").first()
    if not superadmin_role or superadmin_role in superuser.roles:
        # Nothing to link: the role is missing or already attached.
        return

    db.add(UserRole(user_id=superuser.id, role_id=superadmin_role.id))
    db.commit()
|