Files
hiderfong ad2f49de11 fix(login): 修复登录后跳转及认证拦截问题
- request.ts: 优化401处理,避免登录接口误判过期
- router/index.ts: 路由守卫增加用户信息获取
- stores/user.ts: fetchUserInfo增强错误处理,login前先清理旧状态
- Login.vue: 使用await router.push,避免重复报错
- user_service.py: bootstrap superuser密码同步
2026-04-26 07:59:46 +08:00

150 lines
5.3 KiB
Python

from typing import Optional, List
from sqlalchemy.orm import Session
from fastapi import HTTPException, status
from app.models.user import User, Role, Dept, UserRole
from app.schemas.user import UserCreate, UserUpdate
from app.core.security import get_password_hash, verify_password
def get_user_by_id(db: Session, user_id: int) -> Optional[User]:
    """Return the user whose ``id`` equals ``user_id``.

    Args:
        db: SQLAlchemy session used to run the query.
        user_id: Value compared against ``User.id``.

    Returns:
        The first matching ``User``, or ``None`` when no row matches.
    """
    id_matches = User.id == user_id
    return db.query(User).filter(id_matches).first()
def get_user_by_username(db: Session, username: str) -> Optional[User]:
    """Return the user whose ``username`` equals the given value.

    Args:
        db: SQLAlchemy session used to run the query.
        username: Value compared against ``User.username``.

    Returns:
        The first matching ``User``, or ``None`` when no row matches.
    """
    name_matches = User.username == username
    return db.query(User).filter(name_matches).first()
def create_user(db: Session, obj_in: UserCreate) -> User:
    """Create a user and attach the requested roles in one transaction.

    The new row is flushed (not committed) so that ``db_obj.id`` is populated
    for the ``UserRole`` link rows; the user and its links are then committed
    together. A failure while linking roles therefore cannot leave behind an
    already-committed user that has no roles.

    Args:
        db: SQLAlchemy session.
        obj_in: Creation payload. ``password`` is hashed before it is stored.
            ``role_ids`` may be empty or ``None``; ids that do not match an
            existing ``Role`` are skipped and duplicates are linked once.

    Returns:
        The persisted ``User``, refreshed from the database.

    Raises:
        HTTPException: 400 when ``obj_in.username`` is already taken.
    """
    if get_user_by_username(db, obj_in.username):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="用户名已存在",
        )

    db_obj = User(
        username=obj_in.username,
        email=obj_in.email,
        hashed_password=get_password_hash(obj_in.password),
        real_name=obj_in.real_name,
        phone=obj_in.phone,
        dept_id=obj_in.dept_id,
        is_active=obj_in.is_active,
    )
    db.add(db_obj)
    # Emit the INSERT so db_obj.id is available, without ending the transaction.
    db.flush()

    if obj_in.role_ids:
        # De-duplicate (keeping the caller's order) so the same link row is
        # never added twice.
        requested_ids = list(dict.fromkeys(obj_in.role_ids))
        # One query for all ids instead of one query per id.
        existing_ids = {
            row[0]
            for row in db.query(Role.id).filter(Role.id.in_(requested_ids)).all()
        }
        for rid in requested_ids:
            if rid in existing_ids:
                db.add(UserRole(user_id=db_obj.id, role_id=rid))

    db.commit()
    db.refresh(db_obj)
    return db_obj
def update_user(db: Session, db_obj: User, obj_in: UserUpdate) -> User:
    """Apply a partial update to ``db_obj`` and optionally replace its roles.

    Only fields explicitly set on ``obj_in`` are written
    (``model_dump(exclude_unset=True)``). When ``role_ids`` is provided and is
    not ``None``, every existing ``UserRole`` row for the user is deleted and
    new rows are added for the ids that match an existing ``Role``; an empty
    list removes all roles.

    Args:
        db: SQLAlchemy session.
        db_obj: The user instance to modify.
        obj_in: Update payload.

    Returns:
        The same ``User`` instance, refreshed after commit.

    Raises:
        HTTPException: 400 when the payload changes ``username`` to one that
            already belongs to a user (same rule ``create_user`` enforces).
    """
    update_data = obj_in.model_dump(exclude_unset=True)
    role_ids = update_data.pop("role_ids", None)

    # Mirror create_user's uniqueness rule. This only has an effect when the
    # update payload actually carries a "username" field.
    new_username = update_data.get("username")
    if new_username is not None and new_username != db_obj.username:
        if get_user_by_username(db, new_username):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="用户名已存在",
            )

    for field, value in update_data.items():
        setattr(db_obj, field, value)

    if role_ids is not None:
        db.query(UserRole).filter(UserRole.user_id == db_obj.id).delete()
        # De-duplicate (keeping the caller's order) so the same link row is
        # never added twice.
        requested_ids = list(dict.fromkeys(role_ids))
        existing_ids = set()
        if requested_ids:
            # One query for all ids instead of one query per id.
            existing_ids = {
                row[0]
                for row in db.query(Role.id).filter(Role.id.in_(requested_ids)).all()
            }
        for rid in requested_ids:
            if rid in existing_ids:
                db.add(UserRole(user_id=db_obj.id, role_id=rid))

    db.commit()
    db.refresh(db_obj)
    return db_obj
