97 lines
3.6 KiB
Python
97 lines
3.6 KiB
Python
from collections import Counter
from datetime import datetime
from io import BytesIO
from typing import Optional

from docx import Document
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.shared import Inches, Pt, RGBColor
from sqlalchemy.orm import Session

from app.models.classification import Category, DataLevel
from app.models.project import ClassificationProject, ClassificationResult
|
||
|
||
|
||
def generate_classification_report(
    db: Session,
    project_id: int,
    *,
    max_high_risk_rows: Optional[int] = 100,
) -> bytes:
    """Generate a Word (.docx) report for a classification project.

    The report contains four sections: basic project information, overall
    classification statistics, the distribution of results per level, and a
    list of high-sensitivity fields (level code ``L4`` or ``L5``).

    Args:
        db: SQLAlchemy session used to load the project and its results.
        project_id: Primary key of the ``ClassificationProject`` to report on.
        max_high_risk_rows: Maximum number of rows written to the
            high-sensitivity table. ``None`` disables the cap. When rows are
            omitted because of the cap, a notice paragraph is appended so the
            reader knows the list is partial.

    Returns:
        The serialized ``.docx`` document as bytes.

    Raises:
        ValueError: If ``max_high_risk_rows`` is negative, or if no project
            with ``project_id`` exists.
    """
    # A negative value would silently drop rows from the *end* of the list
    # via slice semantics, so reject it explicitly.
    if max_high_risk_rows is not None and max_high_risk_rows < 0:
        raise ValueError("max_high_risk_rows 不能为负数")

    project = (
        db.query(ClassificationProject)
        .filter(ClassificationProject.id == project_id)
        .first()
    )
    if project is None:
        raise ValueError("项目不存在")

    doc = Document()

    # Title
    title = doc.add_heading('数据分类分级项目报告', 0)
    title.alignment = WD_ALIGN_PARAGRAPH.CENTER

    # Basic info
    doc.add_heading('一、项目基本信息', level=1)
    info_data = [
        ('项目名称', project.name),
        # NOTE(review): naive local time of the server; confirm whether the
        # report should carry an explicit timezone.
        ('报告生成时间', datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
        ('项目状态', project.status),
        ('模板版本', project.template.version if project.template else 'N/A'),
    ]
    # Size the table from the data so adding an entry above cannot raise
    # IndexError on a stale hard-coded row count.
    info_table = doc.add_table(rows=len(info_data), cols=2)
    info_table.style = 'Light Grid Accent 1'
    for table_row, (label, value) in zip(info_table.rows, info_data):
        table_row.cells[0].text = label
        table_row.cells[1].text = str(value)

    # Statistics
    doc.add_heading('二、分类分级统计', level=1)
    results = (
        db.query(ClassificationResult)
        .filter(ClassificationResult.project_id == project_id)
        .all()
    )

    total = len(results)
    auto_count = sum(1 for r in results if r.source == 'auto')
    manual_count = sum(1 for r in results if r.source == 'manual')
    # Results without a level are excluded from the per-level counts.
    level_counts = Counter(r.level.name for r in results if r.level)

    doc.add_paragraph(f'总字段数: {total}')
    doc.add_paragraph(f'自动识别: {auto_count}')
    doc.add_paragraph(f'人工打标: {manual_count}')

    # Level distribution, most frequent level first.
    doc.add_heading('三、分级分布', level=1)
    level_table = doc.add_table(rows=1, cols=3)
    level_table.style = 'Light Grid Accent 1'
    hdr_cells = level_table.rows[0].cells
    hdr_cells[0].text = '分级'
    hdr_cells[1].text = '数量'
    hdr_cells[2].text = '占比'
    for level_name, count in level_counts.most_common():
        row_cells = level_table.add_row().cells
        row_cells[0].text = level_name
        row_cells[1].text = str(count)
        # The share is relative to ALL results (including those with no
        # level), so the column need not sum to 100%. total >= 1 whenever
        # this loop body runs, so the division is safe.
        row_cells[2].text = f'{count / total * 100:.1f}%'

    # High risk data
    doc.add_heading('四、高敏感数据清单(L4/L5)', level=1)
    high_risk = [r for r in results if r.level and r.level.code in ('L4', 'L5')]
    if high_risk:
        risk_table = doc.add_table(rows=1, cols=5)
        risk_table.style = 'Light Grid Accent 1'
        hdr = risk_table.rows[0].cells
        hdr[0].text = '字段名'
        hdr[1].text = '所属表'
        hdr[2].text = '分类'
        hdr[3].text = '分级'
        hdr[4].text = '来源'
        # Slicing with None keeps every row, so the same expression handles
        # both the capped and the uncapped case.
        for r in high_risk[:max_high_risk_rows]:
            row = risk_table.add_row().cells
            row[0].text = r.column.name if r.column else 'N/A'
            row[1].text = r.column.table.name if r.column and r.column.table else 'N/A'
            row[2].text = r.category.name if r.category else 'N/A'
            # r.level is guaranteed truthy by the high_risk filter above.
            row[3].text = r.level.name
            # Any source other than 'auto' is labelled as manual.
            row[4].text = '自动' if r.source == 'auto' else '人工'
        if max_high_risk_rows is not None and len(high_risk) > max_high_risk_rows:
            # Make the truncation visible instead of silently dropping rows.
            doc.add_paragraph(
                f'注: 高敏感数据共 {len(high_risk)} 条,'
                f'此处仅列出前 {max_high_risk_rows} 条。'
            )
    else:
        doc.add_paragraph('暂无L4/L5级高敏感数据。')

    # Save to bytes
    buffer = BytesIO()
    doc.save(buffer)
    return buffer.getvalue()
|