Go语言消息队列事务:Exactly-Once与At-Least-Once语义
Go语言消息队列事务:Exactly-Once与At-Least-Once语义
1. 消息语义
消息队列有三种传递语义:At-Most-Once(最多一次)、At-Least-Once(至少一次)和Exactly-Once(恰好一次)。
type DeliverySemantics int const ( AtMostOnce DeliverySemantics = 1 AtLeastOnce DeliverySemantics = 2 ExactlyOnce DeliverySemantics = 3 )2. At-Least-Once实现
At-Least-Once需要消费者进行消息确认。
type AtLeastOnceConsumer struct { consumer *RabbitMQConsumer processed map[string]bool mu sync.RWMutex } func NewAtLeastOnceConsumer(consumer *RabbitMQConsumer) *AtLeastOnceConsumer { return &AtLeastOnceConsumer{ consumer: consumer, processed: make(map[string]bool), } } func (c *AtLeastOnceConsumer) Process(ctx context.Context, handler func([]byte) error) error { return c.consumer.ConsumeWithContext(ctx, func(msg []byte) error { msgID := generateMsgID(msg) c.mu.RLock() if c.processed[msgID] { c.mu.RUnlock() return nil } c.mu.RUnlock() if err := handler(msg); err != nil { return err } c.mu.Lock() c.processed[msgID] = true c.mu.Unlock() return nil }) } func generateMsgID(msg []byte) string { h := fnv.New32a() h.Write(msg) return fmt.Sprintf("%d-%d", time.Now().UnixNano(), h.Sum32()) }3. Exactly-Once实现
Exactly-Once需要生产者和消费者配合,通过事务和幂等性保证。
type ExactlyOnceProducer struct { producer *RabbitMQProducer txn *amqp.Tx } func (p *ExactlyOnceProducer) PublishWithTransaction(ctx context.Context, routingKey string, body []byte) error { if p.txn == nil { tx, err := p.producer.conn.Channel().Tx() if err != nil { return err } p.txn = tx } err := p.producer.conn.Channel().PublishWithContext( ctx, p.exchange, routingKey, false, false, amqp.Publishing{ ContentType: "application/json", Body: body, DeliveryMode: amqp.Persistent, }, ) if err != nil { p.txn.Rollback() return err } return p.txn.Commit() } type IdempotentHandler struct { storage map[string]bool mu sync.RWMutex } func NewIdempotentHandler() *IdempotentHandler { return &IdempotentHandler{ storage: make(map[string]bool), } } func (h *IdempotentHandler) Handle(msgID string, handler func() error) error { h.mu.RLock() if h.storage[msgID] { h.mu.RUnlock() return nil } h.mu.RUnlock() if err := handler(); err != nil { return err } h.mu.Lock() h.storage[msgID] = true h.mu.Unlock() return nil }4. 总结
本文介绍了At-Least-Once和Exactly-Once消息语义的实现方法,开发者应根据业务需求选择合适的传递语义。
