当前位置: 首页 > news >正文

100行代码手搓Agent框架!小白也能看懂的核心代码细节!

【PocketFlow】

先上核心代码!

importasyncio,warnings,copy,timeclassBaseNode:def__init__(self):self.params,self.successors={},{}defset_params(self,params):self.params=paramsdefnext(self,node,action="default"):ifactioninself.successors:warnings.warn(f"Overwriting successor for action '{action}'")self.successors[action]=node;returnnodedefprep(self,shared):passdefexec(self,prep_res):passdefpost(self,shared,prep_res,exec_res):passdef_exec(self,prep_res):returnself.exec(prep_res)def_run(self,shared):p=self.prep(shared);e=self._exec(p);returnself.post(shared,p,e)defrun(self,shared):ifself.successors:warnings.warn("Node won't run successors. Use Flow.")returnself._run(shared)def__rshift__(self,other):returnself.next(other)def__sub__(self,action):ifisinstance(action,str):return_ConditionalTransition(self,action)raiseTypeError("Action must be a string")class_ConditionalTransition:def__init__(self,src,action):self.src,self.action=src,actiondef__rshift__(self,tgt):returnself.src.next(tgt,self.action)classNode(BaseNode):def__init__(self,max_retries=1,wait=0):super().__init__();self.max_retries,self.wait=max_retries,waitdefexec_fallback(self,prep_res,exc):raiseexcdef_exec(self,prep_res):forself.cur_retryinrange(self.max_retries):try:returnself.exec(prep_res)exceptExceptionase:ifself.cur_retry==self.max_retries-1:returnself.exec_fallback(prep_res,e)ifself.wait>0:time.sleep(self.wait)classBatchNode(Node):def_exec(self,items):return[super(BatchNode,self)._exec(i)foriin(itemsor[])]classFlow(BaseNode):def__init__(self,start=None):super().__init__();self.start_node=startdefstart(self,start):self.start_node=start;returnstartdefget_next_node(self,curr,action):nxt=curr.successors.get(actionor"default")ifnotnxtandcurr.successors:warnings.warn(f"Flow ends: '{action}' not found in{list(curr.successors)}")returnnxtdef_orch(self,shared,params=None):curr,p,last_action=copy.copy(self.start_node),(paramsor{**self.params}),Nonewhilecurr:curr.set_params(p);last_action=curr._run(shared);curr=copy.copy(self.get_next_node(curr,last_action))returnlast_actiondef_run(self,shared):p=self.prep(shared);o=self._orch(shared);returnself.post(shared,p,o)defpost(self,shared,prep_res,exec_res):returnexec_resclassBatchFlow(Flow):def_run(self,shared):pr=self.prep(shared)or[]forbpinpr:self._orch(shared,{**self.params,**bp})returnself.post(shared,pr,None)classAsyncNode(Node):asyncdefprep_async(self,shared):passasyncdefexec_async(self,prep_res):passasyncdefexec_fallback_async(self,prep_res,exc):raiseexcasyncdefpost_async(self,shared,prep_res,exec_res):passasyncdef_exec(self,prep_res):forself.cur_retryinrange(self.max_retries):try:returnawaitself.exec_async(prep_res)exceptExceptionase:ifself.cur_retry==self.max_retries-1:returnawaitself.exec_fallback_async(prep_res,e)ifself.wait>0:awaitasyncio.sleep(self.wait)asyncdefrun_async(self,shared):ifself.successors:warnings.warn("Node won't run successors. Use AsyncFlow.")returnawaitself._run_async(shared)asyncdef_run_async(self,shared):p=awaitself.prep_async(shared);e=awaitself._exec(p);returnawaitself.post_async(shared,p,e)def_run(self,shared):raiseRuntimeError("Use run_async.")classAsyncBatchNode(AsyncNode,BatchNode):asyncdef_exec(self,items):return[awaitsuper(AsyncBatchNode,self)._exec(i)foriinitems]classAsyncParallelBatchNode(AsyncNode,BatchNode):asyncdef_exec(self,items):returnawaitasyncio.gather(*(super(AsyncParallelBatchNode,self)._exec(i)foriinitems))classAsyncFlow(Flow,AsyncNode):asyncdef_orch_async(self,shared,params=None):curr,p,last_action=copy.copy(self.start_node),(paramsor{**self.params}),Nonewhilecurr:curr.set_params(p);last_action=awaitcurr._run_async(shared)ifisinstance(curr,AsyncNode)elsecurr._run(shared);curr=copy.copy(self.get_next_node(curr,last_action))returnlast_actionasyncdef_run_async(self,shared):p=awaitself.prep_async(shared);o=awaitself._orch_async(shared);returnawaitself.post_async(shared,p,o)asyncdefpost_async(self,shared,prep_res,exec_res):returnexec_resclassAsyncBatchFlow(AsyncFlow,BatchFlow):asyncdef_run_async(self,shared):pr=awaitself.prep_async(shared)or[]forbpinpr:awaitself._orch_async(shared,{**self.params,**bp})returnawaitself.post_async(shared,pr,None)classAsyncParallelBatchFlow(AsyncFlow,BatchFlow):asyncdef_run_async(self,shared):pr=awaitself.prep_async(shared)or[]awaitasyncio.gather(*(self._orch_async(shared,{**self.params,**bp})forbpinpr))returnawaitself.post_async(shared,pr,None)

