当前位置: 首页 > news >正文

Python安全日志审计

"""
Python 安全日志审计 —— 企业级安全事件记录与完整性保护
涵盖敏感数据脱敏、不可变存储、日志完整性验证等核心功能
"""

# 安装依赖:pip install cryptography
# 安全审计日志记录所有安全相关事件,是事后分析和合规审计的基础

import json
import time
import hmac
import hashlib
import re
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field, asdict
from datetime import datetime

# ========== 第一部分:敏感数据脱敏 ==========

class DataMasker:
"""
敏感数据脱敏器 —— 在记录日志前对敏感信息进行处理。
确保日志不会泄露密码、令牌、个人身份信息等敏感数据。
"""

# 敏感字段名称模式
SENSITIVE_FIELDS = {
"password", "passwd", "secret", "token", "access_token",
"refresh_token", "api_key", "apikey", "credit_card",
"ssn", "phone", "phone_number", "email",
"authorization", "cookie", "session_id"
}

# 信用卡号正则(简单匹配)
CC_PATTERN = re.compile(r"\b(?:\d{4}[-\s]?){3}\d{4}\b")

# 电子邮件正则
EMAIL_PATTERN = re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b")

# IP 地址正则(用于模糊化处理)
IP_PATTERN = re.compile(r"\b(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})\b")

@classmethod
def mask_value(cls, key: str, value: Any) -> Any:
"""
根据字段名对值进行脱敏处理。
"""
key_lower = key.lower()

# 检查是否为敏感字段
if any(sensitive in key_lower for sensitive in cls.SENSITIVE_FIELDS):
if isinstance(value, str) and len(value) > 4:
return value[:2] + "****" + value[-2:]
return "****"

# 递归处理字典和列表
if isinstance(value, dict):
return {k: cls.mask_value(k, v) for k, v in value.items()}
if isinstance(value, list):
return [cls.mask_value(key, item) for item in value]
return value

@classmethod
def mask_text(cls, text: str) -> str:
"""
对自由文本中的敏感信息进行脱敏。
"""
# 脱敏信用卡号
text = cls.CC_PATTERN.sub("****-****-****-****", text)
# 脱敏电子邮件(保留域名)
text = cls.EMAIL_PATTERN.sub(lambda m: m.group()[0] + "***@" + m.group().split("@")[1], text)
# 模糊化 IP 地址
text = cls.IP_PATTERN.sub(r"\1.\2.\3.xxx", text)
return text


# ========== 第二部分:审计日志条目 ==========

@dataclass
class AuditLogEntry:
"""
审计日志条目 —— 记录安全事件的标准化格式。
每条日志包含事件描述、来源、结果和时间信息。
"""
timestamp: float # 事件发生时间
event_type: str # 事件类型(如 LOGIN、DATA_ACCESS)
user_id: str # 操作用户
action: str # 具体操作描述
resource: str # 访问的资源
result: str # 结果(SUCCESS/FAILURE)
ip_address: str # 来源 IP
details: Dict[str, Any] = field(default_factory=dict) # 附加详情(已脱敏)
log_id: str = "" # 日志唯一标识
previous_hash: str = "" # 前一条日志的哈希(链式完整性)

def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return asdict(self)

def to_json(self) -> str:
"""转换为 JSON 字符串"""
return json.dumps(self.to_dict(), ensure_ascii=False)


# ========== 第三部分:防篡改日志存储 ==========

class ImmutableAuditStore:
"""
不可变审计日志存储器 —— 使用哈希链防止日志篡改。
每条日志包含前一条日志的哈希值,形成区块链式结构。
任何对历史日志的修改都会破坏后续所有哈希链接。
"""

def __init__(self):
# 内存存储(生产环境应使用数据库或专用日志系统)
self._chain: List[AuditLogEntry] = []
self._secret_key = hashlib.sha256(b"audit_secret_key_change_in_prod").digest()

def append(self, entry: AuditLogEntry) -> None:
"""
追加新的审计日志条目。
自动计算前一条日志的哈希并链接到新条目中。
"""
# 设置时间戳
entry.timestamp = time.time()
# 生成日志 ID
entry.log_id = hashlib.sha256(
f"{entry.timestamp}{entry.user_id}{entry.action}".encode()
).hexdigest()[:16]

