调用 OpenAI/Claude API 总是 429 报错?我用这套重试+限流策略彻底解决
最近做一个 AI 内容审核的项目,需要批量调用 GPT-4o 和 Claude 的 API。本来以为加个 for 循环就完事了,结果脚本跑了不到 10 分钟就开始疯狂报 429,整个进程直接趴窝。折腾了三天,终于把这套重试 + 限流方案磨出来了,记录一下。
如果你也遇到 openai.RateLimitError: Error code: 429 或者 anthropic.RateLimitError,这篇文章给出从被动重试到主动限流再到 Batch 兜底的完整解决路径,代码全部可直接复用。
429 报错到底是什么
429 是 HTTP 状态码 "Too Many Requests",简单说就是你的调用太快太多了,被服务端限流。OpenAI/Claude 的限流维度通常有这几个:
- RPM(Requests Per Minute):每分钟请求数
- TPM(Tokens Per Minute):每分钟 token 数
- RPD(Requests Per Day):每天请求总数
这三个维度任何一个超了都会触发 429。新注册账号的限额特别低,比如 GPT-4o 默认就 500 RPM / 30000 TPM,写个简单的并发就能打爆。
我最早踩的坑
刚开始我是这么写的:
import openaiclient = openai.OpenAI(api_key="sk-xxx")results = []
for prompt in prompts: # 大概 200 条response = client.chat.completions.create(model="gpt-4o",messages=[{"role": "user", "content": prompt}])results.append(response.choices[0].message.content)
这代码跑了 30 多次后开始报错,而且报得很迷惑——前面好好的,突然就 429,再过一会又恢复了。我一开始以为是网络问题,加了个简单的 try/except 循环重试,结果还是不稳定。
后来读了一下 OpenAI 文档才发现,429 的 response header 里其实带了 retry-after 字段,告诉你该等多少秒再重试。傻瓜式的循环重试根本没用这个信息,纯属浪费配额。
方案一:tenacity 指数退避
最简单可用的方案是用 tenacity 这个库做指数退避。pip 装一下:
pip install tenacity
然后包一下你的 API 调用:
import openai
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_typeclient = openai.OpenAI(api_key="sk-xxx")@retry(wait=wait_exponential(multiplier=1, min=4, max=60),stop=stop_after_attempt(6),retry=retry_if_exception_type(openai.RateLimitError),reraise=True,
)
def call_with_retry(prompt: str) -> str:response = client.chat.completions.create(model="gpt-4o",messages=[{"role": "user", "content": prompt}],)return response.choices[0].message.content
wait_exponential 的逻辑是:第一次失败等 4 秒,第二次 8 秒,第三次 16 秒……指数增长但不超过 60 秒。6 次还失败就抛出去。
这套配置对付偶发 429 基本够用,但有个问题——它是被动等限流再重试,浪费时间。如果你能预先知道自己的速率上限,主动限流更优雅。
方案二:aiolimiter 主动限流
aiolimiter 可以严格控制每分钟请求数,配合异步并发使用:
pip install aiolimiter
import asyncio
import openai
from aiolimiter import AsyncLimiterclient = openai.AsyncOpenAI(api_key="sk-xxx")
limiter = AsyncLimiter(max_rate=400, time_period=60) # 每分钟 400 次,留点余量async def call_one(prompt: str) -> str:async with limiter:response = await client.chat.completions.create(model="gpt-4o",messages=[{"role": "user", "content": prompt}],)return response.choices[0].message.contentasync def main(prompts: list[str]):tasks = [call_one(p) for p in prompts]return await asyncio.gather(*tasks, return_exceptions=True)
AsyncLimiter(400, 60) 表示每 60 秒最多 400 次。这比单纯重试好处大:
- 主动控速,不会触发服务端 429
- 异步并发,吞吐量比同步循环高得多
- 限流逻辑和业务逻辑解耦
我把 200 条 prompt 用这套跑下来,全程零 429,总耗时从原来的 25 分钟降到 8 分钟。
用聚合平台省事
如果你不想自己分别去各家申请 Key、维护多个 SDK、处理不同供应商的限流策略,可以考虑用聚合平台。
我自己现在主力在用 ofox.ai 这个聚合平台,一个 API Key 就能调 GPT-4o、Claude Opus 4.7、Gemini 3、DeepSeek 等 50+ 模型,兼容 OpenAI SDK 协议,低延迟直连,支持支付宝按量计费。比较实用的一点是多供应商冗余备份,某一路被限流时它自己会切换到备用路线,对我这种跑批量任务的场景挺友好。
接入方式还是 OpenAI SDK,只改一下 base_url:
import openaiclient = openai.AsyncOpenAI(base_url="https://api.ofox.ai/v1", # 我用的这个,低延迟直连api_key="sk-xxx",
)
方案三:TPM 维度的限流
光控 RPM 还不够,长 prompt 会把 TPM 打爆。这时候要按 token 数限流:
import tiktoken
from aiolimiter import AsyncLimiterenc = tiktoken.encoding_for_model("gpt-4o")
tpm_limiter = AsyncLimiter(max_rate=28000, time_period=60) # 留 2000 缓冲async def call_with_token_limit(prompt: str):token_count = len(enc.encode(prompt)) + 1000 # 加上预估输出# 按 token 数批量占用配额槽位for _ in range(max(1, token_count // 100)):await tpm_limiter.acquire()response = await client.chat.completions.create(model="gpt-4o",messages=[{"role": "user", "content": prompt}],)return response.choices[0].message.content
这里用了一个小技巧:把 limiter 的 max_rate 设成 TPM 上限除以 100,每次请求按预估 token 数除以 100 多占用几次配额。粗暴但有效。
综合方案:限流 + 重试双保险
实际项目里我两个都用,限流为主,重试兜底:
import openai
from aiolimiter import AsyncLimiter
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_typeclient = openai.AsyncOpenAI(base_url="https://api.ofox.ai/v1",api_key="sk-xxx",
)
rpm_limiter = AsyncLimiter(max_rate=400, time_period=60)@retry(wait=wait_exponential(multiplier=2, min=4, max=120),stop=stop_after_attempt(5),retry=retry_if_exception_type((openai.RateLimitError, openai.APITimeoutError)),reraise=True,
)
async def safe_call(prompt: str) -> str:async with rpm_limiter:response = await client.chat.completions.create(model="gpt-4o",messages=[{"role": "user", "content": prompt}],timeout=30,)return response.choices[0].message.content
这套上线一个月没再出过 429 报错。
几个容易忽略的坑
坑 1:streaming 响应也算 TPM。我以前以为 streaming 是分块返回所以不计入限流,结果发现 OpenAI 是按完整输出 token 算的,跟非 streaming 一样。
坑 2:异常重试要区分类型。RateLimitError 该重试,AuthenticationError(401)重试没用,反而可能把 Key 锁掉。一定要用 retry_if_exception_type 精确指定。
坑 3:批量场景用 Batch API 更划算。如果你不急着拿结果,OpenAI 和 Anthropic 都有 Batch API,24 小时内返回,价格是实时的 50%,而且不占 RPM/TPM 配额。这个超级香,我后来把审核任务全切到 Batch 了。
坑 4:监控不能省。我吃过一次大亏——某天凌晨脚本疯狂 429,因为账号被风控了 RPM 临时砍半,但我没监控。后来加了个简单的报错率统计,每 100 次调用算一次成功率,掉到 80% 以下就发飞书告警,再没出过事。
总结
429 报错的处理思路总结成一句话:限流为主,重试兜底,监控保命。
- 偶发 429:tenacity 指数退避
- 高并发场景:aiolimiter 主动限流
- 长 prompt:按 token 维度限流
- 批量非实时任务:直接上 Batch API
代码我都贴上了,复制走改改参数就能用。希望能帮到同样被 429 折磨的同学。