核心代码一句话总概括

这段代码 =一个造工作流的轮子
你可以把它理解成:流水线搭建工具

  • 你写一个个小任务(节点)
  • >>把它们串起来
  • 框架自动帮你按顺序执行、重试、批量处理、异步并行

它就是一个流程自动化引擎


看懂 3 个核心概念(看懂就全懂了)

1. 节点 Node = 一个任务

比如:

  • 查数据库
  • 发请求
  • 算个数
  • 读文件

一个节点 = 干一件事。

2. 流程 Flow = 一串节点

节点1执行完 → 自动执行节点2 → 再执行节点3

3. 执行生命周期(所有节点都一样)

所有节点都固定分 3 步跑:

  1. prep:准备(比如拿参数)
  2. exec:真正干活(核心代码)
  3. post:收尾(比如存结果)

逐行超通俗解释

1.BaseNode基类(所有节点的祖宗)

classBaseNode:def__init__(self):self.params={}# 节点参数self.successors={}# 下一个要执行谁defnext(self,node):把当前节点 → 指向下一个节点 比如 A.next(B)就是 A 执行完跑 Bdefprep(self):准备defexec(self):干活defpost(self):收尾defrun(self):执行 prep →exec→ post

作用:定义所有节点必须有的结构。


2.Node普通同步节点(最常用)

classNode(BaseNode):def__init__(self,max_retries=1,wait=0):最大重试次数 重试等待时间

自带失败重试!
失败了会自动重试 N 次。


3.BatchNode批量节点

给它一个列表,它会把列表里每一条都执行一遍。


4.Flow流程(最关键!)

classFlow(BaseNode):它的作用:自动跑一串节点 从起点开始 → 跑完一个自动跑下一个 → 直到结束

比如:

A >> B >> C

Flow 会自动跑:
A → B → C


5. 条件跳转(超级好用)

A-"success">>B A-"fail">>C

意思:

  • A 返回 “success” → 跑 B
  • A 返回 “fail” → 跑 C

这就是流程分支判断!


异步部分(超简单理解)

所有带Async的类 =异步版本

  • 同步:一件事做完再做下一件
  • 异步:多件事同时跑(不等待)

类名规律:

  • AsyncNode:异步节点
  • AsyncFlow:异步流程
  • AsyncBatchNode:异步批量
  • AsyncParallelBatchNode异步并行(同时跑一堆任务)

最关键的执行逻辑(Flow 怎么跑?)