# 计算前一条日志的哈希链
if self._chain:
entry.previous_hash = self._compute_entry_hash(self._chain[-1])

# 添加到链中
self._chain.append(entry)

def verify_integrity(self) -> bool:
"""
验证整个日志链的完整性。
遍历所有日志条目,验证哈希链是否一致。
"""
if len(self._chain) <= 1:
return True

for i in range(1, len(self._chain)):
current = self._chain[i]
previous = self._chain[i - 1]

# 计算前一条日志的期望哈希
expected_hash = self._compute_entry_hash(previous)
if current.previous_hash != expected_hash:
print(f"完整性验证失败:日志 {i} 的哈希链断裂")
return False
print("日志链完整性验证通过")
return True

def _compute_entry_hash(self, entry: AuditLogEntry) -> str:
"""
计算日志条目的 HMAC 哈希值。
使用密钥防止未授权的哈希计算。
"""
# 序列化日志数据(排除 previous_hash 和 log_id)
data = entry.to_dict()
data.pop("previous_hash", None)
data.pop("log_id", None)
serialized = json.dumps(data, sort_keys=True, ensure_ascii=False)

# 使用 HMAC-SHA256 生成哈希
return hmac.new(
self._secret_key,
serialized.encode(),
hashlib.sha256
).hexdigest()

def query(self, event_type: Optional[str] = None,
user_id: Optional[str] = None,
start_time: Optional[float] = None,
end_time: Optional[float] = None) -> List[AuditLogEntry]:
"""
查询审计日志(支持按事件类型、用户、时间范围筛选)。
"""
results = self._chain.copy()

if event_type:
results = [e for e in results if e.event_type == event_type]
if user_id:
results = [e for e in results if e.user_id == user_id]
if start_time:
results = [e for e in results if e.timestamp >= start_time]
if end_time:
results = [e for e in results if e.timestamp <= end_time]

return results

def export_for_compliance(self, output_file: str) -> None:
"""
导出审计日志用于合规审查(导出前验证完整性)。
"""
if not self.verify_integrity():
raise ValueError("日志完整性验证失败,无法导出")

entries = [e.to_dict() for e in self._chain]
with open(output_file, "w", encoding="utf-8") as f:
json.dump(entries, f, ensure_ascii=False, indent=2)
print(f"合规日志已导出到 {output_file}")


# ========== 第四部分:审计日志记录器 ==========

class SecurityAuditLogger:
"""
安全审计日志记录器 —— 提供便捷的日志记录接口。
自动处理敏感数据脱敏、条目创建和存储。
"""

def __init__(self, store: ImmutableAuditStore):
self.store = store
self.masker = DataMasker()

def log_event(self, event_type: str, user_id: str, action: str,
resource: str, result: str, ip_address: str = "",
details: Optional[Dict] = None) -> None:
"""
记录安全审计事件。
自动对详情中的敏感数据进行脱敏处理。
"""
# 脱敏处理
safe_details = self.masker.mask_value("details", details or {})

# 创建日志条目
entry = AuditLogEntry(
timestamp=time.time(),
event_type=event_type,
user_id=user_id,
action=action,
resource=resource,
result=result,
ip_address=ip_address,
details=safe_details
)
self.store.append(entry)
print(f"审计日志已记录:{event_type} - {action} ({result})")

def log_login(self, user_id: str, success: bool, ip: str,
fail_reason: str = "") -> None:
"""
记录登录事件的便捷方法。
"""
self.log_event(
event_type="AUTHENTICATION",
user_id=user_id,
action="LOGIN",
resource="system",
result="SUCCESS" if success else "FAILURE",
ip_address=ip,
details={"fail_reason": fail_reason} if fail_reason else None
)


# ========== 第五部分:演示 ==========

def demo_audit_logging():
"""
演示安全审计日志的完整使用流程。
"""
print("=== 安全日志审计演示 ===\n")

store = ImmutableAuditStore()
logger = SecurityAuditLogger(store)

