6d70520e79
- 新增后端模块:Alert、APIAsset、Compliance、Lineage、Masking、Risk、SchemaChange、Unstructured、Watermark - 新增前端模块页面与API接口 - 新增Alembic迁移脚本(002-014)覆盖全量业务表 - 新增测试数据生成脚本与集成测试脚本 - 修复metadata模型JSON类型导入缺失导致启动失败的问题 - 修复前端Alert/APIAsset页面request模块路径错误 - 更新docker-compose与开发计划文档
100 lines
4.4 KiB
Python
100 lines
4.4 KiB
Python
import os
|
|
import re
|
|
import json
|
|
from typing import Optional, List
|
|
from sqlalchemy.orm import Session
|
|
from fastapi import HTTPException, status
|
|
|
|
from app.models.metadata import UnstructuredFile
|
|
from app.core.events import minio_client
|
|
from app.core.config import settings
|
|
|
|
|
|
def extract_text_from_file(file_path: str, file_type: str) -> str:
    """Extract plain text from a local file according to its declared type.

    ``file_type`` is matched case-insensitively; surrounding whitespace and a
    leading dot are ignored. Supported values: ``word``/``docx``,
    ``excel``/``xlsx``/``xls``, ``pdf`` and ``txt``.

    Output per type:
      * Word: non-empty body paragraphs, followed by one line per non-empty
        table row (cell texts joined by a space).
      * Excel: one line per row of every worksheet, non-``None`` cell values
        joined by a space (workbook opened with ``data_only=True``).
      * PDF: the text of each page, pages joined by newlines.
      * TXT: the file decoded as UTF-8, undecodable bytes dropped.

    Note: openpyxl only reads Office Open XML workbooks; a legacy binary
    ``.xls`` file is accepted here but will fail to open and surface as an
    Excel parse error.

    Args:
        file_path: Path of the file on the local filesystem.
        file_type: Declared type of the file (see above).

    Returns:
        The extracted text (possibly empty).

    Raises:
        ValueError: If the type is unsupported, or the parser (or its import)
            fails. The underlying exception is chained as ``__cause__``.
    """
    ft = (file_type or "").strip().lstrip(".").lower()

    if ft in ("word", "docx"):
        try:
            from docx import Document

            doc = Document(file_path)
            lines = [p.text for p in doc.paragraphs if p.text]
            # Document.paragraphs only yields top-level body paragraphs, so
            # table content must be collected separately or it is never scanned.
            for table in doc.tables:
                for row in table.rows:
                    row_text = " ".join(cell.text for cell in row.cells if cell.text)
                    if row_text:
                        lines.append(row_text)
        except Exception as e:
            raise ValueError(f"解析Word失败: {e}") from e
        return "\n".join(lines)

    if ft in ("excel", "xlsx", "xls"):
        try:
            from openpyxl import load_workbook

            wb = load_workbook(file_path, data_only=True)
            parts = []
            for sheet in wb.worksheets:
                for row in sheet.iter_rows(values_only=True):
                    parts.append(" ".join(str(c) for c in row if c is not None))
        except Exception as e:
            raise ValueError(f"解析Excel失败: {e}") from e
        return "\n".join(parts)

    if ft == "pdf":
        try:
            import pdfplumber

            with pdfplumber.open(file_path) as pdf:
                # extract_text() may return None for pages without a text layer.
                return "\n".join(page.extract_text() or "" for page in pdf.pages)
        except Exception as e:
            raise ValueError(f"解析PDF失败: {e}") from e

    if ft == "txt":
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            return f.read()

    raise ValueError(f"不支持的文件类型: {ft}")
|
|
|
|
|
|
# Built-in detection rules, compiled once at import time.
_ID_CARD_PATTERN = re.compile(r"(?<!\d)\d{17}[\dXx](?!\d)")
_PHONE_PATTERN = re.compile(r"(?<!\d)1[3-9]\d{9}(?!\d)")
# Simple 16-19 digit run; not checksum-validated.
_BANK_CARD_PATTERN = re.compile(r"(?<!\d)\d{16,19}(?!\d)")
# Two-decimal amount, either comma-grouped ("1,234.56") or plain ("1234.56").
_AMOUNT_PATTERN = re.compile(r"(?<!\d)(?:\d{1,3}(?:,\d{3})+|\d+)\.\d{2}(?!\d)")
# Characters of context kept on each side of a match in the snippet.
_SNIPPET_CONTEXT = 10