1. 从 start 节点开始 2. 执行节点 3. 看节点返回什么 4. 跳转到对应的下一个节点 5. 直到没有下一个节点,结束

完全自动化,不用你写循环和判断!


用一个超级小例子

# 定义节点1classA(Node):defexec(self,x):print("我是A")return"next"# 定义节点2classB(Node):defexec(self,x):print("我是B")# 搭建流程flow=Flow()a=A()b=B()a-"next">>b# A 返回 next 就执行 Bflow.start(a)# 运行flow.run({})

输出:

我是A 我是B

框架自动执行!


这个框架到底能干嘛?

你可以用它做:

  • 自动化流程
  • 数据处理管道
  • 接口自动化
  • 批量任务
  • 异步并发任务
  • 带重试、带分支的工作流

最简总结

这段代码就是:

一个轻量级流程编排引擎

  • 节点 = 任务
  • 流程 = 任务连线
  • >>= 执行顺序
  • 支持:重试、批量、异步、分支判断

你只需要写任务,框架帮你调度执行。


基于核心代码可根据需求拓展多智能体、agent-skill等等多方能力。

http://www.jsqmd.com/news/714245/

相关文章:

  • 30秒学会AI视频插帧:Flowframes让你的视频秒变120帧超流畅
  • 从论文到可运行代码:我如何把ConvLSTM-UNet车道线检测模型“跑”起来(附完整PyTorch项目)
  • 大学生建议-做事情-抠细节是永远赚不到钱的
  • -大家家里都没有托底-所以不要折腾-
  • 大气层系统终极指南:3步快速上手Switch自制系统完整教程
  • 01导论——《大数据平台架构(主编:吕欣 黄宏斌)》读书笔记2
  • 打工和赚钱的断层5-赚钱需要的沉淀和积累远远要比打工多
  • 【实战指南】开源字体革命:零成本生成专业条码的完整方案
  • vCenter证书过期导致Web服务挂掉?手把手教你用certificate-manager重置(附清理备份脚本)
  • 大家千万不要无脑讨价还价-机会往往只有一次
  • 大学生-研究生毕业找工作思路整理
  • 抖音获客:流量密码背后的真实与挑战 - 年度推荐企业名录
  • XposedRimetHelper技术解构:系统级定位拦截与时空控制机制分析
  • 打工和赚钱的断层6-打工永远盯着短期利益-赚钱则要明白轻重缓急
  • 你的App连不上WiFi?可能是Android 10的隐私权限在搞鬼(附排查指南)
  • 手把手用CubeMX+MDK给STM32H743/F407搭建RTX5项目(附工程模板)
  • 大家去现实世界见见活人吧-别再不停的电子鸦片了
  • 大学生专辑-看清那些花里胡哨的-只关心本质就好了
  • 新手必看:2026年腾讯企业邮箱购买方式全流程解析 - 品牌2025
  • ImageStrike技术深度解析:CTF图像隐写分析的多模态架构实现
  • 2026年大理石异形平台厂家推荐:泊头市华博工量具,大理石打孔平台/大理石检验平台/大理石00级平台厂家 - 品牌推荐官
  • YOLOv5模型魔改实战:插入SE模块后,我的检测精度提升了多少?(附消融实验对比)
  • AI沈阳工具谁家最好服务?星闪Ai智能体避坑指南,教你选对工具少走弯路
  • 打工和赚钱的断层7-一个是寻求0到1-一个是追求性价比和安全
  • 大家日常经常用到的画饼和讲故事技巧
  • 抖音获客:流量密码背后的真实挑战 - 年度推荐企业名录
  • 另类文件备份方法
  • 2026 四款 AI:代码质量与生成速度比拼
  • 打工和赚钱的断层8-一个靠别人喂到嘴里-一个靠发自内心的驱动
  • #2026最新公司注册公司推荐!南昌优质权威榜单发布,专业靠谱南昌等地公司服务可信赖 - 十大品牌榜