当前位置: 首页 > news >正文

实战 LangGraph 循环执行:构建带自动重试的并行任务流

核心场景与需求拆解

我们的目标场景很明确:

  1. 并行执行两个独立任务(获取天气、获取时间),两个任务都存在随机失败概率;
  2. 只要任意一个任务失败,就自动触发循环重试,无需手动重启流程;
  3. 支持设置最大重试次数,避免无限循环,保障流程稳定性;
  4. 全程通过状态管理数据,重试时复用原始输入,自动更新执行结果。

这种模式广泛适用于:第三方 API 重试、异步数据拉取、批量任务容错处理、AI 工具调用容错等实际业务场景,通用性极强。

二、LangGraph 循环执行的核心设计思路

LangGraph 实现循环的核心逻辑,完全围绕 **「状态管理 + 条件决策」** 展开,没有复杂的语法,核心分为三大模块:

1. 状态定义:循环的「数据载体」

首先定义专属的工作流状态,把输入数据、任务结果、重试次数、最大重试数、执行状态全部封装起来。状态是 LangGraph 循环的核心 —— 所有节点的执行、重试的判断、结果的更新,都基于这个状态流转,每一次循环都会自动更新重试次数和任务结果,为决策提供依据。

2. 执行节点:循环的「任务核心」

这是循环的主体节点,负责并行执行目标任务。我们通过线程池实现多任务并行执行,提升效率;执行完成后,会自动判断两个任务是否全部成功,并把结果、重试次数更新到状态中,传递给下一个节点。这个节点是循环的「动作执行者」,每一次重试都会重新执行这个节点。

3. 决策节点:循环的「开关控制器」

这是实现循环的关键核心!决策节点会读取最新的状态,根据规则判断流程走向:

  • ✅ 任务全部成功 → 结束流程;
  • ❌ 任务失败 + 未达最大重试次数 →回到执行节点,重新循环
  • ❌ 任务失败 + 达到最大重试次数 → 强制结束流程。

通过这个决策节点,LangGraph 轻松实现了闭环循环,这也是它比传统工作流框架更灵活的地方。

三、LangGraph 循环执行的核心优势

对比传统代码写循环 + 重试的方式,LangGraph 的循环执行有三大突出优势:

1. 逻辑解耦,可读性拉满

循环逻辑、任务执行逻辑、决策逻辑完全分离,每个节点只做一件事。后续修改重试次数、调整任务逻辑、新增失败处理,都不用改动核心循环代码,维护成本极低。

2. 状态自动流转,无需手动管理

不用自己定义变量记录重试次数、不用手动传递任务结果,LangGraph 会自动管理状态的更新与传递,彻底避免手动管理状态带来的 bug。

3. 容错性强,适配不稳定场景

针对第三方接口、网络请求等易失败场景,自动重试机制能大幅提升任务成功率;同时限制最大重试次数,兼顾效率与稳定性,生产环境直接可用。

4. 并行 + 循环完美结合

支持在循环节点内执行并行任务,既解决了任务容错问题,又保证了执行效率,复杂工作流也能轻松编排。

四、落地价值与适用场景

这套 LangGraph 循环重试模式,是生产级工作流的必备能力,适用场景非常广泛:

  • 第三方 API 调用容错(天气、地图、支付等接口);
  • AI Agent 工具调用自动重试(LLM 接口、插件调用);
  • 批量数据拉取 / 处理,保障任务完整性;
  • 异步任务执行,失败自动恢复。

它让工作流从「一次性执行」变成「智能容错执行」,大幅提升系统的稳定性和健壮性。

五、总结

LangGraph 的循环执行能力,通过状态驱动 + 条件决策的设计,让复杂的循环重试工作流变得极简、优雅。

我们不需要编写嵌套的循环代码,只需要:定义状态承载数据→编写执行节点处理任务→编写决策节点控制循环,就能快速构建出支持并行、自动重试、容错性强的工作流。

这也是 LangGraph 的核心魅力:用最简洁的编排方式,解决最复杂的工作流问题,无论是 AI 应用还是常规业务流程,都能轻松驾驭。


代码流程图:

实现代码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

fromlanggraph.graphimportStateGraph, END

fromtypingimportTypedDict, Annotated

importoperator

importtime

importrandom

fromconcurrent.futuresimportThreadPoolExecutor, as_completed

# 1. 定义带重试状态的 State

classLoopState(TypedDict):

input_data:str# 输入数据

weather_data:str|None# 天气数据

time_data:str|None# 时间数据

retry_count:int# 重试次数

max_retries:int# 最大重试次数

is_success:bool# 是否执行成功(用于判断循环退出)

# 2. 模拟不稳定的并行任务(可能失败,触发循环)

deffetch_weather(input_data:str)->str:

"""模拟天气数据获取(30%概率失败)"""

print(f"\n[天气任务] 处理: {input_data}")

ifrandom.random() <0.3:# 30%失败概率

print("[天气任务] ❌ 数据获取失败")

return""

time.sleep(1)# 模拟耗时

result=f"{input_data} 天气:25℃ 晴天"

print(f"[天气任务] ✅ 成功: {result}")

returnresult

