Spring Boot 3.5 + MyBatis Plus + RabbitMQ:打造 AI 驱动的慢 SQL 监控与优化系统
一套面向高并发场景的智能性能诊断方案,自动捕获慢请求与慢 SQL,通过 RabbitMQ 削峰异步处理,调用大模型生成优化建议并持久化,实现“从发现到建议”全自动 SQL 治理闭环。
1. 为什么需要这套系统?
人工看慢日志太累:传统 MySQL 慢查询日志或 Druid 监控页面需要 DBA 逐条分析,时效性差。
接口慢 ≠ 一定是 SQL 慢:可能包含了业务逻辑耗时,需同时监控接口总耗时和单条 SQL 耗时才能精准定位。
AI 可直接给出改写方案:结合 SQL 模板、执行耗时、接口上下文,大模型能快速给出加索引、改写法等具体建议,降低人工门槛。
于是我们构建了一个无侵入、异步、可扩展的 AI 慢 SQL 分析系统,核心流程如下:
用户请求 → Filter 计时开始 → 业务 SQL 执行(Interceptor 记录 SQL 耗时)
→ Filter 计时结束 → 总耗时超过阈值?
→ 是 → 收集 SQL 耗时信息 → 封装事件 → 发送 RabbitMQ
→ 消费者消费事件 → 写入慢日志 + 调用 AI 分析每条 SQL → 入库待审核
2. 技术选型
| 组件 | 作用 |
|---|---|
| Spring Boot 3.5.12 | 基础框架,提供 Web、AMQP、JDBC 自动配置 |
| MyBatis Plus 3.5.5 | ORM 层,简化开发,提供插件机制 |
| MySQL | 业务数据 + 监控结果存储 |
| RabbitMQ | 异步解耦,削峰填谷 |
| Jackson | 消息 JSON 序列化 |
| RestTemplate | 调用 OpenAI/DeepSeek 等兼容接口 |
| Druid | 连接池,可选作为 SQL 监控的兜底数据源 |
3. 核心实现细节
3.1 请求级计时:RequestTimingFilter
所有 API 请求都经过该 Filter,记录开始时间,生成 TraceId,并在请求结束后判断是否慢请求。
@Component public class RequestTimingFilter implements Filter { @Value("${slow.request-threshold:1000}") private long slowThreshold; // 总耗时阈值,配置化 @Autowired private SlowRequestProducer producer; @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { HttpServletRequest httpRequest = (HttpServletRequest) request; long start = System.currentTimeMillis(); String traceId = UUID.randomUUID().toString().replace("-", "").substring(0, 16); httpRequest.setAttribute("startTime", start); httpRequest.setAttribute("traceId", traceId); try { chain.doFilter(request, response); } finally { long totalTime = System.currentTimeMillis() - start; if (totalTime > slowThreshold) { // 从线程变量中取出该请求内所有 SQL 的耗时 ConcurrentHashMap<String, Long> sqlTimes = SqlTimingInterceptor.getCurrentSqlTimes(); // 封装事件并发送 RabbitMQ(代码见下文) SlowRequestEvent event = buildEvent(httpRequest, totalTime, traceId, sqlTimes); producer.sendSlowRequestEvent(event); } SqlTimingInterceptor.clear(); // 防止内存泄漏 } } }3.2 SQL 执行耗时拦截:SqlTimingInterceptor
利用 MyBatis 插件机制拦截StatementHandler.prepare()方法,记录每条 SQL 的执行耗时。
@Component @Intercepts(@Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class})) public class SqlTimingInterceptor implements Interceptor { private static final ThreadLocal<ConcurrentHashMap<String, Long>> SQL_TIMES = new ThreadLocal<>(); @Override public Object intercept(Invocation invocation) throws Throwable { long start = System.nanoTime(); Object result = invocation.proceed(); long execMs = (System.nanoTime() - start) / 1_000_000; StatementHandler handler = (StatementHandler) invocation.getTarget(); String rawSql = handler.getBoundSql().getSql().replaceAll("\\s+", " "); // 简单脱敏:将数字和字符串替换为 ? String template = rawSql.replaceAll("\\d+", "?").replaceAll("'[^']*'", "?"); ConcurrentHashMap<String, Long> map = SQL_TIMES.get(); if (map == null) { map = new ConcurrentHashMap<>(); SQL_TIMES.set(map); } map.merge(template, execMs, Math::max); // 同模板保留最大耗时 return result; } public static ConcurrentHashMap<String, Long> getCurrentSqlTimes() { return SQL_TIMES.get(); } public static void clear() { SQL_TIMES.remove(); } }注册拦截器:为避免 Spring 自动装配时机问题,使用ConfigurationCustomizer确保拦截器一定会被添加到Configuration中
@Configuration public class MyBatisPlusConfig { @Bean public ConfigurationCustomizer mybatisConfigurationCustomizer() { return configuration -> configuration.addInterceptor(new SqlTimingInterceptor()); } }3.3 RabbitMQ 配置:统一 JSON 序列化
自定义RabbitTemplate和消费者容器工厂,全部使用Jackson2JsonMessageConverter,避免 Java 原生序列化带来的NotSerializableException。
@Configuration public class RabbitMqConfig { public static final String EXCHANGE = "slow.request.exchange"; public static final String QUEUE = "slow.sql.queue"; public static final String ROUTING_KEY = "slow.sql"; // 声明交换机、队列、绑定 ... @Bean public Jackson2JsonMessageConverter converter() { return new Jackson2JsonMessageConverter(); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory cf) { RabbitTemplate template = new RabbitTemplate(cf); template.setMessageConverter(converter()); return template; } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory cf) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(cf); factory.setMessageConverter(converter()); return factory; } }3.4 消费者:持久化 + AI 分析
@Component public class SlowRequestConsumer { @Value("${slow.sql-threshold:500}") private long sqlThreshold; // 单条 SQL 的最低分析阈值,配置化 @Autowired private SlowRequestLogMapper logMapper; @Autowired private OptimizationAnalysisMapper analysisMapper; @Autowired private AiAnalysisService aiAnalysisService; @RabbitListener(queues = RabbitMqConfig.QUEUE) public void handle(SlowRequestEvent event) { // 1. 保存慢请求日志 SlowRequestLog log = buildLog(event); logMapper.insert(log); // 2. 对每条 SQL 进行分析 if (event.getSqlList() != null) { for (SqlExecInfo sql : event.getSqlList()) { if (sql.getExecTimeMs() < sqlThreshold) continue; // 忽略耗时过短的单条 SQL String aiResult = aiAnalysisService.analyzeSql( sql.getTemplate(), sql.getExecTimeMs(), event.getUri() ); // 保存分析结果(含危险操作标记) OptimizationAnalysis analysis = buildAnalysis(log.getId(), sql, aiResult); analysisMapper.insert(analysis); } } } }3.5 AI 调用服务
@Service public class AiAnalysisService { @Value("${ai.provider.url}") private String aiUrl; @Value("${ai.provider.api-key}") private String apiKey; @Value("${ai.provider.model}") private String model; private final RestTemplate restTemplate = new RestTemplate(); public String analyzeSql(String sql, long execTimeMs, String apiPath) { String prompt = String.format(""" 接口: %s SQL: %s 执行耗时: %dms 请以JSON格式分析原因并给出优化建议,包含 risk 字段。 """, apiPath, sql, execTimeMs); // 构建 OpenAI 兼容请求 Map<String, Object> body = Map.of( "model", model, "messages", List.of( Map.of("role", "system", "content", "你是MySQL优化专家,返回JSON。"), Map.of("role", "user", "content", prompt) ), "temperature", 0.2 ); HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); headers.setBearerAuth(apiKey); ResponseEntity<String> resp = restTemplate.postForEntity( aiUrl, new HttpEntity<>(body, headers), String.class ); return resp.getBody(); // 实际需提取 choices[0].message.content } }4. 测试与验证
4.1 数据库准备
执行init.sql创建student、teacher、slow_request_log、optimization_analysis表。
测试时若student表只有几条数据,查询耗时极短,无法触发阈值。可采用以下方式模拟:
方案一:调低阈值(推荐快速测试)
application.yml中设置:
slow: request-threshold: 10 # 10ms,任意查询都会被视为慢请求 sql-threshold: 1方案二:插入大量数据
通过存储过程插入 50 万条学生数据,使全表扫描超过 1 秒。
方案三:在 Controller 中手动延时
@GetMapping("/slow-students") public List<Student> slowStudents() throws InterruptedException { List<Student> list = studentMapper.selectList(...); Thread.sleep(1500); // 制造总耗时 > 1s return list; }4.2 验证结果
访问
http://localhost:8080/api/test/slow-students。查看 RabbitMQ 管理界面,队列中应有消息。
查询数据库:
SELECT * FROM slow_request_log; SELECT * FROM optimization_analysis;optimization_analysis.ai_suggestion字段应包含 AI 返回的 JSON 建议。
6. 生产强化建议
SQL 指纹去重:同一 SQL 指纹在 1 小时内只分析一次,引入 Redis 记录。
AI 结果解析:提取
risk字段,将高风险操作 (如 DROP) 自动标记为危险,需人工二次确认。死信队列与重试:AI 调用失败的消息进入死信队列,后续定时补偿。
数据脱敏:消息中不携带真实参数值,全部替换为
?。前端可视化:搭建管理页面展示待优化项,支持标记 “已应用”/“忽略”。
连接池与限流:对 AI 调用使用 Resilience4j 限制并发,避免超额。
7. 总结
本文从零实现了一套基于 Spring Boot + MyBatis Plus + RabbitMQ 的 AI 慢 SQL 监控系统。核心思路是:拦截请求与 SQL 耗时 → 异步发送事件 → 持久化并调用 AI 分析 → 入库待审。整个过程对业务代码零侵入,可大幅提升性能问题发现与解决的效率,尤其适合微服务架构下 SQL 治理难、缺乏专职 DBA 的团队。
完整代码可参考文中各片段组合,关键配置已全部给出,读者可直接复用至自己项目。
扩展思考:如果团队已引入 SkyWalking / Pinpoint 等 APM 工具,本系统可作为其增强模块,专注于自动化建议生成,形成“监控 → 诊断 → 建议”的完整链路。
