当前位置: 首页 > news >正文

mini-job极简分布式延迟任务队列 — 基于 Redis,支持 Cron 周期任务、异步协程和多执行器

mini-job

极简分布式延迟任务队列 — 基于 Redis,支持 Cron 周期任务、异步协程和多执行器。

特性

特性说明
延迟任务设定延迟秒数,到期自动执行
Cron 周期调度支持标准 cron 表达式(分 时 日 月 星期)
三种执行器async协程(IO 密集)、thread线程(通用)、process进程(CPU 密集)
队列级执行器隔离Redis Key{ns}:ready:{executor}三队列隔离,消费者只拉取专属队列,零竞争
死信队列失败的一次性任务自动进入死信队列,可排查重试
可见性超时回收消费者崩溃后任务自动回收重入队列,不丢任务
命名空间隔离多环境共用同一 Redis 实例,Key 前缀隔离
监控指标内置QueueMetrics,统计各生命周期计数
背压控制队列深度超阈值自动告警
Pydantic 配置集中配置管理,环境变量覆盖,类型校验
Lua 原子操作抢占和回收均为 Redis 端原子执行,无竞态
优雅关闭SIGTERM/SIGINT 信号处理,等待任务完成

安装

pipinstallmini-job# 核心依赖pipinstallmini-job[script]# 含 pandas/numpy(脚本执行模式)

依赖:Python >= 3.12,Redis >= 7.4,croniter,pydantic-settings,python-dotenv

快速开始

1. 确保 Redis 运行

redis-cliping# PONG

2. 生产者 — 发布任务

frommini_jobimportDelayQueue dq=DelayQueue(namespace="myapp")# 注册脚本(动态执行模式)dq.register_script("send_email",""" def handler(payload): to_email = payload.get('to') print(f'发送邮件到: {to_email}') return {'status': 'sent', 'to': to_email} """,)# 发布任务 — executor 参数指定执行器类型dq.publish("send_email",{"to":"user@example.com","subject":"欢迎","content":"注册成功"},executor="async",# async / thread / process)# 延迟 30 秒执行dq.publish("send_email",{...},delay_seconds=30)# 每天凌晨 2 点执行(cron 表达式:分 时 日 月 星期)dq.publish("send_email",{...},cron="0 2 * * *")# 查询任务结果result=dq.get_task_result(task_id)

3. 消费者 — 按类型独立启动

frommini_jobimportDelayQueue# 注册本地函数defsend_sms(payload):print(f"发送短信 ->{payload['phone']}")TASK_REGISTRY={"send_sms":send_sms,}dq=DelayQueue(namespace="myapp")dq.start(task_registry=TASK_REGISTRY,executor_type="async",# 本进程只消费 async 任务)

启动不同执行器类型的消费者(3 个终端):

python consumer.py async# 协程消费者 — IO 密集任务python consumer.py thread# 线程消费者 — 通用任务python consumer.py process# 进程消费者 — CPU 密集任务

核心概念

执行器类型

类型适用场景实现推荐并发数
asyncIO 密集(发邮件、HTTP 请求、DB 操作)asyncio协程100~500
thread通用任务、阻塞操作ThreadPoolExecutor30~100
processCPU 密集(数据处理、报表生成)ProcessPoolExecutorCPU 核数

任务路由表