# 记录安全事件
print("--- 记录安全事件 ---")
logger.log_login("alice", True, "192.168.1.100")
logger.log_login("bob", False, "10.0.0.5", fail_reason="密码错误")
logger.log_event(
event_type="DATA_ACCESS",
user_id="alice",
action="READ",
resource="/api/users/profile",
result="SUCCESS",
ip_address="192.168.1.100",
details={
"query": "SELECT * FROM users",
"rows_affected": 1,
"password": "secret123" # 会被自动脱敏
}
)

# 查询日志
print("\n--- 查询登录事件 ---")
login_events = store.query(event_type="AUTHENTICATION")
for event in login_events:
print(f" {event.action} - {event.user_id} - {event.result}")

# 验证完整性
print("\n--- 完整性验证 ---")
store.verify_integrity()

# 导出合规报告
print("\n--- 导出合规日志 ---")
store.export_for_compliance("audit_export.json")

# 演示脱敏效果
print("\n--- 数据脱敏示例 ---")
masked = DataMasker.mask_text("用户邮箱 user@example.com,IP 192.168.1.1")
print(f" 脱敏前:用户邮箱 user@example.com,IP 192.168.1.1")
print(f" 脱敏后:{masked}")


if __name__ == "__main__":
demo_audit_logging()

http://www.jsqmd.com/news/921991/

相关文章:

  • 从零到一:基于eNSP构建企业级网络原型
  • 百度网盘限速太慢?3分钟教你用Python脚本实现满速下载
  • 从‘傻瓜式’到‘知其所以然’:一步步拆解Selenium处理shadow-root的底层逻辑与最佳实践
  • 【AI搜索引擎隐私保护终极指南】:2024年7大主流引擎加密机制、数据留存策略与用户控制力实测对比
  • 政府科技实战:AI赋能GovTech的挑战、策略与架构演进
  • STM32G473 IAP实战:用CAN总线给你的设备无线升级固件(附完整工程)
  • Python安全文件上传
  • 告别App切换!用HomeKit自动化让Siri指挥追觅X10进行指定房间清扫
  • Function Calling 的前世今生:为什么我们需要工具生态设计
  • 别再手动导.v文件了!Cadence AMS数模混合仿真,用这个-f文件配置法效率翻倍
  • 三步搞定网易云音乐无损下载:告别在线播放限制,建立个人音乐库
  • UE5 CesiumForUnreal避坑指南:从加载本地倾斜模型到解决Sequence卡顿的12个实战问题
  • 5分钟彻底解决Windows磁盘爆满:开源清理工具完全指南
  • Python安全序列化
  • Windows Cleaner终极指南:5分钟解决C盘爆红,让Windows系统重获新生!
  • 保姆级教程:用UE5 Niagara从零手搓一个会飘的烟雾特效(附材质节点图)
  • 用89S52单片机驱动TPμP-40A微型打印机:一个毕业生的硬件调试笔记与避坑指南
  • 保姆级教程:在Ubuntu 22.04上为服务器配置双网卡(内网+外网)并设置静态IP
  • TC3xx启动代码深度解析:从BROM到main(),你的程序是如何‘活’起来的?
  • ESP32-S3 + LVGL 8.3实战:如何为你的3.5寸SPI屏(ILI9488)定制UI并优化性能
  • 从编辑器到手机桌面:一次搞懂Unity Android打包的完整工作流与底层逻辑
  • ChatGPT Plus实战:AI如何重塑PPT制作、娱乐与学术研究
  • 5分钟极简方案:在Mac上解锁QQ音乐加密文件
  • UE5.3 GAS避坑指南:GameplayEffect的Tag堆叠与委托监听那些事儿
  • Windows Cleaner终极指南:5分钟解决C盘爆红,让电脑重获新生!
  • 用IMX6ULL和STM32MP157做个智能氛围灯:从传感器数据采集到TensorFlow Lite模型部署全流程(附源码)
  • 喜讯!奋飞咨询春明老师辅导客户斩获Ecovadis铜牌! - 奋飞咨询ecovadis
  • 多智能体AI系统在风险投资决策中的架构设计与工程实践
  • 别再手动画贴图了!用ShaderGraph+第二套UV,5分钟搞定模型动态描边效果
  • Python安全会话管理