deffetch_time(input_data:str)->str:

"""模拟时间数据获取(20%概率失败)"""

print(f"\n[时间任务] 处理: {input_data}")

ifrandom.random() <0.2:# 20%失败概率

print("[时间任务] ❌ 数据获取失败")

return""

time.sleep(1)# 模拟耗时

result=f"{input_data} 时间:15:30 下午"

print(f"[时间任务] ✅ 成功: {result}")

returnresult

# 3. 并行执行节点(循环的核心执行节点)

defparallel_execution(state: LoopState)-> LoopState:

"""并行执行天气/时间任务,返回更新后的状态"""

input_data=state["input_data"]

retry_count=state["retry_count"]+1

print(f"\n{'='*50}")

print(f"[循环执行] 第 {retry_count} 次重试")

print('='*50)

# 线程池并行执行

with ThreadPoolExecutor(max_workers=2) as executor:

future_weather=executor.submit(fetch_weather, input_data)

future_time=executor.submit(fetch_time, input_data)

# 收集结果

weather_data=""

time_data=""

forfutureinas_completed([future_weather, future_time]):

iffuture==future_weather:

weather_data=future.result()

eliffuture==future_time:

time_data=future.result()

# 判断本次执行是否成功

is_success=bool(weather_dataandtime_data)

# 返回更新后的状态

return{

"input_data": input_data,

"weather_data": weather_data,

"time_data": time_data,

"retry_count": retry_count,

"max_retries": state["max_retries"],

"is_success": is_success

}

# 4. 循环决策节点(判断是否继续重试)

defloop_decision(state: LoopState)->str:

"""

决策节点:返回下一个节点名称(循环/结束)

- 成功 → END

- 失败且未达最大重试 → parallel_execution(继续循环)

- 失败且达最大重试 → END

"""

print(f"\n[决策节点] 重试次数: {state['retry_count']}/{state['max_retries']} | 执行成功: {state['is_success']}")

# 退出条件1:执行成功

ifstate["is_success"]:

print("[决策节点] ✅ 所有任务成功,结束循环")

returnEND

# 退出条件2:达到最大重试次数

elifstate["retry_count"] >=state["max_retries"]:

print("[决策节点] ❌ 达到最大重试次数,强制结束")

returnEND

# 继续循环:返回并行执行节点名称

else:

print("[决策节点] 🔄 任务失败,继续重试")

return"parallel_execution"

# 5. 初始化节点(设置初始状态)

definit_state(state: LoopState)-> LoopState:

"""初始化循环状态"""

return{

"input_data": state["input_data"],

"weather_data":None,

"time_data":None,

"retry_count":0,# 初始重试次数0

"max_retries": state["max_retries"],# 最大重试次数(外部传入)

"is_success":False# 初始未成功

}

http://www.jsqmd.com/news/1075222/

相关文章:

  • 100VIN,0.2A,耐高压LDO,XZ6203H
  • 教你如何将yolov8训练好的文件部署在RDK上
  • 解锁无损音乐宝藏:TIDAL Downloader Next Generation 让你的音乐收藏焕然一新![特殊字符]
  • Java 面试复习草稿:HashMap 与线程池
  • 在项目中使用了Nutz框架,能说一下它相比MyBatis的优势和不足吗?你们为什么选它?
  • 从零学习Kafka:生产者分区机制
  • 面试官问:“你怎么评估一个 Agent 到底好不好用?”,我笑了:“试了几个问题,没问题就行”,面试官:“你不叫评估,叫碰运气”
  • LSTM序列分类实战:门控机制、双向设计与工程调优指南
  • 终极指南:如何用DroneSecurity深度解析DJI无人机通信协议?
  • 《HarmonyOS技术精讲-UI开发 (基于NDK构建UI)》第4篇:高效Canvas绘制——NDK中的2D渲染加速
  • 一升主机跑百亿大模型:酷睿Ultra端侧AI实战指南
  • 磁盘空间告急?这个Rust工具帮你找出所有可以删的文件
  • 分钟看懂p值和置信区间:别再被_显著_忽悠了
  • 九大网盘直链下载助手完整指南:免费高速下载终极方案
  • MPC8360E内存控制器深度解析:SDRAM时序与UPM可编程接口实战
  • Bootstrap Tooltip XSS漏洞复现:从原理到防御的深度解析
  • 临床AI落地五大生死线:从模型可信度到人机协同的实战指南
  • hcip二层综合实验
  • LinkSwift终极指南:如何优雅获取九大网盘直链下载地址
  • Ghostty + Fish + Starship + fzf + zoxide + Raycast
  • UEditor远程文件抓取漏洞解析:从原理到修复的Web安全实战
  • 赛博朋克2077存档编辑器:彻底掌控夜之城的终极工具
  • AI领域每日资讯报告(2026年6月24日)
  • AI科研画图
  • Mac上使用VScode优雅开发STM32
  • LED光学测量对产品的品质重要性
  • TFRecord写入最佳实践:从数据序列化到生产级稳定性
  • CountDownLatch
  • Kubernetes RBAC 实战指南
  • Cloudflare 发起回源连接断开,连不上 443 端口的原因