6d70520e79
- 新增后端模块:Alert、APIAsset、Compliance、Lineage、Masking、Risk、SchemaChange、Unstructured、Watermark - 新增前端模块页面与API接口 - 新增Alembic迁移脚本(002-014)覆盖全量业务表 - 新增测试数据生成脚本与集成测试脚本 - 修复metadata模型JSON类型导入缺失导致启动失败的问题 - 修复前端Alert/APIAsset页面request模块路径错误 - 更新docker-compose与开发计划文档
98 lines
3.0 KiB
Python
98 lines
3.0 KiB
Python
import hashlib
import secrets
from typing import Optional, Tuple

from sqlalchemy.orm import Session

from app.models.watermark import WatermarkLog
|
|
|
|
# Zero-width characters for binary encoding.
# A watermark is written as MARKER followed by one ZW_SPACE / ZW_NOJOIN
# character per payload bit, so it stays invisible when the text is rendered.
ZW_SPACE = "\u200b"  # U+200B zero-width space -> bit 0
ZW_NOJOIN = "\u200c"  # U+200C zero-width non-joiner -> bit 1
MARKER = "\u200d"  # U+200D zero-width joiner -> start-of-watermark marker
|
|
|
|
|
|
def _int_to_binary_bits(n: int, bits: int = 32) -> str:
|
|
return format(n, f"0{bits}b")
|
|
|
|
|
|
def _binary_bits_to_int(bits: str) -> int:
|
|
return int(bits, 2)
|
|
|
|
|
|
def embed_watermark(text: str, user_id: int, key: str) -> str:
    """Embed an invisible watermark into text using zero-width characters.

    The watermark is ``MARKER`` followed by 48 zero-width characters: a 16-bit
    tag derived from ``key`` and then ``user_id`` as a 32-bit unsigned value.
    It is placed after the visible content and before the line terminator.

    Args:
        text: Line or document to mark. All trailing newlines are collapsed
            into a single terminator after the watermark.
        user_id: Identifier to embed; must fit in 32 unsigned bits.
        key: Secret whose 16-bit digest is embedded for later verification.

    Returns:
        ``text`` with the watermark appended, terminated by ``"\\r\\n"`` when
        the input ended with CRLF and by ``"\\n"`` otherwise.

    Raises:
        ValueError: If ``user_id`` is negative or does not fit in 32 bits.
    """
    # Reject IDs that cannot round-trip: the extractor reads exactly 32 bits,
    # so a wider or negative value would be attributed to a different user.
    if not 0 <= user_id < 2 ** 32:
        raise ValueError(f"user_id {user_id} does not fit in 32 unsigned bits")

    # The built-in hash() of a str is salted per interpreter process, so its
    # value cannot be recomputed later; derive the 16-bit tag from SHA-256.
    key_digest = hashlib.sha256(key.encode("utf-8")).digest()
    key_bits = _int_to_binary_bits(int.from_bytes(key_digest[:2], "big"), 16)

    payload = key_bits + _int_to_binary_bits(user_id)
    watermark_chars = MARKER + "".join(
        ZW_NOJOIN if b == "1" else ZW_SPACE for b in payload
    )

    # Keep CRLF terminators intact instead of wedging the watermark between
    # "\r" and "\n"; every other input is handled exactly as before.
    if text.endswith("\r\n"):
        return text.rstrip("\r\n") + watermark_chars + "\r\n"
    return text.rstrip("\n") + watermark_chars + "\n"
|
|
|
|
|
|
def extract_watermark(text: str) -> Tuple[Optional[int], Optional[str]]:
    """Extract a watermark from text.

    Scans every ``MARKER`` occurrence and decodes the first one that is
    immediately followed by a complete 48-character payload (16 key bits, then
    32 user-ID bits) made only of ``ZW_SPACE`` / ``ZW_NOJOIN``.

    Args:
        text: Text that may contain an embedded watermark.

    Returns:
        ``(user_id, key_hash_bits)`` where ``key_hash_bits`` is a 16-character
        string of ``"0"``/``"1"``, or ``(None, None)`` when no complete
        watermark is present.
    """
    key_width, id_width = 16, 32
    payload_width = key_width + id_width
    bit_for = {ZW_SPACE: "0", ZW_NOJOIN: "1"}

    # U+200D also occurs in ordinary text (e.g. emoji joiner sequences), so a
    # marker that is not followed by a full payload is skipped, not trusted.
    start = text.find(MARKER)
    while start != -1:
        begin = start + len(MARKER)
        window = text[begin:begin + payload_width]
        # Require all 48 bits: a truncated payload would otherwise decode to
        # a different, wrong user ID.
        if len(window) == payload_width and all(ch in bit_for for ch in window):
            bits = "".join(bit_for[ch] for ch in window)
            return _binary_bits_to_int(bits[key_width:]), bits[:key_width]
        start = text.find(MARKER, start + 1)

    return None, None
|
|
|
|
|
|
def apply_watermark_to_lines(lines: list, user_id: int, key: str) -> list:
    """Watermark every line of a CSV/TXT export.

    Each element of ``lines`` is passed through ``embed_watermark`` with the
    same ``user_id`` and ``key``; the results are returned as a new list in
    the original order.
    """
    marked = []
    for row in lines:
        marked.append(embed_watermark(row, user_id, key))
    return marked
|
|
|
|
|
|
def create_watermark_log(db: Session, user_id: int, export_type: str, data_scope: dict) -> WatermarkLog:
    """Persist a record of a watermarked export and return it.

    A fresh random key (``secrets.token_hex(16)``, i.e. 32 hex characters) is
    generated for the export, and ``data_scope`` is stored as its ``str()``
    representation. The row is added, committed and refreshed before being
    returned.
    """
    watermark_key = secrets.token_hex(16)
    fields = {
        "user_id": user_id,
        "export_type": export_type,
        "data_scope": str(data_scope),
        "watermark_key": watermark_key,
    }
    entry = WatermarkLog(**fields)

    db.add(entry)
    db.commit()
    # Reload the row after the commit before handing it back to the caller.
    db.refresh(entry)
    return entry
|
|
|
|
|
|
def trace_watermark(db: Session, text: str) -> Optional[dict]:
    """Trace leaked text back to the user whose ID is embedded in it.

    The user ID is read from the watermark in ``text`` and that user's most
    recent ``WatermarkLog`` (by ``created_at``) is looked up. The key bits of
    the watermark are not used for the lookup.

    Returns:
        A dict with ``user_id``, ``username``, ``export_type``, ``data_scope``
        and ``created_at`` (ISO 8601 string or ``None``), or ``None`` when no
        watermark is found or the user has no export log.
    """
    traced_id, _key_bits = extract_watermark(text)
    if traced_id is None:
        return None

    newest_first = (
        db.query(WatermarkLog)
        .filter(WatermarkLog.user_id == traced_id)
        .order_by(WatermarkLog.created_at.desc())
    )
    record = newest_first.first()
    if not record:
        return None

    result = {"user_id": record.user_id}
    # The related user may be missing; report no username in that case.
    result["username"] = record.user.username if record.user else None
    result["export_type"] = record.export_type
    result["data_scope"] = record.data_scope
    result["created_at"] = record.created_at.isoformat() if record.created_at else None
    return result
|