当前位置: 首页 > news >正文

Python安全序列化

"""
Python 安全序列化 —— 规避 Pickle 风险,使用安全替代方案
涵盖 pickle 漏洞演示、JSON/msgpack 安全序列化、itsdangerous 签名加密
"""

# 安装依赖:pip install msgpack itsdangerous
# 序列化安全是 Python 安全中常被忽视但后果严重的领域

import pickle
import json
import msgpack
import hashlib
import hmac
import os
from typing import Any, Dict, Optional
from itsdangerous import URLSafeSerializer, TimedSerializer, BadSignature

# ========== 第一部分:Pickle 安全风险 ==========

class UnsafePickleDemo:
"""
Pickle 反序列化漏洞演示。
pickle.loads() 可以执行任意 Python 代码,这就是为什么
永远不要对不可信数据使用 pickle 反序列化。
"""

@staticmethod
def create_malicious_payload() -> bytes:
"""
创建一个恶意的 pickle 载荷。
实际攻击中,攻击者会让服务器反序列化此载荷以执行恶意代码。
"""
# 使用 __reduce__ 定义反序列化时执行的代码
class EvilPickle:
def __reduce__(self):
# 反序列化时将执行 os.system 命令
import os
cmd = "echo '攻击成功!反序列化执行了任意命令'"
return (os.system, (cmd,))

evil = EvilPickle()
malicious_data = pickle.dumps(evil)
print(f"恶意 pickle 载荷已生成({len(malicious_data)} 字节)")
return malicious_data

@staticmethod
def dangerous_deserialize(data: bytes) -> Any:
"""
不安全的反序列化 —— 直接调用 pickle.loads。
这就是生产代码中绝对不能做的事情!
"""
print("警告:正在反序列化不可信的 pickle 数据!")
obj = pickle.loads(data) # 这里会执行任意代码
return obj


# ========== 第二部分:安全的序列化替代方案 ==========

class SafeSerialization:
"""
安全的序列化方案 —— 使用不执行代码的格式。
JSON 和 MessagePack 都是纯数据格式,不会执行代码。
"""

@staticmethod
def serialize_json(data: Any) -> str:
"""
使用 JSON 进行安全序列化。
JSON 是最通用的跨语言序列化格式。
"""
return json.dumps(data, ensure_ascii=False, default=str)

@staticmethod
def deserialize_json(data: str) -> Any:
"""
安全反序列化 JSON 数据。
"""
return json.loads(data)

@staticmethod
def serialize_msgpack(data: Any) -> bytes:
"""
使用 MessagePack 进行安全序列化。
MessagePack 类似 JSON 但更紧凑,适合高性能场景。
"""
return msgpack.packb(data)

@staticmethod
def deserialize_msgpack(data: bytes) -> Any:
"""
安全反序列化 MessagePack 数据。
"""
return msgpack.unpackb(data)

@staticmethod
def compare_formats(data: Dict) -> None:
"""
比较不同序列化格式的效率和大小。
"""
json_data = SafeSerialization.serialize_json(data)
msgpack_data = SafeSerialization.serialize_msgpack(data)

print(f"原始数据大小:{len(str(data))} 字节")
print(f"JSON 序列化大小:{len(json_data)} 字节")
print(f"MessagePack 序列化大小:{len(msgpack_data)} 字节")
print(f"MessagePack 节省了 {len(json_data) - len(msgpack_data)} 字节")


# ========== 第三部分:带签名和加密的安全序列化 ==========

class SecureSerializer:
"""
安全序列化器 —— 使用 itsdangerous 库对数据进行签名和加密。
itsdangerous 提供:
- 签名:防止数据被篡改
- 时间戳:防止重放攻击
- 加密:保护数据机密性
"""

def __init__(self, secret_key: str):
"""
初始化安全序列化器。
secret_key 必须是高熵的密钥(建议使用 secrets.token_hex() 生成)。
"""
self.secret_key = secret_key
# URL 安全序列化器:对数据进行签名
self.url_serializer = URLSafeSerializer(secret_key)
# 时间戳序列化器:带过期时间的签名
self.timed_serializer = TimedSerializer(secret_key)

def sign_data(self, data: Any) -> str:
"""
对数据进行签名,防止篡改。
签名后的数据以 base64 编码的字符串形式返回。
"""
# URLSafeSerializer 自动处理 JSON 序列化和签名
signed = self.url_serializer.dumps(data)
print(f"数据已签名(长度:{len(signed)} 字符)")
return signed