def scan_text_for_sensitive(text: str) -> List[dict]:
    """Scan extracted text for sensitive patterns using built-in rules.

    Rules are applied in a fixed order (ID card, phone, bank card, amount),
    and matches are returned grouped in that order. A digit run already
    reported as an ID card is not reported again as a bank card; this includes
    the first 17 digits of an ID that ends in ``X``.

    Args:
        text: The text to scan.

    Returns:
        One dict per match with keys ``rule_name``, ``category_code``,
        ``level_code``, ``snippet`` (the match plus up to 10 characters of
        context on each side) and ``position`` (start offset in ``text``).
    """
    matches: List[dict] = []

    def _record(m, rule_name: str, category_code: str, level_code: str) -> None:
        start, end = m.start(), m.end()
        matches.append({
            "rule_name": rule_name,
            "category_code": category_code,
            "level_code": level_code,
            "snippet": text[max(0, start - _SNIPPET_CONTEXT):end + _SNIPPET_CONTEXT],
            "position": start,
        })

    id_card_starts = set()
    for m in _ID_CARD_PATTERN.finditer(text):
        id_card_starts.add(m.start())
        _record(m, "身份证号", "CUST_PERSONAL", "L4")

    for m in _PHONE_PATTERN.finditer(text):
        _record(m, "手机号", "CUST_PERSONAL", "L4")

    for m in _BANK_CARD_PATTERN.finditer(text):
        # Both patterns require a non-digit before the match, so a bank-card
        # run overlapping an ID card must start at the same offset.
        if m.start() in id_card_starts:
            continue
        _record(m, "银行卡号", "FIN_PAYMENT", "L4")

    for m in _AMOUNT_PATTERN.finditer(text):
        _record(m, "金额", "FIN_PAYMENT", "L3")

    return matches
|
|
|
|
|
|
# Maximum number of extracted characters persisted on the file record.
_MAX_STORED_TEXT_CHARS = 50000


def process_unstructured_file(db: Session, file_id: int) -> dict:
    """Download an uploaded file from MinIO, extract its text and scan it.

    On success the record's ``extracted_text`` (truncated to
    ``_MAX_STORED_TEXT_CHARS`` characters), ``analysis_result`` and
    ``status = "processed"`` are written and the session is committed.

    Args:
        db: SQLAlchemy session used to load and update the record.
        file_id: Primary key of the ``UnstructuredFile`` row.

    Returns:
        ``{"success": True, "matches": [...], "total_chars": int}``, where
        ``total_chars`` is the length of the full, untruncated text.

    Raises:
        HTTPException: 404 if the record does not exist; 400 if it has no
            ``storage_path``; 500 if download, extraction, scanning or the
            commit fails. In the 500 case, pending changes are rolled back
            and the record is marked ``status = "error"``.
    """
    import tempfile

    file_obj = db.query(UnstructuredFile).filter(UnstructuredFile.id == file_id).first()
    if not file_obj:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="文件不存在")
    if not file_obj.storage_path:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="文件未上传")

    # original_name comes from the uploader and must never be spliced into a
    # filesystem path: a name such as "../../app/x" would make both the
    # download and the cleanup touch files outside the temp area. Only a
    # sanitized extension is kept, because openpyxl validates the extension
    # of the path it is given.
    suffix = os.path.splitext(os.path.basename(file_obj.original_name or ""))[1]
    suffix = re.sub(r"[^0-9A-Za-z.]", "", suffix)

    try:
        # A private per-call directory avoids collisions between concurrent
        # runs on the same file. It is deleted together with any other files
        # written into it.
        with tempfile.TemporaryDirectory(prefix=f"unstructured_{file_id}_") as tmp_dir:
            tmp_path = os.path.join(tmp_dir, f"source{suffix}")
            minio_client.fget_object(settings.MINIO_BUCKET_NAME, file_obj.storage_path, tmp_path)
            text = extract_text_from_file(tmp_path, file_obj.file_type or "")

        file_obj.extracted_text = text[:_MAX_STORED_TEXT_CHARS]
        matches = scan_text_for_sensitive(text)
        total_chars = len(text)
        file_obj.analysis_result = {"matches": matches, "total_chars": total_chars}
        file_obj.status = "processed"
        db.commit()
    except Exception as e:
        # Discard partial updates and any failed transaction state (e.g. when
        # the commit above itself failed) before recording the error status.
        db.rollback()
        file_obj.status = "error"
        db.commit()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e

    return {"success": True, "matches": matches, "total_chars": total_chars}
|