当前位置: 首页 > news >正文

MonkeyCode实现OAuth2认证:从零到生产级SSO

为什么不用Session+Cookie了?

传统Session方案的痛点:

问题表现
扩展性差Session存在单台服务器内存,多实例无法共享
CSRF风险Cookie自动携带,容易被恶意网站利用
跨域麻烦Cookie在跨域场景下各种限制
移动端不友好App/小程序很难处理Cookie

JWT + OAuth2是无状态、跨平台、支持SSO的现代方案。

OAuth2四种授权模式

MonkeyCode帮你选最合适的:

模式适用场景典型用户
授权码模式有后端的Web应用(最安全)你的SaaS平台
隐式模式纯前端SPA(已淘汰,不推荐)
密码模式高度信任的第一方App你自己的官方App
客户端凭证模式服务间调用微服务A调用微服务B

99%的Web应用应该用:授权码模式 + PKCE(防止授权码被截获)。

实战:用Authlib实现OAuth2 + JWT

安装依赖

让MonkeyCode生成requirements.txt:

authlib==1.3.0 fastapi==0.104.0 pydantic==2.5.0 python-jose[cryptography]==3.3.0 passlib[bcrypt]==1.7.4

完整实现(授权码模式 + JWT)

# app/auth.py - 完整认证模块 from fastapi import FastAPI, Depends, HTTPException, Request, status from authlib.integrations.starlette import OAuth from starlette.middleware.sessions import SessionMiddleware from jose import JWTError, jwt from passlib.context import CryptContext from datetime import datetime, timedelta from typing import Optional, List import os app = FastAPI(title="OAuth2 JWT Demo") app.add_middleware(SessionMiddleware, secret_key=os.getenv("SESSION_SECRET", "dev-secret")) # ─── 密码哈希 ─── pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def hash_password(password: str) -> str: return pwd_context.hash(password) def verify_password(plain: str, hashed: str) -> bool: return pwd_context.verify(plain, hashed) # ─── JWT工具 ─── SECRET_KEY = os.getenv("JWT_SECRET", "dev-jwt-secret-change-in-prod") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 REFRESH_TOKEN_EXPIRE_DAYS = 7 def create_access_token(data: dict) -> str: to_encode = data.copy() expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire, "type": "access"}) return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) def create_refresh_token(data: dict) -> str: to_encode = data.copy() expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) to_encode.update({"exp": expire, "type": "refresh"}) return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) def decode_token(token: str) -> dict: try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) return payload except JWTError: raise HTTPException(status_code=401, detail="Token无效或已过期") # ─── OAuth2第三方登录(GitHub示例)─── oauth = OAuth() oauth.register( name="github", client_id=os.getenv("GITHUB_CLIENT_ID"), client_secret=os.getenv("GITHUB_CLIENT_SECRET"), access_token_url="https://github.com/login/oauth/access_token", access_token_params=None, authorize_url="https://github.com/login/oauth/authorize", authorize_params=None, api_base_url="https://api.github.com/", client_kwargs={"scope": "user:email"}, ) @app.get("/auth/github/login") async def github_login(request: Request): redirect_uri = request.url_for("github_callback") return await oauth.github.authorize_redirect(request, redirect_uri) @app.get("/auth/github/callback") async def github_callback(request: Request): token = await oauth.github.authorize_access_token(request) user_info = await oauth.github.get("user", token=token) user_email = user_info.json().get("email") # 查找或创建用户 user = await db.get_user_by_email(user_email) if not user: user = await db.create_user(email=user_email, name=user_info.json()["login"]) # 签发JWT access_token = create_access_token({"sub": str(user.id), "email": user.email}) refresh_token = create_refresh_token({"sub": str(user.id)}) # 重定向回前端,把token放在URL fragment(不会发到后端) frontend_url = os.getenv("FRONTEND_URL", "http://localhost:3000") return RedirectResponse( url=f"{frontend_url}/auth/callback?access_token={access_token}&refresh_token={refresh_token}" ) # ─── 本地注册/登录 ─── from pydantic import BaseModel class RegisterRequest(BaseModel): email: str password: str name: str class LoginRequest(BaseModel): email: str password: str @app.post("/auth/register") async def register(req: RegisterRequest): existing = await db.get_user_by_email(req.email) if existing: raise HTTPException(400, "邮箱已注册") hashed = hash_password(req.password) user = await db.create_user(email=req.email, name=req.name, password_hash=hashed) access_token = create_access_token({"sub": str(user.id), "email": user.email}) refresh_token = create_refresh_token({"sub": str(user.id)}) return { "access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer", "expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60 } @app.post("/auth/login") async def login(req: LoginRequest): user = await db.get_user_by_email(req.email) if not user or not verify_password(req.password, user.password_hash): raise HTTPException(401, "邮箱或密码错误") access_token = create_access_token({"sub": str(user.id), "email": user.email}) refresh_token = create_refresh_token({"sub": str(user.id)}) return { "access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer", "expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60 } # ─── 刷新Token ─── class RefreshRequest(BaseModel): refresh_token: str @app.post("/auth/refresh") async def refresh(req: RefreshRequest): payload = decode_token(req.refresh_token) if payload.get("type") != "refresh": raise HTTPException(401, "无效的refresh token") new_access_token = create_access_token({"sub": payload["sub"]}) return {"access_token": new_access_token, "token_type": "bearer"} # ─── 获取当前用户(依赖注入)─── from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials security = HTTPBearer() async def get_current_user(cred: HTTPAuthorizationCredentials = Depends(security)) -> dict: payload = decode_token(cred.credentials) user_id = payload.get("sub") if not user_id: raise HTTPException(401, "无效的token") user = await db.get_user_by_id(int(user_id)) if not user: raise HTTPException(401, "用户不存在") return user # ─── 受保护的接口 ─── @app.get("/api/me") async def get_me(current_user: dict = Depends(get_current_user)): return { "id": current_user["id"], "email": current_user["email"], "name": current_user["name"] } @app.get("/api/protected") async def protected_route(current_user: dict = Depends(get_current_user)): return {"message": f"你好,{current_user['name']}!"}