def verify_signed_data(self, signed_data: str) -> Optional[Any]:
"""
验证签名并还原数据。
如果签名无效(数据被篡改),抛出 BadSignature 异常。
"""
try:
data = self.url_serializer.loads(signed_data)
print("签名验证通过,数据完整未被篡改")
return data
except BadSignature:
print("警告:数据签名无效,数据可能被篡改!")
return None

def sign_with_expiry(self, data: Any,
max_age_seconds: int = 3600) -> str:
"""
带过期时间的签名。
适用于需要限时使用的令牌(如密码重置链接)。
"""
signed = self.timed_serializer.dumps(data)
print(f"数据已签名(有效期 {max_age_seconds} 秒)")
return signed

def verify_with_expiry(self, signed_data: str,
max_age_seconds: int = 3600) -> Optional[Any]:
"""
验证签名并检查是否过期。
"""
try:
data = self.timed_serializer.loads(
signed_data, max_age=max_age_seconds
)
print("签名验证通过,且在有效期内")
return data
except BadSignature:
print("警告:签名无效或已过期!")
return None


class SignEncryptSerializer:
"""
签名 + 加密的序列化方案 —— 自己实现 HMAC 签名。
确保数据的完整性和机密性。
"""

def __init__(self, signing_key: str, encryption_key: Optional[str] = None):
self.signing_key = signing_key.encode()
self.encryption_key = encryption_key

def serialize_secure(self, data: Any) -> bytes:
"""
安全序列化:先编码 -> 再加密(可选)-> 最后签名。
先签名后加密可以防止签名碰撞攻击。
"""
# 第一步:序列化为 JSON
payload = json.dumps(data, ensure_ascii=False).encode()

# 第二步:计算 HMAC 签名
signature = hmac.new(
self.signing_key, payload, hashlib.sha256
).hexdigest()

# 第三步:组合签名和数据
result = json.dumps({
"signature": signature,
"payload": payload.decode()
}).encode()
return result

def deserialize_secure(self, data: bytes) -> Optional[Any]:
"""
安全反序列化:先验证签名 -> 再解密(可选)-> 最后解码。
"""
try:
# 第一步:解析外层 JSON
wrapper = json.loads(data.decode())

# 第二步:提取签名和载荷
expected_sig = wrapper["signature"]
payload_str = wrapper["payload"]
payload = payload_str.encode()

# 第三步:验证签名
actual_sig = hmac.new(
self.signing_key, payload, hashlib.sha256
).hexdigest()

# 使用 hmac.compare_digest 防止时序攻击
if not hmac.compare_digest(expected_sig, actual_sig):
print("警告:HMAC 签名不匹配,数据被篡改!")
return None

# 第四步:解码 JSON
return json.loads(payload)

except (json.JSONDecodeError, KeyError, Exception) as e:
print(f"反序列化失败:{e}")
return None


# ========== 第四部分:Notrust 反序列化模式 ==========

class NotrustDeserializer:
"""
"不信任"反序列化模式 —— 处理不可信数据时的安全最佳实践。
即使使用安全的格式,也要验证数据的结构和内容。
"""

@staticmethod
def safe_deserialize(data: str,
expected_schema: Optional[Dict] = None) -> Optional[Dict]:
"""
安全的反序列化:验证类型、结构和内容后再使用。
"""
try:
# 第一步:使用安全格式解析
obj = json.loads(data)

# 第二步:验证类型
if not isinstance(obj, dict):
print("错误:期望 JSON 对象,得到其他类型")
return None

# 第三步:验证必填字段
if expected_schema:
for field, field_type in expected_schema.items():
if field not in obj:
print(f"错误:缺少必填字段 '{field}'")
return None
if not isinstance(obj[field], field_type):
print(f"错误:字段 '{field}' 类型错误")
return None

# 第四步:验证值范围
for key, value in obj.items():
if isinstance(value, str) and len(value) > 1000:
print(f"错误:字段 '{key}' 超过最大长度")
return None
if isinstance(value, (int, float)) and abs(value) > 1e9:
print(f"错误:字段 '{key}' 数值超出范围")
return None

print("反序列化验证通过")
return obj

except json.JSONDecodeError:
print("错误:无效的 JSON 格式")
return None


# ========== 第五部分:演示 ==========

def demo_secure_serialization():
"""
演示各种序列化的安全特性和风险。
"""
print("=== 安全序列化演示 ===\n")

# 1. Pickle 风险演示
print("--- Pickle 反序列化风险 ---")
malicious = UnsafePickleDemo.create_malicious_payload()
print("注意:在生产代码中绝不要对不可信数据调用 pickle.loads()")

# 2. 安全格式比较
print("\n--- 序列化格式比较 ---")
test_data = {
"user": "alice",
"scores": [95, 87, 92],
"metadata": {"version": "1.0", "timestamp": 1234567890}
}
SafeSerialization.compare_formats(test_data)

