FastAPI + Redis 实现接口限流:从固定窗口到滑动窗口的完整实践
前言
在后端系统开发中,接口限流是一个非常常见但容易被忽视的基础能力。
很多接口在功能测试阶段看起来没有问题,但一旦进入真实线上环境,就可能遇到以下情况:
某个用户频繁刷新页面,导致接口被大量请求;
某个爬虫短时间内高频访问接口;
某个客户端出现死循环,不断重复调用服务;
秒杀、报名、查询等高并发场景下,请求量瞬间暴涨;
第三方调用方没有做好重试控制,导致接口压力被放大。
如果系统没有限流机制,很容易出现数据库压力过大、缓存被打穿、服务响应变慢,甚至整个应用被拖垮。
本文将使用FastAPI + Redis实现一个轻量级接口限流方案,重点讲清楚以下内容:
什么是接口限流;
常见限流算法有哪些;
如何用 Redis 实现固定窗口限流;
固定窗口算法有什么问题;
如何进一步实现滑动窗口限流;
如何在 FastAPI 中封装成通用依赖;
真实项目中需要注意哪些细节。
本文不依赖复杂框架,重点放在底层逻辑和工程实现。
一、什么是接口限流?
接口限流的核心目标是:
限制某个用户、某个 IP、某个接口或某类资源在单位时间内的访问次数。
例如:
同一个 IP 每分钟最多访问登录接口 10 次 同一个用户每秒最多提交订单 3 次 同一个接口每分钟最多允许 1000 次请求当请求超过限制后,系统可以直接返回错误,例如:
{ "code": 429, "message": "Too Many Requests" }HTTP 协议中,429 Too Many Requests就是专门用于表示请求过于频繁的状态码。
二、常见限流算法
常见接口限流算法主要有以下几种。
1. 固定窗口算法
固定窗口算法是最简单的限流方式。
例如限制:
每个 IP 每 60 秒最多请求 100 次系统会以 60 秒为一个窗口,记录当前窗口内的请求次数。
如果请求次数小于 100,则允许访问。
如果请求次数超过 100,则拒绝访问。
优点是实现简单,性能好。
缺点是存在窗口边界问题。
例如:
12:00:59 请求 100 次 12:01:00 请求 100 次从每分钟限制来看,这两批请求分别落在两个窗口内,都没有超限。
但从真实时间来看,用户在 1 秒左右发送了 200 次请求。
这就会造成瞬时流量突刺。
2. 滑动窗口算法
滑动窗口算法会记录一段时间内的真实请求时间戳。
例如限制:
任意 60 秒内最多请求 100 次每次请求进来时,系统会先删除 60 秒之前的请求记录,然后统计最近 60 秒内的请求数量。
如果数量小于 100,则允许访问,并记录当前请求时间。
如果数量已经达到 100,则拒绝访问。
滑动窗口比固定窗口更精确,但实现成本和 Redis 操作成本也更高。
3. 令牌桶算法
令牌桶算法会以固定速率向桶中放入令牌。
每次请求都需要消耗一个令牌。
如果桶中有令牌,则允许请求。
如果桶中没有令牌,则拒绝请求或等待。
令牌桶适合允许一定程度的突发流量。
4. 漏桶算法
漏桶算法把请求看作进入桶中的水,请求会以固定速率流出。
如果请求进入速度太快,桶满后新请求就会被丢弃。
漏桶更强调平滑流量,适合控制下游系统的处理速率。
三、环境准备
本文使用以下技术栈:
Python 3.10+ FastAPI Redis redis-py uvicorn安装依赖:
pip install fastapi uvicorn redis如果本地没有 Redis,可以使用 Docker 快速启动:
docker run -d \ --name redis-limiter \ -p 6379:6379 \ redis:7创建项目目录:
rate_limit_demo/ ├── main.py ├── redis_client.py ├── limiter.py └── requirements.txt四、连接 Redis
先创建redis_client.py:
import redis redis_client = redis.Redis( host="localhost", port=6379, db=0, decode_responses=True )这里使用同步 Redis 客户端,方便演示。
在高并发异步项目中,也可以使用redis.asyncio。
五、实现固定窗口限流
固定窗口限流的核心思路是:
每个限流对象生成一个 Redis key 每次请求让 key 对应的计数器加 1 如果是第一次访问,设置过期时间 如果计数超过阈值,则拒绝请求例如 key 可以设计为:
rate_limit:login:127.0.0.1表示某个 IP 对登录接口的访问次数。
创建limiter.py:
import time from fastapi import HTTPException, Request from redis_client import redis_client def fixed_window_limiter( key: str, limit: int, window_seconds: int ): """ 固定窗口限流 key: 限流 key limit: 窗口内最大请求次数 window_seconds: 窗口长度,单位秒 """ current = redis_client.incr(key) if current == 1: redis_client.expire(key, window_seconds) if current > limit: raise HTTPException( status_code=429, detail="Too Many Requests" ) return True这个实现非常简单,但有一个细节需要注意:
current = redis_client.incr(key)INCR是 Redis 原子操作,因此并发场景下不会出现多个请求同时读取相同旧值的问题。
六、在 FastAPI 接口中使用固定窗口限流
创建main.py:
from fastapi import FastAPI, Request from limiter import fixed_window_limiter app = FastAPI() @app.get("/api/search") def search(request: Request): client_ip = request.client.host key = f"rate_limit:search:{client_ip}" fixed_window_limiter( key=key, limit=5, window_seconds=60 ) return { "message": "search success" }启动项目:
uvicorn main:app --reload访问接口:
curl http://127.0.0.1:8000/api/search如果同一个 IP 在 60 秒内访问超过 5 次,就会返回:
{ "detail": "Too Many Requests" }状态码为429。
七、固定窗口算法的问题
固定窗口虽然实现简单,但存在明显的边界问题。
假设限制规则是:
每分钟最多请求 5 次用户在第 59 秒请求 5 次,然后在第 60 秒之后马上又请求 5 次。
从 Redis 计数窗口来看,这 10 次请求分别属于两个周期,因此都可能被允许。
但从实际情况看,用户在极短时间内完成了 10 次请求,仍然可能对服务造成压力。
这就是固定窗口算法的典型缺陷。
为了解决这个问题,可以使用滑动窗口算法。
八、用 Redis ZSET 实现滑动窗口限流
滑动窗口需要记录每次请求的时间戳。
Redis 的有序集合 ZSET 很适合这个场景。
可以把请求时间戳作为 score,把请求唯一标识作为 member。
每次请求时,执行以下步骤:
1. 删除窗口之前的请求记录 2. 统计当前窗口内的请求数量 3. 如果数量达到限制,则拒绝请求 4. 如果没有达到限制,则写入当前请求记录 5. 设置 key 过期时间例如限制:
任意 60 秒内最多请求 5 次当前时间是now,则需要保留:
now - 60 秒 到 now这段时间内的请求记录。
代码如下:
import time import uuid from fastapi import HTTPException from redis_client import redis_client def sliding_window_limiter( key: str, limit: int, window_seconds: int ): """ 滑动窗口限流 key: 限流 key limit: 窗口内最大请求次数 window_seconds: 窗口长度,单位秒 """ now = time.time() window_start = now - window_seconds redis_client.zremrangebyscore(key, 0, window_start) current_count = redis_client.zcard(key) if current_count >= limit: raise HTTPException( status_code=429, detail="Too Many Requests" ) member = f"{now}:{uuid.uuid4()}" redis_client.zadd(key, {member: now}) redis_client.expire(key, window_seconds) return True这里使用了几个 Redis 命令:
ZREMRANGEBYSCORE:删除指定分数范围内的元素 ZCARD:统计集合中元素数量 ZADD:添加元素 EXPIRE:设置过期时间每一次请求都会清理过期数据,然后检查最近窗口内的请求数量。
九、滑动窗口的并发问题
上面的滑动窗口实现逻辑是正确的,但在极高并发场景下仍然存在一个问题:
删除旧数据 统计数量 写入新请求这三个步骤不是一个原子操作。
如果多个请求同时进来,可能都在ZCARD时看到未超限,然后一起写入,导致短时间内超出限制。
为了让操作原子化,可以使用 Redis Lua 脚本。
十、使用 Lua 脚本保证原子性
Redis 执行 Lua 脚本时,会把脚本作为一个整体执行。
在脚本执行期间,不会被其他命令插入。
因此适合处理限流这种多步骤操作。
Lua 脚本如下:
SLIDING_WINDOW_LUA = """ local key = KEYS[1] local now = tonumber(ARGV[1]) local window = tonumber(ARGV[2]) local limit = tonumber(ARGV[3]) local member = ARGV[4] local window_start = now - window redis.call('ZREMRANGEBYSCORE', key, 0, window_start) local current = redis.call('ZCARD', key) if current >= limit then return 0 end redis.call('ZADD', key, now, member) redis.call('EXPIRE', key, window) return 1 """封装成 Python 函数:
import time import uuid from fastapi import HTTPException from redis_client import redis_client SLIDING_WINDOW_LUA = """ local key = KEYS[1] local now = tonumber(ARGV[1]) local window = tonumber(ARGV[2]) local limit = tonumber(ARGV[3]) local member = ARGV[4] local window_start = now - window redis.call('ZREMRANGEBYSCORE', key, 0, window_start) local current = redis.call('ZCARD', key) if current >= limit then return 0 end redis.call('ZADD', key, now, member) redis.call('EXPIRE', key, window) return 1 """ def sliding_window_limiter_lua( key: str, limit: int, window_seconds: int ): now = time.time() member = f"{now}:{uuid.uuid4()}" allowed = redis_client.eval( SLIDING_WINDOW_LUA, 1, key, now, window_seconds, limit, member ) if allowed != 1: raise HTTPException( status_code=429, detail="Too Many Requests" ) return True这样,清理、统计、判断、写入就变成了一个原子操作。
十一、封装成 FastAPI 依赖
在实际项目中,不建议每个接口都手动写限流逻辑。
可以将限流器封装成 FastAPI 依赖。
from fastapi import Request def rate_limit( limit: int, window_seconds: int, prefix: str ): def dependency(request: Request): client_ip = request.client.host path = request.url.path key = f"rate_limit:{prefix}:{path}:{client_ip}" sliding_window_limiter_lua( key=key, limit=limit, window_seconds=window_seconds ) return True return dependency使用方式如下:
from fastapi import FastAPI, Depends app = FastAPI() @app.get("/api/search", dependencies=[Depends(rate_limit( limit=5, window_seconds=60, prefix="search" ))]) def search(): return { "message": "search success" } @app.post("/api/login", dependencies=[Depends(rate_limit( limit=3, window_seconds=60, prefix="login" ))]) def login(): return { "message": "login success" }这样不同接口就可以设置不同的限流规则。
例如:
搜索接口:每分钟 5 次 登录接口:每分钟 3 次 提交接口:每分钟 10 次十二、按用户 ID 限流
上面的代码是按 IP 限流。
但在登录后的业务系统中,更常见的是按用户 ID 限流。
例如:
同一个用户每分钟最多提交 10 次订单可以从请求头或认证信息中获取用户 ID。
示例:
def user_rate_limit( limit: int, window_seconds: int, prefix: str ): def dependency(request: Request): user_id = request.headers.get("X-User-Id") if not user_id: raise HTTPException( status_code=401, detail="Unauthorized" ) path = request.url.path key = f"rate_limit:{prefix}:{path}:user:{user_id}" sliding_window_limiter_lua( key=key, limit=limit, window_seconds=window_seconds ) return True return dependency接口中使用:
@app.post("/api/order", dependencies=[Depends(user_rate_limit( limit=10, window_seconds=60, prefix="order" ))]) def create_order(): return { "message": "order created" }需要注意的是,真实项目中用户 ID 不应该直接相信请求头,而应该从认证系统、JWT、Session 或网关传递的可信身份中获取。
十三、返回更友好的错误信息
默认返回:
{ "detail": "Too Many Requests" }对于前端来说可能不够友好。
可以统一返回结构:
from fastapi import HTTPException def raise_rate_limit_error(): raise HTTPException( status_code=429, detail={ "code": "RATE_LIMITED", "message": "请求过于频繁,请稍后再试" } )在限流失败时调用:
if allowed != 1: raise_rate_limit_error()这样前端可以根据code做统一处理。
十四、增加 Retry-After 响应头
HTTP 中可以使用Retry-After告诉客户端多久后可以重试。
示例:
from fastapi import HTTPException def raise_rate_limit_error(retry_after: int): raise HTTPException( status_code=429, detail={ "code": "RATE_LIMITED", "message": "请求过于频繁,请稍后再试" }, headers={ "Retry-After": str(retry_after) } )例如:
Retry-After: 60表示客户端可以在 60 秒后重试。
在更精细的滑动窗口实现中,可以根据最早请求时间计算剩余等待时间。
十五、限流 Key 的设计
限流效果很大程度上取决于 key 的设计。
常见 key 维度包括:
按 IP 限流: rate_limit:ip:{ip} 按用户限流: rate_limit:user:{user_id} 按接口限流: rate_limit:path:{path} 按用户 + 接口限流: rate_limit:user:{user_id}:path:{path} 按 IP + 接口限流: rate_limit:ip:{ip}:path:{path}不同场景适合不同粒度。
登录接口
适合按 IP + 账号共同限流:
rate_limit:login:ip:{ip} rate_limit:login:account:{account}这样可以防止同一个 IP 爆破多个账号,也可以防止多个 IP 爆破同一个账号。
下单接口
适合按用户 ID 限流:
rate_limit:order:user:{user_id}公开查询接口
适合按 IP 限流:
rate_limit:query:ip:{ip}第三方开放接口
适合按 API Key 限流:
rate_limit:openapi:apikey:{api_key}十六、真实项目中的注意事项
1. 不要只在应用层限流
应用层限流很灵活,但在高流量场景下,请求已经打到了应用服务。
如果流量特别大,应该在更靠前的位置限流,例如:
网关层 负载均衡层 Nginx API Gateway 服务网格应用层限流更适合做精细化业务控制。
2. 注意代理场景下的真实 IP
如果服务部署在反向代理后面,直接使用:
request.client.host拿到的可能是代理服务器 IP,而不是真实客户端 IP。
这种情况下需要读取:
X-Forwarded-For X-Real-IP但这些请求头也可能被伪造,因此必须由可信网关统一注入和清洗。
3. Redis 故障时如何处理
限流依赖 Redis。
如果 Redis 不可用,系统需要决定采用哪种策略:
fail-open:Redis 故障时放行请求 fail-closed:Redis 故障时拒绝请求大多数普通业务接口更适合 fail-open,避免因为 Redis 故障导致主业务全部不可用。
但对于登录、支付、风控等敏感接口,可以根据业务风险选择更严格策略。
4. 避免 Redis key 爆炸
如果 key 维度过细,例如:
用户 ID + IP + 接口 + 设备 + 参数可能会产生大量 Redis key。
因此 key 设计需要平衡精度和资源消耗。
建议:
设置合理过期时间;
避免把长参数直接拼进 key;
必要时对参数做 hash;
定期监控 Redis key 数量和内存占用。
5. 限流不是权限控制
限流只能限制访问频率,不能替代认证和权限校验。
例如:
限流解决的是“请求太多”的问题 认证解决的是“你是谁”的问题 权限解决的是“你能不能访问”的问题三者不能混为一谈。
十七、完整示例代码
下面给出一个简化版完整示例。
import time import uuid import redis from fastapi import FastAPI, Request, Depends, HTTPException redis_client = redis.Redis( host="localhost", port=6379, db=0, decode_responses=True ) SLIDING_WINDOW_LUA = """ local key = KEYS[1] local now = tonumber(ARGV[1]) local window = tonumber(ARGV[2]) local limit = tonumber(ARGV[3]) local member = ARGV[4] local window_start = now - window redis.call('ZREMRANGEBYSCORE', key, 0, window_start) local current = redis.call('ZCARD', key) if current >= limit then return 0 end redis.call('ZADD', key, now, member) redis.call('EXPIRE', key, window) return 1 """ def raise_rate_limit_error(): raise HTTPException( status_code=429, detail={ "code": "RATE_LIMITED", "message": "请求过于频繁,请稍后再试" } ) def sliding_window_limiter_lua( key: str, limit: int, window_seconds: int ): now = time.time() member = f"{now}:{uuid.uuid4()}" allowed = redis_client.eval( SLIDING_WINDOW_LUA, 1, key, now, window_seconds, limit, member ) if allowed != 1: raise_rate_limit_error() return True def rate_limit( limit: int, window_seconds: int, prefix: str ): def dependency(request: Request): client_ip = request.client.host path = request.url.path key = f"rate_limit:{prefix}:{path}:{client_ip}" sliding_window_limiter_lua( key=key, limit=limit, window_seconds=window_seconds ) return True return dependency app = FastAPI() @app.get("/api/search", dependencies=[Depends(rate_limit( limit=5, window_seconds=60, prefix="search" ))]) def search(): return { "message": "search success" } @app.post("/api/submit", dependencies=[Depends(rate_limit( limit=3, window_seconds=60, prefix="submit" ))]) def submit(): return { "message": "submit success" }启动服务:
uvicorn main:app --reload测试接口:
curl http://127.0.0.1:8000/api/search连续请求超过限制后,会返回 429。
十八、总结
本文从零实现了一个基于 FastAPI 和 Redis 的接口限流方案。
核心流程可以概括为:
请求进入接口 ↓ 根据 IP、用户 ID 或 API Key 生成限流 key ↓ 在 Redis 中记录请求时间或请求次数 ↓ 判断是否超过限制 ↓ 允许请求或返回 429固定窗口算法实现简单,适合基础场景,但存在窗口边界问题。
滑动窗口算法更加精确,可以限制“任意时间段内”的请求次数,更适合对流量控制要求较高的接口。
在工程实践中,限流方案需要结合业务场景选择合适粒度:
公开接口可以按 IP 限流;
用户接口可以按用户 ID 限流;
登录接口可以按 IP 和账号共同限流;
开放平台接口可以按 API Key 限流;
高并发核心接口可以放在网关层提前限流。
接口限流不是一个复杂功能,但它是后端系统稳定性建设中非常重要的一环。
一个好的限流设计,既能保护系统不被异常流量拖垮,也能让正常用户在高峰期获得更稳定的访问体验。