TASK_REGISTRY={# 简单格式:默认 async 执行器"send_sms":send_sms,# 带配置格式:指定执行器类型"daily_report":(daily_report,{"executor":"thread"}),}

状态生命周期

pending → running → completed ↘ failed → 死信队列(一次性任务) 下次重试(周期任务)

Redis Key 设计

{namespace}:ready:{executor} — 按执行器隔离的就绪 ZSet(async/thread/process) {namespace}:processing:{id} — 消费者专属处理列表 {namespace}:processing:timeout — 全局超时追踪 ZSet {namespace}:dead_letter — 死信队列 {namespace}:dead_letter:detail — 死信详情 {namespace}:task:meta — 任务元数据 {namespace}:task:result:{id} — 任务结果(独立 TTL) {namespace}:scripts — 注册脚本

API 参考

DelayQueue

dq=DelayQueue(namespace="myapp")# 或使用配置对象frommini_jobimportQueueConfig dq=DelayQueue(QueueConfig(namespace="myapp"))

生产者方法:

方法说明
publish(func, payload, delay_seconds=0, cron=None, executor="async")发布任务 → 返回 task_id
register_script(name, content, language="python", use=[])注册动态脚本
get_script(name)获取脚本信息
delete_script(name)删除脚本
list_scripts()列出所有脚本
get_task_result(task_id)查询任务状态和结果

消费者方法:

方法说明
start(task_registry, executor_type="async", **kwargs)启动消费者
stop()手动触发优雅关闭

start()参数:

参数默认值说明
task_registry(必填)任务路由表{"name": func}
executor_type"async"执行器类型:async / thread / process
poll_interval0.5轮询间隔(秒)
grab_limit80每次最多抢占任务数
worker_threads50工作线程/协程/进程数
task_timeout30单个任务超时(秒)
visibility_timeout60可见性超时(秒)

配置

通过 Pydantic Settings 管理,支持.env文件、环境变量覆盖、类型校验。

队列配置DQ_*

参数环境变量默认值类型说明
namespaceDQ_NAMESPACE"dq"strRedis Key 命名空间前缀,多环境隔离
consumer_idDQ_CONSUMER_ID自动生成str消费者唯一标识,默认worker-+ 8 位 hex
result_ttlDQ_RESULT_TTL86400int任务结果保留时间(秒),默认 1 天
reclaim_intervalDQ_RECLAIM_INTERVAL10int超时回收检查间隔(轮询周期数),每 N 轮检查一次

Redis 连接配置DQ_REDIS_*

参数环境变量默认值类型说明
hostDQ_REDIS_HOST"localhost"strRedis 主机地址
portDQ_REDIS_PORT6379intRedis 端口
dbDQ_REDIS_DB0intRedis 数据库编号
passwordDQ_REDIS_PASSWORDNonestrRedis 密码(可选)
max_connectionsDQ_REDIS_MAX_CONNECTIONS50int连接池最大连接数
socket_timeoutDQ_REDIS_SOCKET_TIMEOUT5.0float单次操作超时(秒)
socket_connect_timeoutDQ_REDIS_SOCKET_CONNECT_TIMEOUT5.0float连接建立超时(秒)
retry_on_timeoutDQ_REDIS_RETRY_ON_TIMEOUTTruebool超时是否自动重试
health_check_intervalDQ_REDIS_HEALTH_CHECK_INTERVAL30int连接健康检查间隔(秒)

消费者配置DQ_CONSUMER_*

参数环境变量默认值类型说明
poll_intervalDQ_CONSUMER_POLL_INTERVAL0.5float轮询间隔(秒),影响任务延迟精度
grab_limitDQ_CONSUMER_GRAB_LIMIT80int每次最多抢占任务数,建议 worker × 1.5~2
worker_threadsDQ_CONSUMER_WORKER_THREADS50int工作协程/线程/进程数
task_timeoutDQ_CONSUMER_TASK_TIMEOUT30int单个任务执行超时(秒),超时后标记失败
visibility_timeoutDQ_CONSUMER_VISIBILITY_TIMEOUT60int可见性超时(秒),消费者需在此时间内完成任务
shutdown_timeoutDQ_CONSUMER_SHUTDOWN_TIMEOUT30int优雅关闭最大等待时间(秒)
max_queue_depthDQ_CONSUMER_MAX_QUEUE_DEPTH10000int队列深度告警阈值,超阈值打印 WARNING

示例.env

# 队列DQ_NAMESPACE=productionDQ_CONSUMER_ID=web-server-01# RedisDQ_REDIS_HOST=redis.example.comDQ_REDIS_PORT=6379DQ_REDIS_PASSWORD=secret# 消费者DQ_CONSUMER_POLL_INTERVAL=0.3DQ_CONSUMER_GRAB_LIMIT=100DQ_CONSUMER_WORKER_THREADS=80DQ_CONSUMER_TASK_TIMEOUT=60DQ_CONSUMER_VISIBILITY_TIMEOUT=120

监控

# 获取监控指标快照snapshot=dq.metrics.snapshot()# {'published': 1000, 'completed': 980, 'failed': 15, 'timeout': 5, ...}

指标说明:

指标含义
published已发布任务总数
completed成功完成数
failed执行失败数
timeout超时任务数
dead_lettered进入死信队列数
reclaimed超时回收重入队数

项目结构

mini_job/ ├── __init__.py # 公共导出 ├── config.py # Pydantic Settings 配置 ├── core/ │ ├── delay_queue.py # DelayQueue 核心 │ └── task.py # 任务模型 ├── executor/ │ ├── base.py # 执行器抽象基类 │ ├── async_io.py # 协程执行器 │ ├── thread.py # 线程执行器 │ └── process.py # 进程执行器 ├── redis/ │ ├── client.py # Redis 连接 + Lua 脚本 │ └── scripts.lua # 原子 Lua 脚本 ├── utils/ │ ├── retry.py # 重试装饰器 │ ├── metrics.py # 监控指标 │ └── decorators.py # 任务装饰器 ├── consumer.py # 消费者示例 └── producer.py # 生产者示例

License

MIT

http://www.jsqmd.com/news/718073/

相关文章:

  • 【论文阅读】AWR:Simple and scalable off-policy RL
  • AI 赋能研发:现代开发者的效率进阶与工程化落地实践
  • 思源黑体TTF:7种字重完美解决多语言排版难题
  • 二向箔压缩测试:从宇宙规律武器到软件测试范式的跨界思考
  • AWS DevOps Agent 实测:AI 自主运维从告警到根因报告的完整技术路径
  • 【Hot 100 刷题计划】 LeetCode 23. 合并 K 个升序链表 | C++ 顺序合并
  • MusicFree插件完全指南:打造你的个性化跨平台音乐中心
  • 推荐2款无需安装实用软件,桌面图标整理设置,简真是Windows神器!
  • 解码AI用户心智,筑牢可信GEO根基——悠易科技深度参与《中国AI用户态度与行为研究报告(2026)》发布会
  • 从Jupyter Notebook到生产API,Docker AI Toolkit 2026全流程自动化部署(含OpenTelemetry埋点、Prometheus监控集成脚本)
  • GitHub中文界面大改造:3分钟让英文GitHub秒变中文版
  • XPath Helper Plus:3分钟掌握网页元素精准定位的终极指南
  • WASM容器化部署为何突然爆发?,2026全球Top 12边缘AI项目验证的Docker+WASI运行时架构演进路径
  • 别再为低价忽视丝印规格
  • 如何3分钟解锁Wallpaper Engine所有壁纸素材?RePKG工具终极指南
  • Ostrakon-VL-8B数据预处理详解:餐饮图像清洗与标注规范
  • 从ArrayList到VectorSpecies:Java向量化开发全流程拆解,含GraalVM AOT+Linux perf火焰图调优实战
  • MCP Server 接口开发规范与最佳实践
  • QQ音乐加密文件终极解密指南:3步解锁你的音乐宝藏
  • 忍者像素绘卷Codex使用技巧:利用AI编程助手快速开发模型调用脚本
  • Java 25虚拟线程资源调度黄金参数表(2024 Q3压测实录:TPS提升3.8倍,P99延迟下降67ms)
  • Gmail账号自动生成神器:Python脚本实现3分钟批量创建无限邮箱
  • 构建基于nli-MiniLM2-L6-H768的智能学习系统:习题与知识点自动关联
  • WeDLM-7B-Base入门:Python零基础环境配置与第一个生成程序
  • 一次惊心动魄的年报
  • 程序验证技术演进与Preguss框架创新实践
  • 【基于 macOS 虚拟机的 iMessage 批量消息处理技术实践】
  • 数据结构基础------初识二叉树
  • 剖析2026年酒店鱼缸定制工厂,哪家价格合理又好用 - 工业设备
  • 2026年3c认证插座有哪些品牌?安全性能解析 - 品牌排行榜