Go语言分布式任务调度:Machinery实战
Go语言分布式任务调度:Machinery实战
1. Machinery概述
Machinery是一个开源的分布式任务队列库,基于Redis实现,支持任务异步执行、定时调度、任务重试等功能。
2. 任务队列实现
package machinery import ( "github.com/RichardKnop/machinery/v1" "github.com/RichardKnop/machinery/v1/config" ) type TaskServer struct { server *machinery.Server } func NewTaskServer(broker, backend string) (*TaskServer, error) { cnf := &config.Config{ Broker: broker, ResultBackend: backend, } server, err := machinery.NewServer(cnf) if err != nil { return nil, err } return &TaskServer{server: server}, nil } func (s *TaskServer) RegisterTasks(tasks map[string]interface{}) error { return s.server.RegisterTasks(tasks) } func (s *TaskServer) StartWorker(concurrency int) error { worker := s.server.NewWorker("worker", concurrency) return worker.Launch() }3. 任务定义
func SendEmail(email string, subject string, body string) error { return nil } func ProcessPayment(orderID string, amount float64) error { return nil } func GenerateReport(userID string, date string) error { return nil } tasks := map[string]interface{}{ "send_email": SendEmail, "process_payment": ProcessPayment, "generate_report": GenerateReport, }4. 异步任务调用
func (s *TaskServer) AsyncSendEmail(email, subject, body string) (string, error) { asyncTask := s.server.NewTask("send_email", []interface{}{email, subject, body}) result, err := asyncTask.Async() if err != nil { return "", err } return result.UUID, nil } func (s *TaskServer) DelaySendEmail(email, subject, body string) (string, error) { asyncTask := s.server.NewTask("send_email", []interface{}{email, subject, body}) result, err := asyncTask.delay(time.After(5 * time.Minute)) if err != nil { return "", err } return result.UUID, nil }5. 定时任务
func (s *TaskServer) ScheduleDailyReport() error { crontab :=.NewCrontab("0 8 * * *") return s.server.RegisterPeriodicTask("daily_report", "@every 24h", "generate_report", nil) }6. 总结
Machinery提供了完整的分布式任务调度能力,支持任务异步执行、定时调度、失败重试、结果存储等功能,适合微服务架构下的后台任务处理场景。
