当前位置: 首页 > news >正文

doc-llm-autotest 基于大模型的文档自动化测试平台:worker服务的可靠性增强

一、可靠性分析

从架构图上,我们可以看出worker调用大模型服务过程中,会发生阻塞等待,如果此时worker异常容器挂掉了,那么此次任务状态会一直为processing,并且因为redis关联task_id的消息已经被消费了,那么这个任务就无法被识别出来重试。

基于这个场景分析,我们要补充巡检服务,去定时重启处于超时并且状态为processing的任务,此时服务可以从mysql捞任务表,但考虑到性能等影响,我们选择在redis构建新的processing队列,存储正在执行的task_id,构建processing_ts队列存储开始处理时间,巡检服务访问redis的processing队列、processing_ts队列来更新状态异常的任务。

适配worker服务逻辑:设置原子操作保证worker取任务+放入processing不会被中断。

image

 二、逻辑实现

1. doc_llm_test_worker补充原子操作将task从ready移动到processing,记录开始执行的时间

TASK_QUEUE_READY_KEY = "docllm:queue:ready"
TASK_QUEUE_PROCESSING_KEY = "docllm:queue:processing"
TASK_PROCESSING_TS_KEY = "docllm:hash:processing_ts"def worker_loop():"""文档检查任务 worker 主循环"""logging.info("doc_llm_test_worker started, waiting for tasks...")while True:try:raw_item = redis_client.brpoplpush(TASK_QUEUE_READY_KEY, TASK_QUEUE_PROCESSING_KEY, timeout=10)if not raw_item:time.sleep(5)continue # 没有任务,就继续下一轮try:payload_str = raw_item.decode("utf-8")data = json.loads(payload_str)task_id = int(data["task_id"])except Exception as e:logging.exception(f"invalid processing queue item: {raw_item!r}")redis_client.lrem(TASK_QUEUE_PROCESSING_KEY, 1, raw_item)continuestart_ts = int(time.time())redis_client.hset(TASK_PROCESSING_TS_KEY, task_id, start_ts)try:process_task(task_id)finally:redis_client.lrem(TASK_QUEUE_PROCESSING_KEY, 1, raw_item)redis_client.hdel(TASK_PROCESSING_TS_KEY, task_id)except Exception:logging.exception("unexpected error in worker loop, sleep 3s")time.sleep(3)

2.补充巡检服务,定时重启处于超时并且状态为processing的任务,需要做到重新入队 + 状态恢复流程

设置参数 PROCESSING_TIMEOUT_SECONDS = 600

判断逻辑:

now_ts - start_ts > PROCESSING_TIMEOUT_SECONDS

该任务视为:

  • worker 处理失败(worker 崩了/卡死)

  • 需要重新 pending

  • 丢回 ready 队列给新的 worker

适配task_service,提供给巡检服务同步改数据库任务状态

def mark_task_processing(task_id: int) -> bool:"""worker 刚拿到任务时调用:pending -> processing"""with get_session() as session:stmt = (update(TaskDocLLM).where(TaskDocLLM.task_id == task_id,TaskDocLLM.status == TaskStatus.pending).values(status=TaskStatus.processing,processing_started_at=func.now()))result = session.execute(stmt)session.commit()return result.rowcount == 1def reclaim_task(task_id: int, timeout_dt) -> bool:"""将超时的任务重新放回队列:param timeout_dt: datetime对象,代表“必须早于此时间才会被恢复”"""with get_session() as session:stmt = (update(TaskDocLLM).where(TaskDocLLM.task_id == task_id,TaskDocLLM.status == TaskStatus.processing,TaskDocLLM.processing_started_at < timeout_dt).values(status=TaskStatus.pending,retry_count=TaskDocLLM.retry_count + 1,processing_started_at=None,result=None))result = session.execute(stmt)session.commit()return result.rowcount == 1

新增巡检函数reaper_loop,筛选超时任务,恢复状态:

