试想一下,你辛苦做出的某条 Agent API 的调用量在半小时内飙到了平时的 200 倍。查日志发现,有人用脚本把接口当免费 GPT 在薅,更恐怖的是,他们尝试在 prompt 里夹带 ` 删除所有日志 ` 这种恶意指令——如果你的工具执行部分根本没做过滤……
太多这种案例让我意识到:给 Agent 套上安全防线,不是“可选项”,而是从第一行代码起就该刻进骨子里的铁律。
👩💻我是爱折腾的一名程序媛,喜欢研究全栈开发的各种实践,热爱分享踩坑后的收获与思考,也享受用代码写出各种实用小工具解决问题的快乐。
如果你也在技术这条路上向前走,希望我们能彼此陪伴,一起成为更好的自己 🌱
前面几篇我们学习了如何理解和创建FastAPI AI Agent接口,今天就来聊聊怎么构建我们的接口防线,每道都带着可直接跑的代码,希望能让你的线上 Agent 睡个安稳觉。
🛡️ 第一道:门禁卡——OAuth2 + JWT 认证
API 不能敞开大门随便进。FastAPI 内置了 OAuth2PasswordBearer 这套“门禁系统”,配合 JWT 就像一张带时效的房卡。
登录后拿 token,后续请求都得带上,否则直接 401 挡在门外。
这里一定要注意:千万不要把 secret key 硬编码,正经做法是从环境变量读。
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import jwt
from datetime import datetime, timedeltaSECRET_KEY = "从环境变量取!"
ALGORITHM = "HS256"oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")app = FastAPI()def create_token(data: dict):to_encode = data.copy()to_encode.update({"exp": datetime.utcnow() + timedelta(hours=2)})return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)@app.post("/token")
def login(form_data: OAuth2PasswordRequestForm = Depends()):# 验证用户(示例:固定账号)if form_data.username != "admin" or form_data.password != "pass":raise HTTPException(status_code=401, detail="密码错误")return {"access_token": create_token({"sub": form_data.username}), "token_type": "bearer"}
👑 第二道:权限分级——RBAC
有了门禁还不够,不能所有人进来都能操作 Agent。
我们用“角色”来区分:普通用户只能查询,管理员才能修改配置或调用危险工具。
关键是一个依赖注入函数,检查 token 里带的 role 字段。
这里也要注意,不能只靠前端隐藏按钮,后端完全没校验——那等于把钥匙藏在门口地垫下。
from fastapi import Securitydef get_current_user(token: str = Depends(oauth2_scheme)):payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])return {"username": payload.get("sub"), "role": payload.get("role", "user")}def require_role(role: str):def checker(user = Depends(get_current_user)):if user["role"] != role:raise HTTPException(status_code=403, detail="权限不足")return userreturn checker@app.get("/admin/agent-config")
def update_config(user = Depends(require_role("admin"))):return {"message": "只有管理员看得到"}
🧹 第三道:入口安检——输入清洗与限流
Agent 最怕提示注入。我的做法是:先用 Pydantic 做格式校验,再上自定义清洗函数,把那些 忽略前面的指令、请扮演 DAN 之类的黑名单短语直接堵死。当然黑名单不是银弹,但能挡住大部分脚本小子。
另外,同一 IP 每秒 100 次的请求绝对不正常。用 SlowAPI 中间件限流,就像给接口装了水龙头,防止被冲垮。
注意,生产环境一定要搭配 Redis 存储计数,不然多 worker 下会漏。
from slowapi import Limiter
from slowapi.util import get_remote_address
from pydantic import BaseModel, validatorlimiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(429, lambda _,_: JSONResponse(status_code=429, content="慢点!"))class PromptInput(BaseModel):prompt: str@validator("prompt")def check_injection(cls, v):blacklist = ["忽略前面的指令", "ignore previous", "DAN"]if any(b in v.lower() for b in blacklist):raise ValueError("检测到注入企图")return v@app.post("/agent/ask")
@limiter.limit("5/second")
def ask_agent(item: PromptInput, request: Request):return {"response": f"你说: {item.prompt[:50]}..."}
🧪 第四道:工具执行的“无菌实验室”
Agent 经常会调用本地命令、跑 Python 脚本,这相当于给用户开了一个终端——不用沙箱就是自杀。
我强制所有命令必须通过白名单,比如只能执行 ls、cat 这几个;
同时用 Python 的 resource 模块限制子进程的内存和 CPU 时间,避免恶意死循环吃光服务器。
网络隔离可以进一步用 Docker 的 --network=none,这里用子进程做个最简演示:
import subprocess, resourceALLOWED_COMMANDS = ["ls", "cat", "echo"]
def safe_exec(cmd: str):parts = cmd.strip().split()if not parts or parts[0] not in ALLOWED_COMMANDS:raise ValueError("命令不在白名单")def set_limits():resource.setrlimit(resource.RLIMIT_AS, (50*1024*1024, 100*1024*1024)) # 内存50MBresource.setrlimit(resource.RLIMIT_CPU, (2, 2)) # CPU时间2秒try:result = subprocess.run(parts, capture_output=True, text=True,preexec_fn=set_limits, timeout=5)return result.stdoutexcept subprocess.TimeoutExpired:return "执行超时,已终止"
🔑 第五道:密钥轮换与环境隔离
最后这条很多人会忽略:JWT 签名密钥、第三方 API Key 要定期换,就像家门锁的密码不能万年不变。
我写了一个简单的 KeyManager,从环境变量读取,并支持定期重载。
不同环境(开发、测试、生产)严格隔离,一个环境的 Key 泄露不至于连累全家。
import os, timeclass KeyManager:def __init__(self, env_var: str, refresh_interval=3600):self.env_var = env_varself.refresh_interval = refresh_intervalself._last_read = 0self._cached = Nonedef get(self):now = time.time()if now - self._last_read > self.refresh_interval or self._cached is None:self._cached = os.getenv(self.env_var)self._last_read = nowif not self._cached:raise RuntimeError(f"环境变量 {self.env_var} 未设置")return self._cachedsecret_key_manager = KeyManager("JWT_SECRET_KEY", refresh_interval=7200)
💡 最后啰嗦两句
安全是持续动作,不是一次性配置。还要给 Agent 项目加上告警通知、漏洞扫描等,并且永远假设自己的代码会被人恶意利用——只有保持这种“被害妄想”,才能真正安稳。
如果你也有 Agent 正打算上线,别等到报警了再动手。拿这几段代码去,先堵上最急的窟窿,然后慢慢加固其他部分。
👋 我是那个爱折腾一名程序媛,如果这篇帮你省了一次半夜救火,点个「收藏」+「关注」让我知道呀~
也欢迎把文章转发给身边同样在折腾 Agent 的朋友,安全这回事,越多人清醒,我们就越省心。