前端如何携带JWT

// 登录后存储token const login = async (email, password) => { const resp = await fetch("http://localhost:8000/auth/login", { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ email, password }) }); const data = await resp.json(); localStorage.setItem("access_token", data.access_token); localStorage.setItem("refresh_token", data.refresh_token); }; // 请求拦截器:自动附加token const apiFetch = async (url, options = {}) => { const token = localStorage.getItem("access_token"); const headers = { ...options.headers, "Authorization": `Bearer ${token}` }; let resp = await fetch(url, { ...options, headers }); // Token过期,尝试刷新 if (resp.status === 401) { const refreshed = await refreshToken(); if (refreshed) { const newToken = localStorage.getItem("access_token"); headers["Authorization"] = `Bearer ${newToken}`; resp = await fetch(url, { ...options, headers }); } } return resp; }; const refreshToken = async () => { const refreshToken = localStorage.getItem("refresh_token"); const resp = await fetch("http://localhost:8000/auth/refresh", { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ refresh_token: refreshToken }) }); if (resp.ok) { const data = await resp.json(); localStorage.setItem("access_token", data.access_token); return true; } // Refresh token也过期,跳转登录 localStorage.clear(); window.location.href = "/login"; return false; };

PKCE:为什么授权码模式需要它?

传统授权码模式有个漏洞:授权码在回调URL中,可能被恶意App截获(Android/iOS的Intent劫持)。

PKCE(Proof Key for Code Exchange)解决这个问题:

前端生成 code_verifier(随机字符串) ↓ hash → code_challenge ↓ 把 code_challenge 发给授权服务器 授权服务器返回授权码 ↓ 前端带着 授权码 + code_verifier 换token 服务器验证 hash(code_verifier) == code_challenge? ✅ 通过才发token

