影刀RPA店群自动化架构:多节点执行机自动注册与服务发现实战
影刀RPA店群自动化架构:多节点执行机自动注册与服务发现实战
手动往集群里加一台执行机,要改五个配置文件、重启三个服务。
拼多多店群自动化报活动上架!
这种运维方式,在机器数量超过五台之后就彻底失控了。
店群自动化系统跑了大半年,Worker节点从2台扩到8台,再扩到15台。
每次扩容,运维同事都得手动修改Master的节点列表,更新Nginx转发规则,再逐台重启Worker上的代理服务。
有一次半夜服务器宕机,紧急启用了备用机器,结果因为IP没加到白名单里,新节点一直注册不上。
等到人工介入处理完,已经是凌晨三点。
那次之后我们决定:执行节点的加入和退出,必须做到像云服务一样自动、无感。
这篇文章就完整展开这套基于Redis的服务注册与发现机制的工程实践。
TEMU店群矩阵自动化运营核价报活动
一、从静态配置到动态注册
原来我们的节点管理方式很简单:一个YAML配置文件,列出所有Worker的IP和端口。
workers:-id:node01-ip:192.168.1.101-port:50051-max_browsers:6--id:node02-ip:192.168.1.102-port:50051-max_browsers:6-``` Master启动时读取这个文件,建立Worker连接池。 这种方式在节点数量固定时没问题,但一旦需要弹性伸缩,问题就来了:新增机器要改文件、提交Git、通知Master重载。**真正的服务发现,应该是Worker主动宣告自己的存在,Master动态感知集群拓扑。**我们选用了Redis作为注册中心。 每个Worker启动时,将自己的元数据写入Redis Hash,并定期续约心跳。 Master通过监听Redis键变化,实时维护可用节点列表。---## 二、Worker自注册:启动即加入Worker节点启动流程中,第一步就是向Redis注册自身。 ```python import socket import uuid import json import asyncioclass WorkerRegistry:def __init__(self,redis_client,config):self.redis = redis_client self.worker_id = config.get("worker_id",f"worker-{uuid.uuid4().hex[:8]}") self.host = socket.gethostbyname(socket.gethostname()) self.port = config["grpc_port"]self.max_browsers = config["max_browsers"]self.platforms = config["platforms"]self.heartbeat_interval = 10async def register(self):metadata ={"worker_id":self.worker_id,"host":self.host,"port":self.port,"max_browsers":self.max_browsers,"platforms":json.dumps(self.platforms),"status":"online","registered_at":str(time.time()),"last_heartbeat":str(time.time())}await self.redis.hset(f"workers:{self.worker_id}",mapping=metadata) logger.info(f"Worker{self.worker_id}registered at{self.host}:{self.port}")async def start_heartbeat(self):while True:await asyncio.sleep(self.heartbeat_interval) await self.redis.hset( f"workers:{self.worker_id}",mapping={"last_heartbeat":str(time.time()),"status":"online"})# 设置键过期时间,心跳停止后自动清除await self.redis.expire(f"workers:{self.worker_id}",30) ``` Worker写入的信息包括:唯一ID、IP地址、gRPC端口、最大浏览器实例数、支持平台列表。 `last_heartbeat` 字段每10秒刷新一次,同时整个Hash键设置30秒过期时间。 如果Worker进程崩溃或网络断开,心跳停止,30秒后该键自动从Redis中消失。 Master不需要任何特殊逻辑,只需要定期扫描 `workers:*`键,就能得到当前存活节点列表。---## 三、Master服务发现:实时感知拓扑变化Master维护一个内存中的Worker列表,通过定时任务与Redis同步。 ```pythonclass MasterDiscovery:def __init__(self,redis_client):self.redis = redis_clientself.workers:Dict[str,dict]={}self.online_callbacks =[]self.offline_callbacks =[]async def sync_loop(self):while True:await self._sync_workers() await asyncio.sleep(5)async def _sync_workers(self):keys = await self.redis.keys("workers:*")current_ids = set()for key in keys:wid = key.decode().split(":")[1]current_ids.add(wid) data = await self.redis.hgetall(key) worker_info ={k.decode():v.decode() for k,v in data.items()}if wid not in self.workers:self.workers[wid]= worker_info await self._on_worker_online(wid,worker_info)else:self.workers[wid].update(worker_info)# 检查离线的Workeroffline_ids = set(self.workers.keys())-current_idsfor wid in offline_ids:logger.warning(f"Worker{wid}went offline") await self._on_worker_offline(wid,self.workers[wid]) del self.workers[wid]async def _on_worker_online(self,wid,info):logger.info(f"Worker{wid}online:{info['host']}:{info['port']}")for cb in self.online_callbacks:await cb(wid,info) async def _on_worker_offline(self,wid,info):for cb in self.offline_callbacks:await cb(wid,info) ``` 同时,Master还订阅了Redis的键空间通知(Keyspace Notifications),当Worker键过期被删除时,能立即收到事件,无需等待下一次轮询周期。 ```pythonasync def subscribe_keyspace_events(self):pubsub = self.redis.pubsub() await pubsub.subscribe("__keyevent@0__:expired")async for message in pubsub.listen():if message['type']== 'message':key = message['data'].decode()if key.startswith("workers:"):wid = key.split(":")[1]logger.info(f"Worker{wid}key expired")# 触发离线处理``` 这样线上Worker变化能在秒级被Master感知,调度器做任务分配时永远看到最新的可用节点列表。---## 四、负载均衡策略的演进有了动态节点列表,下一步就是如何把任务分给合适的Worker。 最初我们采用简单轮询:`worker = workers[(index++) % len(workers)]`。 这种方式在节点性能不一致时效果很差。 后来演进到加权随机,权重由Worker的能力决定(最大浏览器数、CPU核心数等)。 但在实际运行中,能力只是静态指标,真正的瓶颈是实时负载。 最终我们采用**实时负载感知的加权算法**。Worker在心跳中同步上报当前负载指标:-当前活跃任务数--浏览器实例占用数--CPU使用率--可用内存 ```pythonasync def report_load(self):load_data ={"active_tasks":self.current_tasks,"browser_used":self.browser_pool.used_count(),"cpu_percent":psutil.cpu_percent(interval=1),"mem_available_mb":psutil.virtual_memory().available // (1024 * 1024)}await self.redis.hset(f"workers:{self.worker_id}",mapping={"load":json.dumps(load_data),"last_heartbeat":str(time.time())}) ``` Master根据综合评分选择Worker: ```python def select_best_worker(self,task_platform:str):candidates =[]for wid,info in self.workers.items():platforms = json.loads(info.get("platforms","[]"))if task_platform not in platforms:continue load = json.loads(info.get("load","{}"))# 计算负载分数(越低越好)score = ( load.get("active_tasks",0) * 10 + load.get("browser_used",0) * 5 + load.get("cpu_percent",0) * 0.1 ) candidates.append((wid,score))if not candidates:return Nonecandidates.sort(key=lambda x:x[1]) return candidates[0][0]``` 这种策略天然地将任务导向最空闲的节点,同时支持异构机器。---## 五、故障转移与任务迁移当某个Worker突然离线,它上面正在执行的任务不能跟着消失。 必须有一套任务接管机制。 Master检测到Worker离线后,立即: 1. 查询该Worker上所有 `RUNNING` 状态的任务 2. 2. 检查每个任务是否在Redis Streams中有pending消息 3. 3. 将pending消息通过 `XCLAIM` 转交给其他正常Worker 4. 4. 任务被重新执行(得益于之前设计的幂等机制,不会产生重复副作用) ```python async def handle_worker_failure(self,wid):# 获取该Worker的pending任务pending_tasks = await self.redis.xpending( f"task:stream:{self.workers[wid]['platform']}","worker-group")for task in pending_tasks:# 转交给另一个Workerawait self.redis.xclaim( f"task:stream:{self.workers[wid]['platform']}","worker-group","recovery-worker",min_idle_time=0,message_ids=[task['message_id']]) logger.info(f"Reassigned{len(pending_tasks)}tasks from failed worker{wid}") ``` 这层保障确保了单节点故障不影响整体自动化运营。---## 六、自动扩缩容的雏形有了动态注册和负载感知,自动扩缩容就顺理成章了。 我们在Master上运行一个容量评估协程:-每1分钟检查全局任务队列积压量--如果积压超过阈值且所有Worker负载都>70%,触发扩容信号--扩容信号通过企业微信通知运维,或直接调用云API(如果是云主机)启动新实例--新实例启动后自动注册,Master立即开始向其分发任务 反之,如果持续低负载,可以建议缩容。 虽然完全自动的缩容暂时没启用(怕误判),但通知机制已经省去了大量人工巡检。 ```pythonasync def capacity_controller(self):while True:await asyncio.sleep(60) total_load = sum( json.loads(w.get("load","{}")).get("cpu_percent",0) for w in self.workers.values() ) avg_load = total_load / max(len(self.workers),1) queue_length = await self.redis.xlen("task:stream:pdd")# 示例if queue_length > 50 and avg_load > 70:await self.alert.send("建议扩容:队列积压{},平均CPU负载{}%".format(queue_length,avg_load)) ```---## 七、一个教训:注册风暴在最初实现时,我们没做任何启动保护。 有一次批量重启了10台Worker,它们在几乎同一秒内向Redis写入注册信息,触发了Master的多次全量同步,Master短时间内大量查询Redis,造成短暂的CPU飙升和任务分发延迟。 后来我们在Worker启动时加入了随机延迟(0-5秒),并在Master同步逻辑中加入去抖动机制(2秒内多次变化只处理最后一次)。 问题解决。>这些问题在小规模时根本暴露不出来,只有节点数上去了才会触发。---## 八、写在最后自动化系统的规模一旦上去了,执行节点的管理就会变成新的瓶颈。 让Worker自己注册、自己报告状态、自己续约心跳,Master只做观测和决策,这套模式让我们的运维负担直线下降。**一个真正健壮的分布式自动化系统,应该像生物体一样:有新的细胞加入,就自动融入;有细胞坏死,就自动排除。**而服务注册与发现,就是这套自组织能力的基础。---*作者:林焱*