当前位置: 首页 > news >正文

Python安全会话管理

"""
Python 安全会话管理 —— 服务端会话的安全实现
涵盖会话 ID 生成、安全 Cookie 设置、过期机制和会话固定攻击防护
"""

# 安装依赖:pip install flask cryptography
# 会话管理是 Web 应用安全的核心环节,涉及认证状态的整个生命周期

import secrets
import time
import hmac
import hashlib
import json
from typing import Optional, Dict, Any
from dataclasses import dataclass, field
from flask import Flask, session, request, Response, make_response

# ========== 第一部分:安全的会话 ID 生成 ==========

class SessionIdGenerator:
"""
会话 ID 生成器 —— 生产密码学安全的随机会话标识符。
不使用自增数字或时间戳,防止会话预测攻击。
"""

@staticmethod
def generate(length: int = 32) -> str:
"""
使用 Python secrets 模块生成不可预测的会话 ID。
secrets.token_hex 返回的是密码学安全的伪随机数。
"""
if length < 16:
raise ValueError("会话 ID 长度不能小于 16")
session_id = secrets.token_hex(length // 2)
return session_id

@staticmethod
def generate_with_entropy(length: int = 32) -> str:
"""
生成带熵前缀的会话 ID,便于调试时区分来源。
"""
entropy_prefix = secrets.choice("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
return entropy_prefix + secrets.token_hex((length - 1) // 2)

@staticmethod
def validate_format(session_id: str) -> bool:
"""
验证会话 ID 格式是否符合预期。
"""
if not session_id or len(session_id) < 16:
return False
# 只允许十六进制字符和字母前缀
valid_chars = set("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdef0123456789")
return all(c in valid_chars for c in session_id)


# ========== 第二部分:服务端会话存储 ==========

@dataclass
class SessionData:
"""服务端会话数据结构"""
user_id: Optional[str] = None
username: Optional[str] = None
role: str = "guest"
created_at: float = 0.0
last_activity: float = 0.0
ip_address: Optional[str] = None
user_agent: Optional[str] = None
is_fresh: bool = True # 是否为新创建的会话(防会话固定攻击)


class SecureSessionStore:
"""
安全的会话存储 —— 使用服务端存储而非客户端 Cookie。
服务端存储可以防止 Cookie 篡改和会话数据泄露。
"""
# 生产环境中应使用 Redis 或数据库替代内存存储
_sessions: Dict[str, SessionData] = {}
_session_expiry: int = 1800 # 会话过期时间:30 分钟
_absolute_timeout: int = 28800 # 绝对超时:8 小时

def create_session(self, session_id: str, ip: str = "",
user_agent: str = "") -> SessionData:
"""
创建新会话,绑定客户端信息用于额外验证。
"""
now = time.time()
session_data = SessionData(
created_at=now,
last_activity=now,
ip_address=ip,
user_agent=user_agent
)
self._sessions[session_id] = session_data
return session_data

def get_session(self, session_id: str, ip: str = "",
user_agent: str = "") -> Optional[SessionData]:
"""
获取会话数据,同时执行多项安全检查:
1. 会话是否存在
2. 是否过期(闲置超时 + 绝对超时)
3. IP 地址是否变化(可选)
4. User-Agent 是否变化
"""
session_data = self._sessions.get(session_id)
if not session_data:
return None

now = time.time()
# 检查闲置超时(Idle Timeout)
if now - session_data.last_activity > self._session_expiry:
print(f"会话 {session_id[:16]}... 因闲置超时而过期")
self.destroy_session(session_id)
return None

# 检查绝对超时(Absolute Timeout)
if now - session_data.created_at > self._absolute_timeout:
print(f"会话 {session_id[:16]}... 因超过绝对有效期而过期")
self.destroy_session(session_id)
return None

# 检查 IP 地址变化(防止会话劫持)
if ip and session_data.ip_address and session_data.ip_address != ip:
print(f"警告:会话 IP 从 {session_data.ip_address} 变为 {ip}")
# 可选择销毁会话或仅记录日志
# self.destroy_session(session_id)
# return None

# 检查 User-Agent 变化
if user_agent and session_data.user_agent \
and session_data.user_agent != user_agent:
print("警告:会话 User-Agent 发生变化")
# 同样可以选择性处理

# 更新最后活动时间(滑动过期)
session_data.last_activity = now
return session_data

def destroy_session(self, session_id: str) -> None:
"""
销毁会话 —— 登出时必须调用。
"""
if session_id in self._sessions:
del self._sessions[session_id]
print(f"会话 {session_id[:16]}... 已销毁")

def regenerate_session_id(self, old_session_id: str) -> Optional[str]:
"""
重新生成会话 ID —— 登录成功后必须调用。
这是防止会话固定攻击(Session Fixation)的核心措施。
"""
session_data = self._sessions.get(old_session_id)
if not session_data:
return None

# 创建新会话 ID,迁移原会话数据
new_session_id = SessionIdGenerator.generate()
session_data.is_fresh = False
self._sessions[new_session_id] = session_data
# 删除旧会话
del self._sessions[old_session_id]
print(f"会话 ID 已重新生成:{old_session_id[:16]}... -> {new_session_id[:16]}...")
return new_session_id


# ========== 第三部分:安全的 Cookie 配置 ==========

def configure_secure_cookie(app: Flask) -> None:
"""
配置安全的会话 Cookie 属性。
这些设置让浏览器限制 Cookie 的访问范围,减少泄露风险。
"""
# 使用随机生成的密钥签名 Cookie
app.config["SECRET_KEY"] = secrets.token_hex(32)
# 会话 Cookie 名称使用非默认值,增加攻击难度
app.config["SESSION_COOKIE_NAME"] = "_secure_session"
# HttpOnly:禁止 JavaScript 访问 Cookie,防止 XSS 窃取
app.config["SESSION_COOKIE_HTTPONLY"] = True
# Secure:仅通过 HTTPS 传输 Cookie
app.config["SESSION_COOKIE_SECURE"] = True
# SameSite:限制跨站请求携带 Cookie,防止 CSRF
# Strict:完全禁止跨站;Lax:允许部分安全跨站(如 GET 链接)
app.config["SESSION_COOKIE_SAMESITE"] = "Lax"
# 设置 Cookie 的作用域路径
app.config["SESSION_COOKIE_PATH"] = "/"
# 会话过期时间
app.config["PERMANENT_SESSION_LIFETIME"] = 1800


# ========== 第四部分:Flask 集成示例 ==========

def create_secure_app() -> Flask:
"""
创建配置了安全会话管理的 Flask 应用。
"""
app = Flask(__name__)
configure_secure_cookie(app)
session_store = SecureSessionStore()

@app.route("/login", methods=["POST"])
def login():
"""
登录端点:验证凭证后创建安全会话。
"""
username = request.json.get("username")
# 验证用户名密码(省略具体验证逻辑)

# 生成新的会话 ID(重要!不要使用客户端提供的 ID)
session_id = SessionIdGenerator.generate()
session_store.create_session(
session_id,
ip=request.remote_addr,
user_agent=request.user_agent.string
)
# 设置安全的会话 Cookie
response = make_response({"status": "ok"})
response.set_cookie(
"_secure_session", session_id,
httponly=True,
secure=True,
samesite="Lax",
max_age=1800,
path="/"
)
return response

@app.route("/logout", methods=["POST"])
def logout():
"""
登出端点:销毁服务端和客户端的会话。
"""
session_id = request.cookies.get("_secure_session")
if session_id:
session_store.destroy_session(session_id)
response = make_response({"status": "logged_out"})
response.delete_cookie("_secure_session")
return response

return app


# ========== 第五部分:演示 ==========

def demo_session_management():
"""
演示安全会话管理的完整流程。
"""
print("=== 安全会话管理演示 ===\n")

store = SecureSessionStore()

# 1. 生成会话 ID
print("--- 会话 ID 生成 ---")
session_id = SessionIdGenerator.generate()
print(f"生成的会话 ID:{session_id[:20]}...")
print(f"格式验证:{SessionIdGenerator.validate_format(session_id)}\n")

# 2. 创建会话
print("--- 创建会话 ---")
store.create_session(session_id, ip="192.168.1.100")
data = store.get_session(session_id, ip="192.168.1.100")
print(f"会话状态:{'有效' if data else '无效'}\n")

# 3. 会话固定攻击防护(登录后重新生成 ID)
print("--- 登录后重新生成会话 ID ---")
new_session_id = store.regenerate_session_id(session_id)
print(f"旧会话 ID 仍然有效:{store.get_session(session_id) is not None}")
print(f"新会话 ID 有效:{store.get_session(new_session_id) is not None}\n")

# 4. 销毁会话
print("--- 登出销毁会话 ---")
store.destroy_session(new_session_id)
print(f"销毁后查询:{store.get_session(new_session_id) is None}")


if __name__ == "__main__":
demo_session_management()

http://www.jsqmd.com/news/921961/

相关文章:

  • AI Wrapper实战指南:从API调用到构建可持续AI产品的核心挑战
  • 2026年咸阳市CPPM报名十大核心问题全流程答疑 - 众智商学院课程中心
  • 避开这些坑!ArcGIS Pro二次开发AddIn项目图标和菜单不显示的修复指南
  • AI与区块链融合:Obizcoin如何重塑创业协作与信任机制
  • Power Automate审批流实战:从SharePoint触发到状态回写,我的踩坑与优化记录
  • 如何用3个步骤免费下载网易云音乐无损FLAC歌单
  • 别再硬算坐标了!Unity六边形地图的立体坐标与屏幕坐标转换,一篇讲透(附完整C#代码)
  • Figma组件库的变体(Variants)具体怎么使用?
  • 机器学习在游戏难度动态平衡中的应用与策略层设计
  • 从Modelsim波形反推设计问题:一个Quartus工程中的边沿检测模块调试实战
  • 2026年淮安市CPPM报名十大核心问题全流程答疑 - 众智商学院课程中心
  • 2026年上饶市CPPM报名十大核心问题全流程答疑 - 众智商学院课程中心
  • 盘点!8款热门CRM平台全维度评测,综合实力大比拼 - Joyky
  • 从Typora迁移到Obsidian,我踩过的那些坑和高效配置方案(含换行、图床、模板无缝迁移指南)
  • QGIS实战:用Graduated渲染让降雨量数据‘开口说话’(附C++ API完整代码)
  • 轻松搞定 Hermes 部署 Windows 一键安装实用技巧(含安装包)
  • 别再只会用预设了!用Unity粒子系统手搓一个带拖尾和二次爆炸的烟花(附完整项目文件)
  • Grafana告警飞书推送踩坑实录:从Webhook配置到消息模板优化,一篇搞定
  • 百考通AI:智能锚定研究根基,让学术起步精准高效
  • 手把手教你为Dell R730服务器安装VMware ESXi 8.0 U2(附Dell OEM版下载与RAID1配置避坑)
  • 从编译失败到成功运行:手把手解决ZLMediaKit交叉编译WebRTC时的三大经典错误
  • 科研党必备:用闲置的旧电脑/树莓派搭建WebDAV服务器,零成本搞定Zotero全平台文献同步
  • 2026年商丘市CPPM报名十大核心问题全流程答疑 - 众智商学院课程中心
  • 技术内容的SEO优化——让搜索引擎成为你的流量放大器
  • Win11上装Oracle 11g踩坑记:从环境报错到PL/SQL远程连接,保姆级排雷指南
  • 网易云音乐NCM格式转换终极指南:ncmdump工具完整使用教程
  • 百考通AI期刊智能化赋能学术发表,让优质成果高效落地
  • 从编辑器到游戏:揭秘Godot拖放API的“潜规则”与实战避坑指南
  • 2026年襄阳市CPPM报名十大核心问题全流程答疑 - 众智商学院课程中心
  • 别再到处找了!一份SMIC 0.18um工艺库文件详解,带你搞懂每个文件夹是干嘛的