Python安全序列化
"""
Python 安全序列化 —— 规避 Pickle 风险,使用安全替代方案
涵盖 pickle 漏洞演示、JSON/msgpack 安全序列化、itsdangerous 签名加密
"""
# 安装依赖:pip install msgpack itsdangerous
# 序列化安全是 Python 安全中常被忽视但后果严重的领域
import pickle
import json
import msgpack
import hashlib
import hmac
import os
from typing import Any, Dict, Optional
from itsdangerous import URLSafeSerializer, TimedSerializer, BadSignature
# ========== 第一部分:Pickle 安全风险 ==========
class UnsafePickleDemo:
"""
Pickle 反序列化漏洞演示。
pickle.loads() 可以执行任意 Python 代码,这就是为什么
永远不要对不可信数据使用 pickle 反序列化。
"""
@staticmethod
def create_malicious_payload() -> bytes:
"""
创建一个恶意的 pickle 载荷。
实际攻击中,攻击者会让服务器反序列化此载荷以执行恶意代码。
"""
# 使用 __reduce__ 定义反序列化时执行的代码
class EvilPickle:
def __reduce__(self):
# 反序列化时将执行 os.system 命令
import os
cmd = "echo '攻击成功!反序列化执行了任意命令'"
return (os.system, (cmd,))
evil = EvilPickle()
malicious_data = pickle.dumps(evil)
print(f"恶意 pickle 载荷已生成({len(malicious_data)} 字节)")
return malicious_data
@staticmethod
def dangerous_deserialize(data: bytes) -> Any:
"""
不安全的反序列化 —— 直接调用 pickle.loads。
这就是生产代码中绝对不能做的事情!
"""
print("警告:正在反序列化不可信的 pickle 数据!")
obj = pickle.loads(data) # 这里会执行任意代码
return obj
# ========== 第二部分:安全的序列化替代方案 ==========
class SafeSerialization:
"""
安全的序列化方案 —— 使用不执行代码的格式。
JSON 和 MessagePack 都是纯数据格式,不会执行代码。
"""
@staticmethod
def serialize_json(data: Any) -> str:
"""
使用 JSON 进行安全序列化。
JSON 是最通用的跨语言序列化格式。
"""
return json.dumps(data, ensure_ascii=False, default=str)
@staticmethod
def deserialize_json(data: str) -> Any:
"""
安全反序列化 JSON 数据。
"""
return json.loads(data)
@staticmethod
def serialize_msgpack(data: Any) -> bytes:
"""
使用 MessagePack 进行安全序列化。
MessagePack 类似 JSON 但更紧凑,适合高性能场景。
"""
return msgpack.packb(data)
@staticmethod
def deserialize_msgpack(data: bytes) -> Any:
"""
安全反序列化 MessagePack 数据。
"""
return msgpack.unpackb(data)
@staticmethod
def compare_formats(data: Dict) -> None:
"""
比较不同序列化格式的效率和大小。
"""
json_data = SafeSerialization.serialize_json(data)
msgpack_data = SafeSerialization.serialize_msgpack(data)
print(f"原始数据大小:{len(str(data))} 字节")
print(f"JSON 序列化大小:{len(json_data)} 字节")
print(f"MessagePack 序列化大小:{len(msgpack_data)} 字节")
print(f"MessagePack 节省了 {len(json_data) - len(msgpack_data)} 字节")
# ========== 第三部分:带签名和加密的安全序列化 ==========
class SecureSerializer:
"""
安全序列化器 —— 使用 itsdangerous 库对数据进行签名和加密。
itsdangerous 提供:
- 签名:防止数据被篡改
- 时间戳:防止重放攻击
- 加密:保护数据机密性
"""
def __init__(self, secret_key: str):
"""
初始化安全序列化器。
secret_key 必须是高熵的密钥(建议使用 secrets.token_hex() 生成)。
"""
self.secret_key = secret_key
# URL 安全序列化器:对数据进行签名
self.url_serializer = URLSafeSerializer(secret_key)
# 时间戳序列化器:带过期时间的签名
self.timed_serializer = TimedSerializer(secret_key)
def sign_data(self, data: Any) -> str:
"""
对数据进行签名,防止篡改。
签名后的数据以 base64 编码的字符串形式返回。
"""
# URLSafeSerializer 自动处理 JSON 序列化和签名
signed = self.url_serializer.dumps(data)
print(f"数据已签名(长度:{len(signed)} 字符)")
return signed
def verify_signed_data(self, signed_data: str) -> Optional[Any]:
"""
验证签名并还原数据。
如果签名无效(数据被篡改),抛出 BadSignature 异常。
"""
try:
data = self.url_serializer.loads(signed_data)
print("签名验证通过,数据完整未被篡改")
return data
except BadSignature:
print("警告:数据签名无效,数据可能被篡改!")
return None
def sign_with_expiry(self, data: Any,
max_age_seconds: int = 3600) -> str:
"""
带过期时间的签名。
适用于需要限时使用的令牌(如密码重置链接)。
"""
signed = self.timed_serializer.dumps(data)
print(f"数据已签名(有效期 {max_age_seconds} 秒)")
return signed
def verify_with_expiry(self, signed_data: str,
max_age_seconds: int = 3600) -> Optional[Any]:
"""
验证签名并检查是否过期。
"""
try:
data = self.timed_serializer.loads(
signed_data, max_age=max_age_seconds
)
print("签名验证通过,且在有效期内")
return data
except BadSignature:
print("警告:签名无效或已过期!")
return None
class SignEncryptSerializer:
"""
签名 + 加密的序列化方案 —— 自己实现 HMAC 签名。
确保数据的完整性和机密性。
"""
def __init__(self, signing_key: str, encryption_key: Optional[str] = None):
self.signing_key = signing_key.encode()
self.encryption_key = encryption_key
def serialize_secure(self, data: Any) -> bytes:
"""
安全序列化:先编码 -> 再加密(可选)-> 最后签名。
先签名后加密可以防止签名碰撞攻击。
"""
# 第一步:序列化为 JSON
payload = json.dumps(data, ensure_ascii=False).encode()
# 第二步:计算 HMAC 签名
signature = hmac.new(
self.signing_key, payload, hashlib.sha256
).hexdigest()
# 第三步:组合签名和数据
result = json.dumps({
"signature": signature,
"payload": payload.decode()
}).encode()
return result
def deserialize_secure(self, data: bytes) -> Optional[Any]:
"""
安全反序列化:先验证签名 -> 再解密(可选)-> 最后解码。
"""
try:
# 第一步:解析外层 JSON
wrapper = json.loads(data.decode())
# 第二步:提取签名和载荷
expected_sig = wrapper["signature"]
payload_str = wrapper["payload"]
payload = payload_str.encode()
# 第三步:验证签名
actual_sig = hmac.new(
self.signing_key, payload, hashlib.sha256
).hexdigest()
# 使用 hmac.compare_digest 防止时序攻击
if not hmac.compare_digest(expected_sig, actual_sig):
print("警告:HMAC 签名不匹配,数据被篡改!")
return None
# 第四步:解码 JSON
return json.loads(payload)
except (json.JSONDecodeError, KeyError, Exception) as e:
print(f"反序列化失败:{e}")
return None
# ========== 第四部分:Notrust 反序列化模式 ==========
class NotrustDeserializer:
"""
"不信任"反序列化模式 —— 处理不可信数据时的安全最佳实践。
即使使用安全的格式,也要验证数据的结构和内容。
"""
@staticmethod
def safe_deserialize(data: str,
expected_schema: Optional[Dict] = None) -> Optional[Dict]:
"""
安全的反序列化:验证类型、结构和内容后再使用。
"""
try:
# 第一步:使用安全格式解析
obj = json.loads(data)
# 第二步:验证类型
if not isinstance(obj, dict):
print("错误:期望 JSON 对象,得到其他类型")
return None
# 第三步:验证必填字段
if expected_schema:
for field, field_type in expected_schema.items():
if field not in obj:
print(f"错误:缺少必填字段 '{field}'")
return None
if not isinstance(obj[field], field_type):
print(f"错误:字段 '{field}' 类型错误")
return None
# 第四步:验证值范围
for key, value in obj.items():
if isinstance(value, str) and len(value) > 1000:
print(f"错误:字段 '{key}' 超过最大长度")
return None
if isinstance(value, (int, float)) and abs(value) > 1e9:
print(f"错误:字段 '{key}' 数值超出范围")
return None
print("反序列化验证通过")
return obj
except json.JSONDecodeError:
print("错误:无效的 JSON 格式")
return None
# ========== 第五部分:演示 ==========
def demo_secure_serialization():
"""
演示各种序列化的安全特性和风险。
"""
print("=== 安全序列化演示 ===\n")
# 1. Pickle 风险演示
print("--- Pickle 反序列化风险 ---")
malicious = UnsafePickleDemo.create_malicious_payload()
print("注意:在生产代码中绝不要对不可信数据调用 pickle.loads()")
# 2. 安全格式比较
print("\n--- 序列化格式比较 ---")
test_data = {
"user": "alice",
"scores": [95, 87, 92],
"metadata": {"version": "1.0", "timestamp": 1234567890}
}
SafeSerialization.compare_formats(test_data)
# 3. itsdangerous 签名
print("\n--- 带签名的安全序列化 ---")
serializer = SecureSerializer(
secret_key="my-secret-key-change-in-production"
)
user_data = {"user_id": 123, "role": "admin"}
signed = serializer.sign_data(user_data)
verified = serializer.verify_signed_data(signed)
print(f"还原数据:{verified}")
# 4. 篡改检测演示
print("\n--- 篡改检测演示 ---")
tampered = signed[:-1] + ("X" if signed[-1] != "X" else signed[-2])
result = serializer.verify_signed_data(tampered)
print(f"篡改数据还原结果:{result}")
# 5. Schema 验证
print("\n--- Notrust 反序列化验证 ---")
safe_data = '{"name": "Alice", "age": 30}'
schema = {"name": str, "age": int}
result = NotrustDeserializer.safe_deserialize(safe_data, schema)
print(f"验证通过的数据:{result}")
# 6. 序列化安全建议
print("\n--- 序列化安全最佳实践 ---")
print("""
1. 永远不要对不可信数据使用 pickle.loads()
2. 优先使用 JSON 序列化(纯数据,不执行代码)
3. 对序列化数据进行 HMAC 签名以防止篡改
4. itsdangerous 提供了简洁的签名+序列化方案
5. 反序列化后验证数据结构和值范围
6. 敏感数据在序列化前应加密
""")
if __name__ == "__main__":
demo_secure_serialization()
