Java 程序员第 24 阶段:多 Agent 高阶实战,复杂业务场景完整落地实现
在多 Agent 基础篇中,我们探讨了角色协同、任务拆分的基本模式。本文进一步深入,聚焦高阶架构设计、跨服务协作与复杂场景完整落地,帮助读者构建生产级别的多 Agent 系统。
一、高阶架构:从简单协同到生产级系统
1.1 三层架构模型
成熟的多 Agent 系统通常采用调度层、执行层、专家层三层分离架构:
``__INLINE_
┌─────────────────────────────────────────────────────┐
│ 调度层 (Dispatcher) │
│ 任务解析 → DAG 拆分 → 结果汇总 → 响应输出 │
└─────────────────────────────────────────────────────┘
│
┌──────────────────┼──────────────────┐
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 执行Agent │ │ 执行Agent │ │ 执行Agent │
│ (Executor) │ │ (Executor) │ │ (Executor) │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
└──────────────────┼──────────────────┘
▼
┌─────────────────────────────────────────────────────┐
│ 专家层 (Specialist) │
│ 风控专家 · 法律专家 · 技术专家 │
└─────────────────────────────────────────────────────┘
__`__INLINE_
调度层负责任务编排,不参与具体业务逻辑;执行层并行处理独立子任务;专家层提供垂直领域的深度能力,可被执行层按需调用。
1.2 调度策略分类
策略 | 适用场景 | 核心优势 |
**同步调度** | 强依赖任务链 | 结果实时可用的简单场景 |
**异步调度** | I/O 密集、耗时任务 | 不阻塞主流程,提升吞吐量 |
**分层调度** | 多阶段复杂流程 | 清晰分层,易于维护 |
**混合调度** | 生产环境综合场景 | 兼顾性能与可靠性 |
二、任务编排:复杂业务场景的拆分艺术
2.1 DAG 依赖管理
复杂任务存在天然的数据依赖关系,使用有向无环图(DAG)管理任务依赖:
__`__INLINE_java
@Service
public class DagScheduler {
public List<Level> buildDag(List<SubTask> tasks) {
// 识别无依赖任务(Level 0)
List<SubTask> level0 = tasks.stream()
.filter(t -> t.getDependencies().isEmpty())
.collect(Collectors.toList());
// 按层级推进
Map<Integer, List<SubTask>> levels = new HashMap<>();
levels.put(0, level0);
for (int i = 1; ; i++) {
List<SubTask> levelN = tasks.stream()
.filter(t -> t.getDependencies().stream()
.allMatch(d -> levels.get(i-1).contains(d)))
.filter(t -> !levels.containsValue(t))
.collect(Collectors.toList());
if (levelN.isEmpty()) break;
levels.put(i, levelN);
}
return levels.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.map(e -> new Level(e.getKey(), e.getValue()))
.collect(Collectors.toList());
}
}
__`__INLINE_
2.2 动态任务编排
根据输入特征动态决定执行路径:
__`__INLINE_java
public TaskPlan plan(UserRequest request) {
TaskPlan plan = new TaskPlan();
if (request.isHighRisk()) {
// 高风险场景:增加风控环节
plan.addStage(TaskStage.RISK_EVALUATION);
plan.addStage(TaskStage.MANUAL_REVIEW);
}
if (request.getAmount().compareTo(HIGH_AMOUNT) > 0) {
// 大额交易:多维度复核
plan.addParallelTask("income_verify");
plan.addParallelTask("asset_verify");
}
plan.addStage(TaskStage.FINAL_APPROVAL);
return plan;
}
__`__INLINE_
三、跨服务协作:分布式多 Agent 实战
3.1 服务边界与 Agent 分布
在微服务架构下,每个服务可部署独立 Agent,形成联邦式协作:
__`__INLINE_
[用户服务] ─────┐
Agent │ 消息队列
[订单服务] ─────┼─── Kafka ──→ [聚合服务]
Agent │ Dispatcher
[支付服务] ─────┘
Agent
__`__INLINE_
3.2 消息总线实现
使用消息队列实现跨服务解耦:
__`__INLINE_java
// 任务发布
@Service
public class TaskPublisher {
@Autowired private KafkaTemplate<String, TaskMessage> kafka;
public void publishTask(SubTask task) {
TaskMessage msg = new TaskMessage();
msg.setTaskId(task.getId());
msg.setPayload(task.getPayload());
msg.setCallbackTopic("agent-results");
kafka.send("agent-tasks", task.getServiceId(), msg);
}
}
// 结果订阅
@Service
public class ResultCollector {
private ConcurrentHashMap<String, CountDownLatch> latches = new ConcurrentHashMap<>();
@KafkaListener(topics = "agent-results")
public void onResult(TaskResult result) {
collectedResults.put(result.getTaskId(), result);
if (latches.containsKey(result.getTaskId())) {
latches.get(result.getTaskId()).countDown();
}
}
}
__`__INLINE_
3.3 共享状态与一致性
跨服务协作时,通过 Redis 实现共享上下文:
__`__INLINE_java
@Service
public class SharedContextStore {
@Autowired private StringRedisTemplate redis;
public void write(String key, Object value) {
redis.opsForValue().set("context:" + key, JSON.toJSONString(value));
redis.opsForValue().set("context:" + key + ":ts", System.currentTimeMillis());
}
public <T> T read(String key, Class<T> clazz) {
String json = redis.opsForValue().get("context:" + key);
return JSON.parseObject(json, clazz);
}
}
__`__INLINE_
四、实战案例:电商订单处理完整实现
4.1 业务场景
用户提交订单,系统自动完成:
1.库存预占(InventoryAgent)
2.优惠计算(CouponAgent)
3.支付扣款(PaymentAgent)
4.风控审核(RiskAgent)
5.订单创建(OrderAgent)
4.2 核心代码实现
__`__INLINE_java
@Service
@Slf4j
public class OrderProcessingService {
@Autowired private InventoryAgent inventoryAgent;
@Autowired private CouponAgent couponAgent;
@Autowired private PaymentAgent paymentAgent;
@Autowired private RiskAgent riskAgent;
@Autowired private OrderAgent orderAgent;
public OrderResult process(OrderRequest request) {
// 第一阶段:并行执行(无依赖)
CompletableFuture<InventoryResult> invFuture = CompletableFuture
.supplyAsync(() -> inventoryAgent.reserve(request.getItems()));
CompletableFuture<CouponResult> couponFuture = CompletableFuture
.supplyAsync(() -> couponAgent.calculate(request.getUserId(), request.getItems()));
CompletableFuture.allOf(invFuture, couponFuture).join();
InventoryResult invResult = invFuture.join();
CouponResult couponResult = couponFuture.join();
if (!invResult.isSuccess()) {
return OrderResult.fail("库存不足");
}
// 第二阶段:支付 + 风控(可并行)
BigDecimal finalAmount = couponResult.getFinalAmount();
CompletableFuture<PaymentResult> payFuture = CompletableFuture
.supplyAsync(() -> paymentAgent.deduct(request.getUserId(), finalAmount));
CompletableFuture<RiskResult> riskFuture = CompletableFuture
.supplyAsync(() -> riskAgent.evaluate(request));
CompletableFuture.allOf(payFuture, riskFuture).join();
PaymentResult payResult = payFuture.join();
RiskResult riskResult = riskFuture.join();
if (!payResult.isSuccess()) {
// 释放库存
inventoryAgent.release(request.getItems());
return OrderResult.fail("支付失败");
}
if (!riskResult.isApproved()) {
// 退款
paymentAgent.refund(request.getUserId(), finalAmount);
return OrderResult.fail("风控拦截");
}
// 第三阶段:创建订单
Order order = orderAgent.create(request, invResult, couponResult, payResult);
return OrderResult.success(order);
}
}
__`__INLINE_
4.3 线程池隔离
不同类型的 Agent 使用独立线程池,避免资源竞争:
__`__INLINE_java
@Bean
public TaskExecutor agentExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("agent-");
executor.initialize();
return executor;
}
@Bean
public TaskExecutor ioExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(50);
executor.setMaxPoolSize(200);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("io-");
executor.initialize();
return executor;
}
__`__INLINE_
4.4 性能对比
指标 | 纯串行 | 多 Agent 并行 |
总耗时 | ~3.5s | ~1.2s |
吞吐量提升 | 1x | ~3x |
失败隔离 | 无 | 支付失败可回滚库存 |
五、容错与可观测性
5.1 错误处理策略
__`__INLINE_java
public TaskResult executeWithRetry(SubTask task) {
int maxAttempts = 3;
for (int i = 0; i < maxAttempts; i++) {
try {
return executor.execute(task);
} catch (Exception e) {
log.warn("Task {} attempt {} failed: {}", task.getId(), i+1, e.getMessage());
if (i == maxAttempts - 1) {
// 最终失败,返回兜底结果
return TaskResult.fallback(task.getId(), getDefaultValue(task));
}
}
}
return TaskResult.fallback(task.getId(), getDefaultValue(task));
}
__`__INLINE_
5.2 监控埋点
__`__INLINE_java
@Aspect
@Component
public class AgentMetricsAspect {
@Around("execution(com.example.agent.Agent.execute(..))")
public Object measure(ProceedingJoinPoint pjp) throws Throwable {
String agentName = pjp.getTarget().getClass().getSimpleName();
long start = System.currentTimeMillis();
try {
Object result = pjp.proceed();
metrics.record(agentName, "success", System.currentTimeMillis() - start);
return result;
} catch (Exception e) {
metrics.record(agentName, "failure", System.currentTimeMillis() - start);
throw e;
}
}
}
__``
六、总结
多 Agent 高阶实战的核心在于:
1.架构分层:调度层专注编排,执行层专注业务,专家层提供深度能力
2.DAG 驱动:通过有向无环图管理任务依赖,确保正确执行顺序
3.跨服务协作:消息队列 + 共享存储实现服务间解耦与状态同步
4.容错兜底:重试机制 + 默认值策略确保系统稳定性
5.可观测:全链路埋点监控,快速定位问题
生产级多 Agent 系统不是一蹴而就,需要根据业务复杂度逐步演进。从简单场景入手,逐步引入分层架构、动态编排、服务协作,最终实现高效、可靠、可扩展的智能业务系统。
作者:洛水石
作者:洛水石