# 3. itsdangerous 签名
print("\n--- 带签名的安全序列化 ---")
serializer = SecureSerializer(
secret_key="my-secret-key-change-in-production"
)
user_data = {"user_id": 123, "role": "admin"}
signed = serializer.sign_data(user_data)
verified = serializer.verify_signed_data(signed)
print(f"还原数据:{verified}")

# 4. 篡改检测演示
print("\n--- 篡改检测演示 ---")
tampered = signed[:-1] + ("X" if signed[-1] != "X" else signed[-2])
result = serializer.verify_signed_data(tampered)
print(f"篡改数据还原结果:{result}")

# 5. Schema 验证
print("\n--- Notrust 反序列化验证 ---")
safe_data = '{"name": "Alice", "age": 30}'
schema = {"name": str, "age": int}
result = NotrustDeserializer.safe_deserialize(safe_data, schema)
print(f"验证通过的数据:{result}")

# 6. 序列化安全建议
print("\n--- 序列化安全最佳实践 ---")
print("""
1. 永远不要对不可信数据使用 pickle.loads()
2. 优先使用 JSON 序列化(纯数据,不执行代码)
3. 对序列化数据进行 HMAC 签名以防止篡改
4. itsdangerous 提供了简洁的签名+序列化方案
5. 反序列化后验证数据结构和值范围
6. 敏感数据在序列化前应加密
""")


if __name__ == "__main__":
demo_secure_serialization()

http://www.jsqmd.com/news/921977/

相关文章:

  • Windows Cleaner终极指南:5分钟解决C盘爆红,让Windows系统重获新生!
  • 保姆级教程:用UE5 Niagara从零手搓一个会飘的烟雾特效(附材质节点图)
  • 用89S52单片机驱动TPμP-40A微型打印机:一个毕业生的硬件调试笔记与避坑指南
  • 保姆级教程:在Ubuntu 22.04上为服务器配置双网卡(内网+外网)并设置静态IP
  • TC3xx启动代码深度解析:从BROM到main(),你的程序是如何‘活’起来的?
  • ESP32-S3 + LVGL 8.3实战:如何为你的3.5寸SPI屏(ILI9488)定制UI并优化性能
  • 从编辑器到手机桌面:一次搞懂Unity Android打包的完整工作流与底层逻辑
  • ChatGPT Plus实战:AI如何重塑PPT制作、娱乐与学术研究
  • 5分钟极简方案:在Mac上解锁QQ音乐加密文件
  • UE5.3 GAS避坑指南:GameplayEffect的Tag堆叠与委托监听那些事儿
  • Windows Cleaner终极指南:5分钟解决C盘爆红,让电脑重获新生!
  • 用IMX6ULL和STM32MP157做个智能氛围灯:从传感器数据采集到TensorFlow Lite模型部署全流程(附源码)
  • 喜讯!奋飞咨询春明老师辅导客户斩获Ecovadis铜牌! - 奋飞咨询ecovadis
  • 多智能体AI系统在风险投资决策中的架构设计与工程实践
  • 别再手动画贴图了!用ShaderGraph+第二套UV,5分钟搞定模型动态描边效果
  • Python安全会话管理
  • AI Wrapper实战指南:从API调用到构建可持续AI产品的核心挑战
  • 2026年咸阳市CPPM报名十大核心问题全流程答疑 - 众智商学院课程中心
  • 避开这些坑!ArcGIS Pro二次开发AddIn项目图标和菜单不显示的修复指南
  • AI与区块链融合:Obizcoin如何重塑创业协作与信任机制
  • Power Automate审批流实战:从SharePoint触发到状态回写,我的踩坑与优化记录
  • 如何用3个步骤免费下载网易云音乐无损FLAC歌单
  • 别再硬算坐标了!Unity六边形地图的立体坐标与屏幕坐标转换,一篇讲透(附完整C#代码)
  • Figma组件库的变体(Variants)具体怎么使用?
  • 机器学习在游戏难度动态平衡中的应用与策略层设计
  • 从Modelsim波形反推设计问题:一个Quartus工程中的边沿检测模块调试实战
  • 2026年淮安市CPPM报名十大核心问题全流程答疑 - 众智商学院课程中心
  • 2026年上饶市CPPM报名十大核心问题全流程答疑 - 众智商学院课程中心
  • 盘点!8款热门CRM平台全维度评测,综合实力大比拼 - Joyky
  • 从Typora迁移到Obsidian,我踩过的那些坑和高效配置方案(含换行、图床、模板无缝迁移指南)