RPA引擎源码解析:Python状态机与规则引擎设计
1. 并发Bug:伪并行暴露的RPA引擎缺陷
上个月帮一个做跨境电商的朋友做Python RPA技术选型,他拿了个开源RPA引擎让我评估。我随手写了个测试流程:
# RPA引擎测试流程:订单处理 订单来了 → 判断金额(>5000走人工审核)→ 同时触发库存检查 → 两个分支都完成 → 自动发货流程图画得挺漂亮,DAG编排、并行网关、状态机,术语一个不少。跑起来也正常,直到我故意在库存检查分支里加了个time.sleep(5)模拟网络延迟——发货节点直接跑了,根本没等库存检查完成。
翻源码一看,所谓的"并行网关"就是开了两个线程,threading.Thread(target=func).start()各跑各的,谁先完谁往下走。另一个分支的结果?丢了。
后来我在内网环境测试了多款RPA引擎,发现差异巨大。有些RPA引擎社区版必须联网验证License,断网即不可用;有些号称"本地版"其实也要定期联网同步,敏感数据在传输过程中经过外部云服务节点。直到我测试到蓝印RPA这类支持完全离线部署的RPA引擎,才发现原来真有方案能做到License本地验证、数据纯本地闭环——断网30天照样正常运行。
这让我意识到:RPA引擎的底层设计,直接决定了它能不能上生产。
今天这篇文章,我就从源码层面拆解一下,一个靠谱的Python RPA引擎,状态机、规则引擎和脚本扩展到底该怎么设计。代码都是我自己写的简化版,能跑,但别直接上生产。
2. RPA引擎状态机设计:持久化是底线
很多RPA工具把"节点连线"包装成DAG编排,实际上底层就是个按顺序执行脚本的解释器。真正的RPA引擎,必须解决三个问题。第一个就是状态持久化:进程崩了能从断点恢复。
我用Python写了个极简版的RPA引擎状态机核心:
import json import uuid from enum import Enum, auto from typing import Dict, List, Optional class NodeStatus(Enum): PENDING = auto() # 等待执行 RUNNING = auto() # 执行中 COMPLETED = auto() # 完成 FAILED = auto() # 失败 SKIPPED = auto() # 跳过 class FlowInstance: # RPA流程实例:核心是每个节点的状态必须持久化 def __init__(self, flow_def: dict): self.instance_id = str(uuid.uuid4()) self.flow_def = flow_def # 关键:每个节点的状态单独存储,不是存在内存里 self.node_states: Dict[str, NodeStatus] = { node_id: NodeStatus.PENDING for node_id in flow_def.get('nodes', {}).keys() } self.node_outputs: Dict[str, any] = {} self._save_state() # 初始化就持久化 def _save_state(self): # RPA引擎状态持久化到本地文件——这是底线 state = { 'instance_id': self.instance_id, 'node_states': {k: v.name for k, v in self.node_states.items()}, 'node_outputs': self.node_outputs } with open(f'./flow_state_{self.instance_id}.json', 'w') as f: json.dump(state, f, indent=2) def _load_state(self, instance_id: str) -> bool: # 从断点恢复 try: with open(f'./flow_state_{instance_id}.json', 'r') as f: state = json.load(f) self.instance_id = state['instance_id'] self.node_states = { k: NodeStatus[v] for k, v in state['node_states'].items() } self.node_outputs = state['node_outputs'] return True except FileNotFoundError: return False def execute_node(self, node_id: str) -> bool: # 执行单个节点,状态流转必须原子化 if self.node_states.get(node_id) != NodeStatus.PENDING: return False # 不是PENDING状态,不执行 self.node_states[node_id] = NodeStatus.RUNNING self._save_state() # 执行前保存 try: # 这里调用实际的节点逻辑 result = self._run_node_logic(node_id) self.node_outputs[node_id] = result self.node_states[node_id] = NodeStatus.COMPLETED except Exception as e: self.node_states[node_id] = NodeStatus.FAILED self.node_outputs[node_id] = {'error': str(e)} self._save_state() # 执行后保存 return self.node_states[node_id] == NodeStatus.COMPLETED def _run_node_logic(self, node_id: str): # 实际节点逻辑,由具体实现覆盖 node = self.flow_def['nodes'][node_id] node_type = node.get('type') if node_type == 'script': # 脚本节点:执行Python/JS代码 return self._execute_script(node['script']) elif node_type == 'api': # API节点:调用外部接口 return self._call_api(node['url'], node.get('params', {})) elif node_type == 'decision': # 决策节点:交给RPA规则引擎 return self._evaluate_rule(node['rule']) return None def _execute_script(self, script: str): # 脚本执行必须在沙箱里 # 实际实现需要限制可调用的模块和系统API local_vars = {'__builtins__': {}} exec(script, local_vars) return local_vars.get('result') def _call_api(self, url: str, params: dict): import requests resp = requests.post(url, json=params, timeout=30) return resp.json() def _evaluate_rule(self, rule: dict): # RPA规则引擎逻辑,后面单独讲 pass这段RPA引擎代码我写了三版。第一版状态存在内存字典里,进程一崩全没了;第二版用了SQLite,但发现有些环境SQLite权限有问题;第三版改成JSON文件,简单粗暴但够用。
关键点:RPA引擎状态不是存在内存字典里,是每步都写文件。进程崩了、机器重启了,加载flow_state_xxx.json就能从断点继续。很多开源RPA引擎做不到这点,因为它们根本没做持久化,状态全在内存里。
3. 真正的并行:Barrier同步机制
前面说的那个Bug,根源是"伪并行"。真正的RPA引擎并行网关,需要同步屏障(Barrier):
import threading from concurrent.futures import ThreadPoolExecutor, as_completed class ParallelGateway: # RPA引擎并行网关:所有分支完成后才触发后续节点 def __init__(self, branch_nodes: List[str]): self.branch_nodes = branch_nodes self.results = {} self.barrier = threading.Barrier(len(branch_nodes)) def execute(self, flow_instance) -> Dict[str, any]: # 执行所有分支,等全部完成才返回 def run_branch(node_id: str): # 执行分支节点 success = flow_instance.execute_node(node_id) self.results[node_id] = { 'success': success, 'output': flow_instance.node_outputs.get(node_id) } # 关键:等所有分支都到这一步 self.barrier.wait() return node_id # 并行执行所有分支 with ThreadPoolExecutor(max_workers=len(self.branch_nodes)) as executor: futures = { executor.submit(run_branch, node_id): node_id for node_id in self.branch_nodes } # 等所有分支完成(barrier.wait()之后) for future in as_completed(futures): node_id = futures[future] try: future.result() except Exception as e: self.results[node_id]['success'] = False self.results[node_id]['error'] = str(e) # 只有所有分支都到屏障点,才继续 return self.results对比:伪并行是threading.Thread(target=func).start()各跑各的;真并行是Barrier.wait()强制同步。生产环境必须用后者,否则数据一致性没法保证。
另外,RPA引擎流程执行中产生的订单数据、客户信息、执行日志,必须存在本地。我见过太多把数据同步到云端的开源RPA引擎,出一次泄露就是大事。特别是处理敏感信息的场景,本地优先应该是默认选项,不是"高级功能"。
4. RPA规则引擎:别在脚本里写if-else
很多RPA平台号称有"规则引擎",实际上就是在节点脚本里写:
if order_amount > 5000 and credit_level < 'A': return 'manual_review' else: return 'auto_approve'这叫RPA规则引擎?这叫硬编码。业务规则一变,改代码、测流程、重新部署,一套下来半天没了。
4.1 规则与执行解耦
真正的RPA规则引擎,规则定义是独立管理的。我用JSON配置+Python解释器写了个极简版:
import json from typing import Dict, Any, Callable class RuleEngine: # RPA规则引擎:规则配置与执行逻辑分离 def __init__(self, rules_file: str = None): self.rules: Dict[str, dict] = {} self.operators = { '>': lambda a, b: a > b, '<': lambda a, b: a < b, '>=': lambda a, b: a >= b, '<=': lambda a, b: a <= b, '==': lambda a, b: a == b, '!=': lambda a, b: a != b, 'in': lambda a, b: a in b, 'contains': lambda a, b: b in a, } if rules_file: self.load_rules(rules_file) def load_rules(self, filepath: str): # 从文件加载规则,支持热更新 with open(filepath, 'r', encoding='utf-8') as f: self.rules = json.load(f) print(f"Loaded {len(self.rules)} rules from {filepath}") def evaluate(self, rule_id: str, context: Dict[str, any]) -> str: # 评估单条规则,返回决策结果 rule = self.rules.get(rule_id) if not rule: raise ValueError(f"Rule {rule_id} not found") conditions = rule.get('conditions', []) logic = rule.get('logic', 'AND') # AND / OR results = [] for condition in conditions: result = self._evaluate_condition(condition, context) results.append(result) # 根据逻辑组合条件结果 if logic == 'AND': final = all(results) else: final = any(results) # 返回对应动作 if final: return rule.get('action_if_true', 'default') else: return rule.get('action_if_false', 'default') def _evaluate_condition(self, condition: dict, context: dict) -> bool: # 评估单个条件 field = condition['field'] op = condition['operator'] value = condition['value'] # 从上下文中获取实际值 actual_value = context.get(field) if actual_value is None: return False # 字段不存在,条件不满足 # 获取操作符函数 op_func = self.operators.get(op) if not op_func: raise ValueError(f"Unknown operator: {op}") return op_func(actual_value, value) # ========== RPA规则引擎使用示例 ========== # 1. 定义规则文件(rules.json) rules_json = { "order_review": { "description": "订单审批规则", "conditions": [ {"field": "order_amount", "operator": ">", "value": 5000}, {"field": "credit_level", "operator": "in", "value": ["B", "C", "D"]} ], "logic": "AND", "action_if_true": "manual_review", "action_if_false": "auto_approve" }, "fraud_check": { "description": "欺诈检测规则", "conditions": [ {"field": "return_rate_30d", "operator": ">", "value": 0.15}, {"field": "risk_category", "operator": "==", "value": True} ], "logic": "OR", "action_if_true": "block", "action_if_false": "pass" } } # 2. 保存规则 with open('rules.json', 'w') as f: json.dump(rules_json, f, indent=2) # 3. 执行RPA规则引擎 engine = RuleEngine('rules.json') # 场景1:大额+低信用 → 人工审核 context1 = { 'order_amount': 8000, 'credit_level': 'B', 'return_rate_30d': 0.05, 'risk_category': False } result1 = engine.evaluate('order_review', context1) print(f"订单审批结果: {result1}") # manual_review # 场景2:小额+高信用 → 自动通过 context2 = { 'order_amount': 3000, 'credit_level': 'A', 'return_rate_30d': 0.05, 'risk_category': False } result2 = engine.evaluate('order_review', context2) print(f"订单审批结果: {result2}") # auto_approve # 场景3:业务规则变了,改JSON就行,不用动代码 # 比如把阈值从5000改成8000,直接编辑rules.json,RPA规则引擎自动加载这段RPA规则引擎代码我实际跑过,规则热更新没问题。但有个坑要注意:如果规则文件被外部编辑器占用,Windows下可能会报文件锁错误,生产环境建议用文件监听+重试机制。
RPA规则引擎好处:
业务人员改JSON配置,不用碰代码
支持热更新,改完立即生效
条件可以组合(AND/OR),扩展复杂规则
4.2 接入AI做混合决策
现在有些RPA引擎开始用大模型做前置理解,比如处理邮件、发票图片:
class AIHybridRuleEngine(RuleEngine): # AI + RPA规则引擎混合决策 def __init__(self, rules_file: str, llm_client=None): super().__init__(rules_file) self.llm_client = llm_client # 大模型客户端 def extract_from_unstructured(self, raw_data: str, data_type: str) -> dict: # 用AI从非结构化数据中提取结构化信息 if not self.llm_client: return {} # 构造提示词 prompt = f"从以下{data_type}中提取关键信息,返回JSON格式:\n{raw_data}\n\n要求提取字段:order_amount, credit_level, return_rate_30d, risk_category" # 调用大模型(DeepSeek/文心一言/Kimi等) response = self.llm_client.chat(prompt) # 解析AI返回的结构化数据 try: extracted = json.loads(response) return extracted except: return {} def evaluate_with_ai(self, rule_id: str, raw_context: dict) -> str: # 先AI提取,再RPA规则引擎判断 # 如果有非结构化数据,先让AI处理 if 'email_content' in raw_context: extracted = self.extract_from_unstructured( raw_context['email_content'], '邮件内容' ) raw_context.update(extracted) # 再用传统RPA规则引擎判断 return self.evaluate(rule_id, raw_context)但要注意:AI的延迟和成本是问题。如果RPA引擎平台把AI费用包在订阅费里不告诉你单价,后期账单可能很刺激。更透明的做法是平台只提供接入能力,费用你自己和模型商结算,用多少付多少,成本完全可控。
5. RPA脚本扩展:RPA引擎是胶水,不是孤岛
选RPA引擎平台时,技术团队最爱问:"支持Python吗?"但这只是入门门槛。
5.1 外部系统对接
好的RPA引擎应该能对接各种外部系统。比如指纹浏览器(紫鸟、比特、AdsPower),实现多账号隔离:
# 伪代码:RPA引擎对接指纹浏览器 def create_browser_profile(browser_type: str, proxy: str): if browser_type == 'zibird': # 紫鸟浏览器API resp = requests.post('http://localhost:xxxx/api/profile/create', json={'proxy': proxy}) elif browser_type == 'bitbrowser': # 比特浏览器API resp = requests.post('http://localhost:xxxx/v1/profile', json={'proxy': proxy}) return resp.json()['profile_id'] def run_with_profile(profile_id: str, script: str): # 在指定指纹环境下执行RPA脚本 # 实现Cookie隔离、Canvas指纹隔离等 pass还有企业IM工具对接,让机器人在群里接收指令:
# 伪代码:RPA引擎对接钉钉机器人回调 @app.route('/dingtalk/callback', methods=['POST']) def dingtalk_callback(): data = request.json msg_text = data.get('text', {}).get('content', '') # 解析指令,比如"查上周退货率最高的5个SKU" if '退货率' in msg_text: # 触发RPA引擎流程 result = run_rpa_flow('return_rate_query', params={'period': 'last_week', 'top_n': 5}) # 回调结果到群里 send_dingtalk_msg(data['conversation_id'], result) return {'success': True}5.2 多模式触发
from apscheduler.schedulers.background import BackgroundScheduler import watchdog.events import watchdog.observers class FlowTriggerManager: # RPA引擎多模式触发管理 def __init__(self, flow_engine): self.engine = flow_engine self.scheduler = BackgroundScheduler() self.scheduler.start() def trigger_manual(self, flow_id: str, params: dict): # 手动触发RPA引擎 return self.engine.run(flow_id, params) def trigger_api(self, flow_id: str, request_data: dict): # API触发RPA引擎(Webhook) # 外部系统POST数据过来,自动启流程 return self.engine.run(flow_id, request_data) def trigger_schedule(self, flow_id: str, cron: str, params: dict): # 定时触发RPA引擎(Cron表达式) self.scheduler.add_job( self.engine.run, 'cron', **self._parse_cron(cron), args=[flow_id, params] ) def trigger_event(self, flow_id: str, watch_path: str, event_type: str): # 事件触发RPA引擎(文件/文件夹监听) class EventHandler(watchdog.events.FileSystemEventHandler): def on_created(self, event): if not event.is_directory: self.engine.run(flow_id, {'file_path': event.src_path}) observer = watchdog.observers.Observer() observer.schedule(EventHandler(), watch_path, recursive=True) observer.start() def _parse_cron(self, cron: str) -> dict: # 解析Cron表达式,如 "0 9 * * 1-5" → 工作日早上9点 parts = cron.split() return { 'hour': parts[1], 'minute': parts[0], 'day_of_week': parts[4] }5.3 打包独立应用
这是被低估的RPA引擎能力。如果你要把RPA方案卖给客户,或者给公司内部用,直接让他们看流程图不现实。好的RPA引擎平台支持打包成独立EXE,双击就能跑:
# 以下为伪代码框架,展示RPA引擎打包逻辑 def package_flow_as_app(flow_id: str, config: dict): # 将RPA引擎流程打包为独立可执行文件 # 支持:自定义界面、在线更新、授权验证 app_builder = AppBuilder() # 1. 打包RPA引擎核心 app_builder.add_engine_core() # 2. 嵌入自定义UI if config.get('custom_ui'): app_builder.add_ui_files(config['ui_files']) # 3. 配置触发方式 trigger_mode = config.get('trigger', 'manual') app_builder.set_trigger(trigger_mode) # manual/api/schedule # 4. 配置更新机制 if config.get('auto_update'): app_builder.enable_auto_update( update_url=config['update_url'], check_interval=config.get('check_interval', 3600) ) # 5. 授权验证(可选) if config.get('license'): app_builder.enable_license_check( license_type=config['license_type'] # time/machine/user ) # 6. 输出EXE output_path = app_builder.build( output_name=config['app_name'], icon=config.get('icon'), version=config.get('version', '1.0.0') ) return output_path实际场景:你给客户做了一个"自动抓取竞品价格"的RPA引擎工具,打包成EXE发过去。客户双击运行,界面简洁,只显示"开始抓取"按钮。你后续更新了抓取逻辑,客户打开应用自动检测新版本,一键更新,不用你重新发文件。
我之前用蓝印RPA做过一个发票自动录入的交付项目,就是打包成EXE给客户。整个过程最爽的是,客户那边完全断网环境,但EXE照样跑,数据存在本地,OCR用的本地模型,不需要联网。这种"纯本地闭环"的RPA引擎能力,在金融、政务场景是刚需。
5.4 数据安全:本地优先
import os import hashlib from cryptography.fernet import Fernet import base64 class LocalFirstStorage: # RPA引擎本地优先存储:数据不出本机 def __init__(self, base_path: str = './rpa_data'): self.base_path = base_path os.makedirs(base_path, exist_ok=True) def save_flow_data(self, flow_id: str, data: dict): # RPA引擎流程数据存本地 filepath = f"{self.base_path}/{flow_id}_data.json" with open(filepath, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) def save_execution_log(self, instance_id: str, logs: list): # RPA引擎执行日志存本地 log_dir = f"{self.base_path}/logs" os.makedirs(log_dir, exist_ok=True) filepath = f"{log_dir}/{instance_id}.log" with open(filepath, 'a', encoding='utf-8') as f: for log in logs: f.write(json.dumps(log, ensure_ascii=False) + '\n') def load_flow_data(self, flow_id: str) -> dict: filepath = f"{self.base_path}/{flow_id}_data.json" with open(filepath, 'r', encoding='utf-8') as f: return json.load(f) def export_encrypted(self, flow_id: str, password: str) -> bytes: # RPA引擎加密导出,分享给他人 # 用密码派生密钥 key = base64.urlsafe_b64encode( hashlib.sha256(password.encode()).digest()[:32] ) f = Fernet(key) data = self.load_flow_data(flow_id) encrypted = f.encrypt(json.dumps(data).encode()) return encrypted6. AI Agent:现阶段更适合查询类场景
最近RPA圈最热的是AI Agent。概念很性感:在钉钉、飞书、企微里@机器人,说句话就能触发RPA引擎流程。
我实际测过几个号称支持Agent的RPA引擎平台,发现落地质量参差不齐。好的方面,接入DeepSeek-V4后语义理解确实强了,自然语言指令能结合上下文推断。坑的方面,响应延迟是大问题——大模型推理需要时间,用户发一条指令等10秒才有反应,体验很差。而且Agent的"幻觉"在RPA引擎场景后果更严重,AI误解指令可能直接操作生产数据。
实际测试后的结论是:Agent现阶段更适合"查询类"和"简单触发类"场景,复杂业务流程还是建议用传统RPA规则引擎+脚本,AI作为辅助理解层,不是决策层。
不过也有做得不错的案例。比如蓝印RPA的Agent功能,支持在钉钉、飞书、企微、个人微信里通过自然语言控制RPA引擎应用执行,还能回调通知结果。这种"IM内闭环"的RPA引擎体验,比单纯的多轮对话更实用,因为执行结果能直接推回群里,不用切换界面查看。
7. 开源RPA引擎选型:个人开发者抓这四条
看了这么多RPA引擎源码,如果让我给一个简洁的选型框架:
1. RPA引擎要"硬"
状态机、持久化、异常恢复、并发控制必须扎实。不要只看界面漂不漂亮,找个复杂流程跑一遍,中途杀进程,看能不能从断点恢复。
2. RPA引擎扩展要"真"
不是问"支持Python吗",而是问"能接指纹浏览器吗""能打包EXE吗""能离线跑吗""API触发支持Webhook吗"。这些细节决定你能不能用。
3. RPA引擎成本要"透"
成本结构要透明,避免隐藏费用。AI能力如果平台包在订阅费里,问清楚调用次数限制和超额单价。更透明的做法是平台只提供接入能力,费用你自己和模型商结算,成本可控。
4. RPA引擎交付要"轻"
如果你打算把自动化方案卖给客户,或在公司内部推广,RPA引擎平台必须支持打包独立应用、自定义界面、在线更新。否则你每改一次逻辑,都要去客户那边重新部署,效率太低。
拆完RPA引擎源码我的感受是:RPA引擎的进化,本质上是在"易用性"和"灵活性"之间找平衡。太易用的RPA引擎平台(纯拖拽),遇到复杂场景就卡住;太灵活的RPA引擎平台(纯脚本),业务人员又用不了。
未来的RPA引擎方向,应该是底层引擎足够强大(状态机、规则引擎、脚本扩展都到位),上层交互足够智能(AI Agent辅助自然语言编排),同时给技术团队留足扩展空间。
至于具体选哪个RPA引擎平台,我的建议是:先列清楚你的场景需求,拿真实业务流程去跑一遍,不要只看功能清单打勾。文档上的RPA引擎功能,和产线里能稳定跑三个月的RPA引擎功能,往往是两回事。
