Files
2026-04-22 17:07:33 +08:00

269 lines
12 KiB
Python

from typing import Optional, List, Tuple
from sqlalchemy.orm import Session
from fastapi import HTTPException, status
from app.models.classification import Category, DataLevel, RecognitionRule, ClassificationTemplate
from app.schemas.classification import CategoryCreate, CategoryUpdate, RecognitionRuleCreate, RecognitionRuleUpdate
def get_category(db: Session, category_id: int) -> Optional[Category]:
return db.query(Category).filter(Category.id == category_id).first()
def list_categories(db: Session, parent_id: Optional[int] = None) -> List[Category]:
query = db.query(Category)
if parent_id is not None:
query = query.filter(Category.parent_id == parent_id)
return query.order_by(Category.sort_order).all()
def build_category_tree(db: Session) -> List[dict]:
def build_tree(parent_id: Optional[int]) -> List[dict]:
nodes = db.query(Category).filter(Category.parent_id == parent_id).order_by(Category.sort_order).all()
result = []
for node in nodes:
result.append({
"id": node.id,
"parent_id": node.parent_id,
"level": node.level,
"code": node.code,
"name": node.name,
"description": node.description,
"sort_order": node.sort_order,
"created_at": node.created_at,
"children": build_tree(node.id),
})
return result
return build_tree(None)
def create_category(db: Session, obj_in: CategoryCreate) -> Category:
db_obj = Category(**obj_in.model_dump())
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update_category(db: Session, db_obj: Category, obj_in: CategoryUpdate) -> Category:
update_data = obj_in.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_obj, field, value)
db.commit()
db.refresh(db_obj)
return db_obj
def delete_category(db: Session, category_id: int) -> None:
db_obj = get_category(db, category_id)
if not db_obj:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="分类不存在")
# Check children
children = db.query(Category).filter(Category.parent_id == category_id).first()
if children:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="存在子分类,无法删除")
db.delete(db_obj)
db.commit()
def list_data_levels(db: Session) -> List[DataLevel]:
return db.query(DataLevel).order_by(DataLevel.sort_order).all()
def get_data_level(db: Session, level_id: int) -> Optional[DataLevel]:
return db.query(DataLevel).filter(DataLevel.id == level_id).first()
def create_data_level(db: Session, code: str, name: str, description: str, color: str, sort_order: int = 0, control_requirements: Optional[dict] = None) -> DataLevel:
db_obj = DataLevel(code=code, name=name, description=description, color=color, sort_order=sort_order, control_requirements=control_requirements)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def get_rule(db: Session, rule_id: int) -> Optional[RecognitionRule]:
return db.query(RecognitionRule).filter(RecognitionRule.id == rule_id).first()
def list_rules(db: Session, template_id: Optional[int] = None, keyword: Optional[str] = None, page: int = 1, page_size: int = 20) -> Tuple[List[RecognitionRule], int]:
query = db.query(RecognitionRule)
if template_id:
query = query.filter(RecognitionRule.template_id == template_id)
if keyword:
query = query.filter(
(RecognitionRule.rule_name.contains(keyword)) | (RecognitionRule.rule_content.contains(keyword))
)
total = query.count()
items = query.offset((page - 1) * page_size).limit(page_size).all()
return items, total
def create_rule(db: Session, obj_in: RecognitionRuleCreate) -> RecognitionRule:
db_obj = RecognitionRule(**obj_in.model_dump())
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update_rule(db: Session, db_obj: RecognitionRule, obj_in: RecognitionRuleUpdate) -> RecognitionRule:
update_data = obj_in.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_obj, field, value)
db.commit()
db.refresh(db_obj)
return db_obj
def delete_rule(db: Session, rule_id: int) -> None:
db_obj = get_rule(db, rule_id)
if not db_obj:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="规则不存在")
db.delete(db_obj)
db.commit()
def get_template(db: Session, template_id: int) -> Optional[ClassificationTemplate]:
return db.query(ClassificationTemplate).filter(ClassificationTemplate.id == template_id).first()
def list_templates(db: Session) -> List[ClassificationTemplate]:
return db.query(ClassificationTemplate).order_by(ClassificationTemplate.id).all()
def init_builtin_data(db: Session):
# Data Levels
if not db.query(DataLevel).first():
levels = [
("L1", "公开级", "可对外公开发布", "#67c23a", 1, {"storage": "无特殊要求", "access": "公开访问"}),
("L2", "内部级", "公司内部共享使用", "#409eff", 2, {"storage": "内部环境", "access": "内部员工"}),
("L3", "敏感级", "部门/授权人员访问,外部分享需审批", "#e6a23c", 3, {"storage": "加密存储", "access": "授权访问"}),
("L4", "重要级", "严格授权管理,外部分享需严格审批", "#f56c6c", 4, {"storage": "强加密", "access": "最小权限"}),
("L5", "核心级", "禁止对外共享", "#909399", 5, {"storage": "物理隔离", "access": "核心人员"}),
]
for code, name, desc, color, sort, ctrl in levels:
create_data_level(db, code, name, desc, color, sort, ctrl)
# Categories
if not db.query(Category).first():
categories = [
# Level 1
{"code": "CUST", "name": "客户数据", "level": 1, "sort_order": 1},
{"code": "POLICY", "name": "保单数据", "level": 1, "sort_order": 2},
{"code": "CLAIM", "name": "理赔数据", "level": 1, "sort_order": 3},
{"code": "FIN", "name": "财务数据", "level": 1, "sort_order": 4},
{"code": "CHANNEL", "name": "渠道数据", "level": 1, "sort_order": 5},
{"code": "REG", "name": "监管报送数据", "level": 1, "sort_order": 6},
{"code": "INTERNAL", "name": "内部管理数据", "level": 1, "sort_order": 7},
{"code": "SUBJECT", "name": "车辆/财产标的数据", "level": 1, "sort_order": 8},
]
cat_map = {}
for c in categories:
obj = Category(parent_id=None, level=c["level"], code=c["code"], name=c["name"], sort_order=c["sort_order"])
db.add(obj)
db.commit()
db.refresh(obj)
cat_map[c["code"]] = obj.id
# Level 2
sub_categories = [
{"parent_code": "CUST", "code": "CUST_PERSONAL", "name": "个人客户信息", "sort_order": 1},
{"parent_code": "CUST", "code": "CUST_ENTERPRISE", "name": "企业客户信息", "sort_order": 2},
{"parent_code": "CUST", "code": "CUST_BENEFICIARY", "name": "受益人信息", "sort_order": 3},
{"parent_code": "POLICY", "code": "POLICY_APPLY", "name": "投保信息", "sort_order": 1},
{"parent_code": "POLICY", "code": "POLICY_UNDERWRITE", "name": "承保信息", "sort_order": 2},
{"parent_code": "POLICY", "code": "POLICY_RENEW", "name": "续保信息", "sort_order": 3},
{"parent_code": "CLAIM", "code": "CLAIM_REPORT", "name": "报案信息", "sort_order": 1},
{"parent_code": "CLAIM", "code": "CLAIM_SURVEY", "name": "查勘定损信息", "sort_order": 2},
{"parent_code": "CLAIM", "code": "CLAIM_PAY", "name": "赔付信息", "sort_order": 3},
{"parent_code": "FIN", "code": "FIN_PAYMENT", "name": "收付费数据", "sort_order": 1},
{"parent_code": "FIN", "code": "FIN_RESERVE", "name": "准备金数据", "sort_order": 2},
{"parent_code": "FIN", "code": "FIN_INVEST", "name": "投资数据", "sort_order": 3},
{"parent_code": "CHANNEL", "code": "CHN_AGENT", "name": "代理人/经纪人信息", "sort_order": 1},
{"parent_code": "CHANNEL", "code": "CHN_PARTNER", "name": "第三方合作方", "sort_order": 2},
{"parent_code": "REG", "code": "REG_SOLVENCY", "name": "偿付能力数据", "sort_order": 1},
{"parent_code": "REG", "code": "REG_STAT", "name": "统计报表数据", "sort_order": 2},
{"parent_code": "INTERNAL", "code": "INT_EMPLOYEE", "name": "员工信息", "sort_order": 1},
{"parent_code": "INTERNAL", "code": "INT_OPS", "name": "系统运维数据", "sort_order": 2},
{"parent_code": "SUBJECT", "code": "SUB_VEHICLE", "name": "车辆信息", "sort_order": 1},
{"parent_code": "SUBJECT", "code": "SUB_PROPERTY", "name": "财产标的", "sort_order": 2},
]
for sc in sub_categories:
parent_id = cat_map.get(sc["parent_code"])
if parent_id:
obj = Category(parent_id=parent_id, level=2, code=sc["code"], name=sc["name"], sort_order=sc["sort_order"])
db.add(obj)
db.commit()
# Template
if not db.query(ClassificationTemplate).first():
tpl = ClassificationTemplate(
name="财产保险行业分类分级模板",
industry_type="insurance_property",
version="1.0",
description="基于《金融数据安全 数据安全分级指南》及保险行业特点制定的分类分级模板",
is_builtin=True,
is_active=True,
)
db.add(tpl)
db.commit()
db.refresh(tpl)
# Create some sample rules
level_l4 = db.query(DataLevel).filter(DataLevel.code == "L4").first()
level_l3 = db.query(DataLevel).filter(DataLevel.code == "L3").first()
level_l5 = db.query(DataLevel).filter(DataLevel.code == "L5").first()
cat_cust_personal = db.query(Category).filter(Category.code == "CUST_PERSONAL").first()
cat_fin_reserve = db.query(Category).filter(Category.code == "FIN_RESERVE").first()
cat_int_ops = db.query(Category).filter(Category.code == "INT_OPS").first()
rules = []
if cat_cust_personal and level_l4:
rules.append(RecognitionRule(
template_id=tpl.id,
category_id=cat_cust_personal.id,
level_id=level_l4.id,
rule_type="regex",
rule_name="身份证号识别",
rule_content=r"(\d{15}|\d{18}|\d{17}[xX])",
target_field="sample_data",
priority=10,
))
rules.append(RecognitionRule(
template_id=tpl.id,
category_id=cat_cust_personal.id,
level_id=level_l4.id,
rule_type="keyword",
rule_name="手机号字段识别",
rule_content="手机,mobile,phone,telephone,tel",
target_field="column_name",
priority=20,
))
if cat_fin_reserve and level_l5:
rules.append(RecognitionRule(
template_id=tpl.id,
category_id=cat_fin_reserve.id,
level_id=level_l5.id,
rule_type="keyword",
rule_name="精算模型识别",
rule_content="精算,actuarial,准备金,reserve,偿付能力,solvency",
target_field="column_name",
priority=10,
))
if cat_int_ops and level_l5:
rules.append(RecognitionRule(
template_id=tpl.id,
category_id=cat_int_ops.id,
level_id=level_l5.id,
rule_type="keyword",
rule_name="密码密钥识别",
rule_content="password,secret,key,token,密钥,密码",
target_field="column_name",
priority=5,
))
for r in rules:
db.add(r)
db.commit()