def reaper_loop():"""巡检 processing 队列,恢复超时的任务"""logging.info("doc_llm_reaper started, interval=%ss, timeout=%ss", REAPER_INTERVAL_SECONDS, PROCESSING_TIMEOUT_SECONDS)while True:try:now_ts = int(time.time())timeout_border_ts = now_ts - PROCESSING_TIMEOUT_SECONDStimeout_threshold_dt = datetime.utcnow() - timedelta(seconds=PROCESSING_TIMEOUT_SECONDS)items = redis_client.lrange(TASK_QUEUE_PROCESSING_KEY, 0, -1)if not items:time.sleep(REAPER_INTERVAL_SECONDS)continuefor raw in items:try:payload_str = raw.decode("utf-8")payload = json.loads(payload_str)task_id = payload.get("task_id")task_name = payload.get("task_name")except Exception:redis_client.lrem(TASK_QUEUE_PROCESSING_KEY, 1, raw)continuestart_ts_raw = redis_client.hget(TASK_PROCESSING_TS_KEY, task_id)if start_ts_raw is None:continuestart_ts = int(start_ts_raw)if start_ts > timeout_border_ts:continuelogging.warning(f"doc_llm_reaper: task {task_id} seems stuck, start_ts={start_ts}, now_ts={now_ts}")ok = task_service.reclaim_task(task_id, timeout_threshold_dt)if not ok:continueredis_client.lrem(TASK_QUEUE_PROCESSING_KEY, 1, raw)redis_client.hdel(TASK_PROCESSING_TS_KEY, task_id)new_payload = json.dumps({"task_id": task_id, "task_name": task_name}, ensure_ascii=False)redis_client.lpush(TASK_QUEUE_READY_KEY, new_payload)logging.info(f"doc_llm_reaper: task {task_id} reclaimed and requeued to READY")except Exception:logging.exception("unexpected error in reaper loop, sleep 3s")time.sleep(REAPER_INTERVAL_SECONDS)

在主进程之外,起一个线程循环跑巡检:

def start_reaper_thread():reaper_thread = threading.Thread(target=reaper_loop, name="doc_llm_reaper", daemon=True)reaper_thread.start()return reaper_threadif __name__ == "__main__":setup_logging()init_llm()start_reaper_thread()worker_loop()

 

http://www.jsqmd.com/news/67431/

相关文章:

  • 个人电脑本地私有知识库:访答知识库深度解析
  • 58
  • 58
  • TB710FU原厂刷机包下载_CN_ZUI_17.0.04.279_ST_250808
  • Python service Flask generate list data and display in web view via html and javscript
  • 仿真分析工具 Abaqus 2024 下载安装教程:含安装包下载 + 配置教程,新手也能一次成功
  • 香橙派上进行 Livox Mid-360 激光雷达开发(二)移植FAST_LIO
  • Mybatis拦截器原理解析
  • 10406_基于Springboot的社交平台系统
  • aaaa
  • TB331FC原厂刷机包下载_CNZUI_17.0.572_ST_250910
  • 2025云南短视频制作服务商/公司TOP5推荐!昆明等地短视频制作企业榜单发布,赋能企业品牌传播新生态
  • 2025 年 12 月杭州公寓出租权威推荐榜:精选浙江优质房源,温馨宜居与便捷交通的完美之选
  • 解码继承——代码复用与层次化设计
  • 2025年12月北京陪诊公司推荐榜:专业机构对比分析与用户选择指南
  • TB365FC刷机包_CN_ZUXOS_1.1.10.122_ST_250828
  • Python 异步编程:使用 async/await 实现高效并发 - 指南
  • 超越大语言模型:蒸馏技术实战指南
  • TB520FU刷机包_CN_17.0.10.158_ST_250817
  • web框架——flask3.x-上下文管理机制
  • JavaEE初阶——多线程(9)JUC的程序类和死锁
  • [智能体设计模式] 第 1 章:提示链(Prompt Chaining) - 实践
  • 极速AI助手 - 多AI服务桌面助手, 支持MCP工具调用, 内置免费AI功能
  • 蓝鲸花呗客服妙招帮你脱困省油大空间低配拆解银河的“水桶车细节值得吵一架
  • 吴恩达深度学习课程四:计算机视觉 第一周:卷积基础知识(一)图像处理基础
  • Python函数基础实战教程:从定义调用到参数传值全解析
  • 内旋与外旋两种旋转方式
  • 索引数组读取修改添加
  • zsj_蓝桥python系列二_Python 基础语法 _Python 列表推导式
  • 12.08