Files
hiderfong 6d70520e79 feat: 全量功能模块开发与集成测试修复
- 新增后端模块:Alert、APIAsset、Compliance、Lineage、Masking、Risk、SchemaChange、Unstructured、Watermark
- 新增前端模块页面与API接口
- 新增Alembic迁移脚本(002-014)覆盖全量业务表
- 新增测试数据生成脚本与集成测试脚本
- 修复metadata模型JSON类型导入缺失导致启动失败的问题
- 修复前端Alert/APIAsset页面request模块路径错误
- 更新docker-compose与开发计划文档
2026-04-25 08:51:38 +08:00

100 lines
4.4 KiB
Python

import os
import re
import json
from typing import Optional, List
from sqlalchemy.orm import Session
from fastapi import HTTPException, status
from app.models.metadata import UnstructuredFile
from app.core.events import minio_client
from app.core.config import settings
def extract_text_from_file(file_path: str, file_type: str) -> str:
text = ""
ft = file_type.lower()
if ft in ("word", "docx"):
try:
from docx import Document
doc = Document(file_path)
text = "\n".join([p.text for p in doc.paragraphs if p.text])
except Exception as e:
raise ValueError(f"解析Word失败: {e}")
elif ft in ("excel", "xlsx", "xls"):
try:
from openpyxl import load_workbook
wb = load_workbook(file_path, data_only=True)
parts = []
for sheet in wb.worksheets:
for row in sheet.iter_rows(values_only=True):
parts.append(" ".join([str(c) for c in row if c is not None]))
text = "\n".join(parts)
except Exception as e:
raise ValueError(f"解析Excel失败: {e}")
elif ft == "pdf":
try:
import pdfplumber
with pdfplumber.open(file_path) as pdf:
text = "\n".join([page.extract_text() or "" for page in pdf.pages])
except Exception as e:
raise ValueError(f"解析PDF失败: {e}")
elif ft == "txt":
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
text = f.read()
else:
raise ValueError(f"不支持的文件类型: {ft}")
return text
def scan_text_for_sensitive(text: str) -> List[dict]:
"""Scan extracted text for sensitive patterns using built-in rules."""
matches = []
# ID card
id_pattern = re.compile(r"(?<!\d)\d{17}[\dXx](?!\d)")
for m in id_pattern.finditer(text):
snippet = text[max(0, m.start()-10):min(len(text), m.end()+10)]
matches.append({"rule_name": "身份证号", "category_code": "CUST_PERSONAL", "level_code": "L4", "snippet": snippet, "position": m.start()})
# Phone
phone_pattern = re.compile(r"(?<!\d)1[3-9]\d{9}(?!\d)")
for m in phone_pattern.finditer(text):
snippet = text[max(0, m.start()-10):min(len(text), m.end()+10)]
matches.append({"rule_name": "手机号", "category_code": "CUST_PERSONAL", "level_code": "L4", "snippet": snippet, "position": m.start()})
# Bank card (simple 16-19 digits)
bank_pattern = re.compile(r"(?<!\d)\d{16,19}(?!\d)")
for m in bank_pattern.finditer(text):
snippet = text[max(0, m.start()-10):min(len(text), m.end()+10)]
matches.append({"rule_name": "银行卡号", "category_code": "FIN_PAYMENT", "level_code": "L4", "snippet": snippet, "position": m.start()})
# Amount
amount_pattern = re.compile(r"(?<!\d)\d{1,3}(,\d{3})*\.\d{2}(?!\d)")
for m in amount_pattern.finditer(text):
snippet = text[max(0, m.start()-10):min(len(text), m.end()+10)]
matches.append({"rule_name": "金额", "category_code": "FIN_PAYMENT", "level_code": "L3", "snippet": snippet, "position": m.start()})
return matches
def process_unstructured_file(db: Session, file_id: int) -> dict:
file_obj = db.query(UnstructuredFile).filter(UnstructuredFile.id == file_id).first()
if not file_obj:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="文件不存在")
if not file_obj.storage_path:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="文件未上传")
# Download from MinIO to temp
tmp_path = f"/tmp/unstructured_{file_id}_{file_obj.original_name}"
try:
minio_client.fget_object(settings.MINIO_BUCKET_NAME, file_obj.storage_path, tmp_path)
text = extract_text_from_file(tmp_path, file_obj.file_type or "")
file_obj.extracted_text = text[:50000] # limit storage
matches = scan_text_for_sensitive(text)
file_obj.analysis_result = {"matches": matches, "total_chars": len(text)}
file_obj.status = "processed"
db.commit()
return {"success": True, "matches": matches, "total_chars": len(text)}
except Exception as e:
file_obj.status = "error"
db.commit()
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
finally:
if os.path.exists(tmp_path):
os.remove(tmp_path)