Files
prop-data-guard/backend/app/api/v1/user.py
T
2026-04-22 17:07:33 +08:00

65 lines
2.1 KiB
Python

from typing import Optional, List
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate, UserOut
from app.schemas.common import ResponseModel, ListResponse, PageParams
from app.services import user_service
from app.api.deps import get_current_user
router = APIRouter()
@router.get("/me", response_model=ResponseModel[UserOut])
def read_me(current_user: User = Depends(get_current_user)):
return ResponseModel(data=UserOut.model_validate(current_user))
@router.get("", response_model=ListResponse[UserOut])
def list_users(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=500),
keyword: Optional[str] = Query(None),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
items, total = user_service.list_users(db, keyword=keyword, page=page, page_size=page_size)
return ListResponse(data=[UserOut.model_validate(u) for u in items], total=total, page=page, page_size=page_size)
@router.post("", response_model=ResponseModel[UserOut])
def create_user(
req: UserCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
user = user_service.create_user(db, req)
return ResponseModel(data=UserOut.model_validate(user))
@router.put("/{user_id}", response_model=ResponseModel[UserOut])
def update_user(
user_id: int,
req: UserUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
user = user_service.get_user_by_id(db, user_id)
if not user:
from fastapi import HTTPException, status
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="用户不存在")
user = user_service.update_user(db, user, req)
return ResponseModel(data=UserOut.model_validate(user))
@router.delete("/{user_id}")
def delete_user(
user_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
user_service.delete_user(db, user_id)
return ResponseModel(message="删除成功")