霸王餐CPS系统中Java实现接口限流的多种算法与落地技巧
霸王餐CPS系统中Java实现接口限流的多种算法与落地技巧
在“霸王餐”CPS系统中,第三方回调、佣金查询、活动配置等接口常面临恶意刷量或突发流量冲击。若无有效限流机制,将导致数据库压力激增、服务雪崩。本文结合计数器、滑动窗口、漏桶、令牌桶四种算法,提供基于Guava、Redis及Sentinel的可落地限流方案。
1. 固定窗口计数器(简单但存在临界突刺)
适用于低精度场景,如每日回调次数限制:
packagebaodanbao.com.cn.cps.ratelimit;importcom.google.common.cache.CacheBuilder;importcom.google.common.cache.CacheLoader;importcom.google.common.cache.LoadingCache;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.atomic.AtomicInteger;publicclassFixedWindowRateLimiter{privatefinalLoadingCache<String,AtomicInteger>counters;privatefinalintlimit;privatefinallongwindowSeconds;publicFixedWindowRateLimiter(intlimit,longwindowSeconds){this.limit=limit;this.windowSeconds=windowSeconds;this.counters=CacheBuilder.newBuilder().expireAfterWrite(windowSeconds,TimeUnit.SECONDS).build(newCacheLoader<>(){@OverridepublicAtomicIntegerload(Stringkey){returnnewAtomicInteger(0);}});}publicbooleantryAcquire(Stringkey){AtomicIntegercounter=counters.getUnchecked(key);intcurrent=counter.incrementAndGet();returncurrent<=limit;}}使用示例:
FixedWindowRateLimiterlimiter=newFixedWindowRateLimiter(100,60);// 每分钟100次if(!limiter.tryAcquire("callback:"+merchantId)){thrownewRuntimeException("Too many requests");}2. 滑动日志窗口(高精度但内存开销大)
记录每次请求时间戳,动态计算窗口内请求数:
publicclassSlidingLogRateLimiter{privatefinalLoadingCache<String,List<Long>>requestLogs;privatefinalintlimit;privatefinallongwindowMillis;publicSlidingLogRateLimiter(intlimit,longwindowSeconds){this.limit=limit;this.windowMillis=windowSeconds*1000;this.requestLogs=CacheBuilder.newBuilder().expireAfterAccess(windowSeconds,TimeUnit.SECONDS).build(newCacheLoader<>(){@OverridepublicList<Long>load(Stringkey){returnnewArrayList<>();}});}publicsynchronizedbooleantryAcquire(Stringkey){longnow=System.currentTimeMillis();List<Long>logs=requestLogs.getUnchecked(key);// 清除窗口外记录logs.removeIf(ts->now-ts>windowMillis);if(logs.size()>=limit){returnfalse;}logs.add(now);returntrue;}}3. 令牌桶算法(支持突发流量)
使用GuavaRateLimiter实现单机限流:
@ComponentpublicclassTokenBucketRateLimiter{privatefinalMap<String,com.google.common.util.concurrent.RateLimiter>limiters=newConcurrentHashMap<>();publicRateLimitergetOrCreate(Stringkey,doublepermitsPerSecond){returnlimiters.computeIfAbsent(key,k->com.google.common.util.concurrent.RateLimiter.create(permitsPerSecond));}publicbooleantryAcquire(Stringkey,doublepermitsPerSecond){returngetOrCreate(key,permitsPerSecond).tryAcquire();}}// Controller中使用@AutowiredprivateTokenBucketRateLimitertokenLimiter;@PostMapping("/commission/query")publicObjectqueryCommission(@RequestBodyQueryDTOdto){if(!tokenLimiter.tryAcquire("query:"+dto.getUserId(),10.0)){thrownewRuntimeException("Request too fast");}// 业务逻辑}4. Redis+Lua实现分布式令牌桶
适用于集群环境,保证全局一致性:
@ComponentpublicclassDistributedTokenBucket{privatefinalStringRedisTemplateredisTemplate;publicDistributedTokenBucket(StringRedisTemplateredisTemplate){this.redisTemplate=redisTemplate;}privatestaticfinalStringLUA_SCRIPT="local key = KEYS[1]\n"+"local rate = tonumber(ARGV[1])\n"+"local capacity = tonumber(ARGV[2])\n"+"local now = tonumber(ARGV[3])\n"+"local requested = tonumber(ARGV[4])\n"+"local tokens = redis.call('GET', key)\n"+"if tokens == false then\n"+" tokens = capacity\n"+"else\n"+" local last_time = redis.call('GET', key .. ':ts')\n"+" if last_time == false then last_time = now end\n"+" tokens = math.min(capacity, tonumber(tokens) + (now - tonumber(last_time)) * rate)\n"+"end\n"+"if tokens >= requested then\n"+" redis.call('SET', key, tokens - requested)\n"+" redis.call('SET', key .. ':ts', now)\n"+" return 1\n"+"else\n"+" return 0\n"+"end";publicbooleantryAcquire(Stringkey,doublerate,intcapacity,intpermits){DefaultRedisScript<Long>script=newDefaultRedisScript<>(LUA_SCRIPT,Long.class);Longresult=redisTemplate.execute(script,Collections.singletonList("tb:"+key),String.valueOf(rate),String.valueOf(capacity),String.valueOf(System.currentTimeMillis()),String.valueOf(permits));returnresult!=null&&result==1;}}调用示例:
booleanallowed=distributedTokenBucket.tryAcquire("callback:meituan",50.0,// 每秒50个令牌100,// 桶容量1001// 每次请求1个令牌);if(!allowed)thrownewRuntimeException("Rate limited");5. Sentinel集成(生产级方案)
通过注解实现细粒度限流:
@PostConstructpublicvoidinitRules(){List<FlowRule>rules=newArrayList<>();FlowRulerule=newFlowRule("commission_callback_api").setGrade(RuleConstant.FLOW_GRADE_QPS).setCount(200)// 单机QPS上限200.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)// 匀速排队.setMaxQueueingTimeMs(500);rules.add(rule);FlowRuleManager.loadRules(rules);}@SentinelResource(value="commission_callback_api",blockHandler="handleBlocked")@PostMapping("/callback")publicResponseEntity<?>handleCallback(@RequestBodyCallbackDTOdto){returnbaodanbao.com.cn.cps.service.CallbackService.process(dto);}publicResponseEntity<?>handleBlocked(BlockExceptionex){returnResponseEntity.status(429).body("Too many requests");}本文著作权归 俱美开放平台 ,转载请注明出处!