MonkeyCode生成的PKCE实现:

import hashlib import base64 import secrets def generate_pkce_pair(): """生成code_verifier和code_challenge""" code_verifier = base64.urlsafe_b64encode( secrets.token_bytes(32) ).decode().rstrip("=") code_challenge = base64.urlsafe_b64encode( hashlib.sha256(code_verifier.encode()).digest() ).decode().rstrip("=") return code_verifier, code_challenge # 前端登录时 verifier, challenge = generate_pkce_pair() session["pkce_verifier"] = verifier # 存在session redirect_url = f"https://auth-server/oauth/authorize?...&code_challenge={challenge}&code_challenge_method=S256" # 回调时 stored_verifier = session.pop("pkce_verifier") # 用 verifier 换token,服务器会验证

安全最佳实践(MonkeyCode自动检查)

让MonkeyCode检查我的认证代码的安全性,列出所有风险点
检查项风险修复
JWT Secret硬编码Token可被伪造用环境变量,长度≥32字符
没用HTTPSToken被中间人截获生产环境强制HTTPS
access token过期时间太长泄露后影响大≤30分钟,用refresh token续期
没做登出黑名单token被盗用无法撤销登出时把token加入黑名单(Redis)
没限制登录尝试容易被暴力破解登录失败5次锁定15分钟
密码强度不够弱密码被撞库注册时强制8位+大小写+数字
http://www.jsqmd.com/news/1092200/

相关文章:

  • 打破游戏控制器兼容性壁垒:GlosSI系统级Steam Input解决方案
  • 3步解锁QQ音乐:qmcdump解密工具完全指南
  • Lean 4实战:当形式化验证遇见现代编程范式
  • 如何5分钟实现智能PSD分层:Layerdivider图像分层神器终极指南
  • 费可商用 PHP 管理后台 CatchAdmin V5.3.1 发布 后台打包直降 5s 内
  • 级别的AutoBuilder,一键干掉80%的重复CRUD工作
  • Claude 编程经验
  • 品牌出海做GEO,多语言能力怎么挑?2026 年支持多语言AI搜索优化的服务商盘点
  • AI Agent时代如何打造高质量软件?
  • 高校汉服租赁网站源码 Java+SpringBoot+Vue 万字文档
  • 那些年我们写过的“面条代码”
  • FDE标准:FDE落地最后一公里,在银行、政务,石油,电力,金融的产品、标准和落地案例
  • IEC 60205-2026
  • ChatGPT Plus值不值得续费:基于37项功能对比、127小时实测数据与API调用成本精算
  • MybatisPlus 分页插件与@InterceptorIgnore注解冲突:从源码解析到精准修复
  • AFE5808评估板实战指南:从硬件配置到动态性能测试
  • Burp Suite自定义插件开发实战:实现HTTP流量自动加解密
  • iPhone 数据迁移至 POCO 手机:5 种流畅传输方案
  • VOSviewer实战指南:从数据导入到知识图谱解读
  • Appium自动化测试:从核心原理到跨平台实战全解析
  • 国内口碑好的手机平板回收品牌有哪些
  • GM-Alt₂富勒烯室温超导体系学术评价
  • 竣宝潜龙尾盘副选精准抓主力洗盘尾巴主升浪信号 九点智投三步点金,五星智投双紫擒龙指标选股魔方量化指标公式
  • Airtest+Selenium自动化测试实战:从零搭建混合模式脚本
  • HTML5+CSS3+JS小实例:图片懒加载
  • 蛋仔网:做任务状态说明怎么设计,低压看板更稳
  • Python实现开源组件CVE漏洞自动化检测与修复指南
  • 技术方案:抖音批量下载助手 - 自动化视频采集高效方案
  • 光说不练假把式,我们直接上代码。
  • 14-命令行Flags详解