def delete_user(db: Session, user_id: int) -> None:
    """Delete the user with the given id and commit.

    Args:
        db: SQLAlchemy session.
        user_id: Id passed to ``get_user_by_id`` to find the user.

    Raises:
        HTTPException: 404 when no user is found; 400 when the user's
            ``is_superuser`` flag is truthy.
    """
    target = get_user_by_id(db, user_id)

    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="用户不存在",
        )

    # Superuser accounts are refused rather than deleted.
    if target.is_superuser:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="不能删除超级管理员",
        )

    db.delete(target)
    db.commit()
def list_users(db: Session, keyword: Optional[str] = None, page: int = 1, page_size: int = 20):
    """Return one page of users plus the total number of matches.

    Args:
        db: SQLAlchemy session.
        keyword: Optional substring matched against ``username``,
            ``real_name`` and ``email``. ``%`` and ``_`` are matched literally.
            An empty or ``None`` keyword disables filtering.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Maximum rows per page; negative values are treated as 0.

    Returns:
        A ``(items, total)`` tuple: the users on the requested page, ordered
        by ``User.id``, and the count of all rows matching the filter.
    """
    # Clamp so a bad argument can never yield a negative OFFSET or LIMIT.
    page = max(page, 1)
    page_size = max(page_size, 0)

    query = db.query(User)
    if keyword:
        # autoescape=True stops "%" and "_" in user input from acting as
        # LIKE wildcards.
        query = query.filter(
            User.username.contains(keyword, autoescape=True)
            | User.real_name.contains(keyword, autoescape=True)
            | User.email.contains(keyword, autoescape=True)
        )

    total = query.count()
    # A stable ORDER BY is required for OFFSET/LIMIT pages to be consistent
    # between requests.
    items = (
        query.order_by(User.id)
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )
    return items, total
def create_initial_data(db: Session):
    """Seed default roles, the root department and the bootstrap superuser.

    Steps, each skipped for rows that already exist:

    1. Add every default role whose ``code`` is not present yet.
    2. Add the department with id 1 if it is missing, then commit.
    3. Create the user named by ``settings.FIRST_SUPERUSER_USERNAME``, or,
       when it already exists, bring its password, email, active flag,
       superuser flag and (if unset) department in line with the settings.
    4. Link that user to the ``superadmin`` role if not already linked.

    Args:
        db: SQLAlchemy session used for all reads and writes.
    """
    root_dept_id = 1

    # --- default roles ---------------------------------------------------
    role_definitions = [
        {"name": "超级管理员", "code": "superadmin", "description": "系统超级管理员"},
        {"name": "管理员", "code": "admin", "description": "系统管理员"},
        {"name": "项目负责人", "code": "project_manager", "description": "分类分级项目负责人"},
        {"name": "打标员", "code": "labeler", "description": "数据打标人员"},
        {"name": "审核员", "code": "reviewer", "description": "结果审核人员"},
        {"name": "访客", "code": "guest", "description": "只读访客"},
    ]
    for definition in role_definitions:
        if db.query(Role).filter(Role.code == definition["code"]).first():
            continue
        db.add(Role(**definition))

    # --- root department -------------------------------------------------
    root_dept = db.query(Dept).filter(Dept.id == root_dept_id).first()
    if not root_dept:
        db.add(Dept(id=root_dept_id, name="根部门", parent_id=None, sort_order=0))
    db.commit()

    # --- bootstrap superuser ---------------------------------------------
    # NOTE(review): imported inside the function, presumably to avoid a
    # circular import at module load; confirm before moving it.
    from app.core.config import settings

    configured_username = settings.FIRST_SUPERUSER_USERNAME
    configured_email = settings.FIRST_SUPERUSER_EMAIL
    configured_password = settings.FIRST_SUPERUSER_PASSWORD

    superuser = get_user_by_username(db, configured_username)
    if superuser:
        # Collect only the attributes that differ from the configured values,
        # so a commit happens only when something actually changed.
        corrections = {}
        if not verify_password(configured_password, superuser.hashed_password):
            corrections["hashed_password"] = get_password_hash(configured_password)
        if superuser.email != configured_email:
            corrections["email"] = configured_email
        if not superuser.is_active:
            corrections["is_active"] = True
        if not superuser.is_superuser:
            corrections["is_superuser"] = True
        if superuser.dept_id is None:
            corrections["dept_id"] = root_dept_id

        for attr_name, attr_value in corrections.items():
            setattr(superuser, attr_name, attr_value)
        if corrections:
            db.commit()
            db.refresh(superuser)
    else:
        superuser = User(
            username=configured_username,
            email=configured_email,
            hashed_password=get_password_hash(configured_password),
            real_name="超级管理员",
            is_active=True,
            is_superuser=True,
            dept_id=root_dept_id,
        )
        db.add(superuser)
        db.commit()
        db.refresh(superuser)

    # --- superadmin role link --------------------------------------------
    superadmin_role = db.query(Role).filter(Role.code == "superadmin").first()
    if not superadmin_role or superadmin_role in superuser.roles:
        return
    db.add(UserRole(user_id=superuser.id, role_id=superadmin_role.id))
    db.commit()