Go语言RabbitMQ实战:企业级消息队列开发
Go语言RabbitMQ实战:企业级消息队列开发
1. RabbitMQ概述
RabbitMQ是最流行的开源消息中间件之一,实现了AMQP协议,支持多种消息模式,具有可靠性高、功能丰富、易于使用等特点。
2. 连接管理
package rabbitmq import ( "fmt" "time" amqp "github.com/rabbitmq/amqp091-go" ) type Connection struct { conn *amqp.Connection channel *amqp.Channel url string } func NewConnection(url string) (*Connection, error) { conn, err := amqp.Dial(url) if err != nil { return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err) } ch, err := conn.Channel() if err != nil { conn.Close() return nil, fmt.Errorf("failed to open channel: %w", err) } return &Connection{ conn: conn, channel: ch, url: url, }, nil } func (c *Connection) Channel() *amqp.Channel { return c.channel } func (c *Connection) Close() error { if c.channel != nil { c.channel.Close() } if c.conn != nil { return c.conn.Close() } return nil } func (c *Connection) Reconnect() error { c.Close() for i := 0; i < 3; i++ { conn, err := amqp.Dial(c.url) if err == nil { c.conn = conn c.channel, err = conn.Channel() if err == nil { return nil } } time.Sleep(time.Second * time.Duration(i+1)) } return fmt.Errorf("failed to reconnect after 3 attempts") }3. 交换机和队列声明
type Exchange struct { name string exchangeType string durable bool } func NewExchange(name, exchangeType string, durable bool) *Exchange { return &Exchange{ name: name, exchangeType: exchangeType, durable: durable, } } func (e *Exchange) Declare(ch *amqp.Channel) error { return ch.ExchangeDeclare( e.name, e.exchangeType, e.durable, false, false, false, nil, ) } type Queue struct { name string durable bool exclusive bool autoDelete bool args amqp.Table } func NewQueue(name string, durable bool) *Queue { return &Queue{ name: name, durable: durable, exclusive: false, autoDelete: false, args: nil, } } func (q *Queue) Declare(ch *amqp.Channel) (amqp.Queue, error) { return ch.QueueDeclare( q.name, q.durable, q.exclusive, q.autoDelete, false, q.args, ) } func (q *Queue) Bind(ch *amqp.Channel, exchangeName, routingKey string) error { return ch.QueueBind( q.name, routingKey, exchangeName, false, nil, ) }4. 生产者实现
type Publisher struct { conn *Connection exchange string } func NewPublisher(conn *Connection, exchange string) *Publisher { return &Publisher{ conn: conn, exchange: exchange, } } func (p *Publisher) Publish(ctx context.Context, routingKey string, body []byte) error { return p.conn.Channel().PublishWithContext( ctx, p.exchange, routingKey, false, false, amqp.Publishing{ ContentType: "application/json", DeliveryMode: amqp.Persistent, Body: body, Timestamp: time.Now(), }, ) } func (p *Publisher) PublishWithHeaders(ctx context.Context, routingKey string, body []byte, headers map[string]interface{}) error { return p.conn.Channel().PublishWithContext( ctx, p.exchange, routingKey, false, false, amqp.Publishing{ ContentType: "application/json", DeliveryMode: amqp.Persistent, Body: body, Timestamp: time.Now(), Headers: headers, }, ) } func (p *Publisher) PublishWithDelay(ctx context.Context, routingKey string, body []byte, delay time.Duration) error { delayHeaders := map[string]interface{}{ "x-delay": int(delay.Milliseconds()), } return p.conn.Channel().PublishWithContext( ctx, p.exchange, routingKey, false, false, amqp.Publishing{ ContentType: "application/json", DeliveryMode: amqp.Persistent, Body: body, Timestamp: time.Now(), Headers: delayHeaders, }, ) }5. 消费者实现
type Consumer struct { conn *Connection queue string autoAck bool exclusive bool noLocal bool noWait bool args amqp.Table } func NewConsumer(conn *Connection, queue string, autoAck bool) *Consumer { return &Consumer{ conn: conn, queue: queue, autoAck: autoAck, } } func (c *Consumer) Consume(handler func([]byte) error) error { msgs, err := c.conn.Channel().Consume( c.queue, "", c.autoAck, c.exclusive, c.noLocal, c.noWait, c.args, ) if err != nil { return fmt.Errorf("failed to register consumer: %w", err) } for msg := range msgs { if err := handler(msg.Body); err != nil { msg.Nack(false, true) continue } if !c.autoAck { msg.Ack(false) } } return nil } func (c *Consumer) ConsumeWithContext(ctx context.Context, handler func([]byte) error) error { msgs, err := c.conn.Channel().Consume( c.queue, "", c.autoAck, c.exclusive, c.noLocal, c.noWait, c.args, ) if err != nil { return err } for { select { case <-ctx.Done(): return ctx.Err() case msg, ok := <-msgs: if !ok { return nil } if err := handler(msg.Body); err != nil { msg.Nack(false, true) continue } if !c.autoAck { msg.Ack(false) } } } }6. 确认机制
type ConfirmedConsumer struct { conn *Connection queue string deliveryTag uint64 } func NewConfirmedConsumer(conn *Connection, queue string) *ConfirmedConsumer { return &ConfirmedConsumer{ conn: conn, queue: queue, } } func (c *ConfirmedConsumer) Consume(handler func([]byte) error) error { msgs, err := c.conn.Channel().Consume( c.queue, "", false, false, false, false, nil, ) if err != nil { return err } for msg := range msgs { if err := handler(msg.Body); err != nil { msg.Nack(false, true) c.deliveryTag = msg.DeliveryTag continue } msg.Ack(false) c.deliveryTag = msg.DeliveryTag } return nil } func (c *ConfirmedConsumer) GetDeliveryTag() uint64 { return c.deliveryTag }7. 消息持久化
type DurablePublisher struct { conn *Connection exchange string } func NewDurablePublisher(conn *Connection, exchange string) *DurablePublisher { return &DurablePublisher{ conn: conn, exchange: exchange, } } func (p *DurablePublisher) PublishDurable(ctx context.Context, routingKey string, body []byte) error { return p.conn.Channel().PublishWithContext( ctx, p.exchange, routingKey, false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, ContentType: "application/json", Body: body, Timestamp: time.Now(), Priority: 0, }, ) } type MessagePriority int const ( PriorityLow MessagePriority = 1 PriorityNormal MessagePriority = 5 PriorityHigh MessagePriority = 9 ) func (p *DurablePublisher) PublishWithPriority(ctx context.Context, routingKey string, body []byte, priority MessagePriority) error { return p.conn.Channel().PublishWithContext( ctx, p.exchange, routingKey, false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, ContentType: "application/json", Body: body, Priority: uint8(priority), Timestamp: time.Now(), }, ) }8. 延迟队列
延迟队列在订单超时、任务调度等场景中应用广泛。
type DelayQueue struct { conn *Connection delayExchange string targetExchange string } func NewDelayQueue(conn *Connection, delayExchange, targetExchange string) *DelayQueue { return &DelayQueue{ conn: conn, delayExchange: delayExchange, targetExchange: targetExchange, } } func (dq *DelayQueue) Setup() error { ch := dq.conn.Channel() err := ch.ExchangeDeclare( dq.delayExchange, "x-delayed-message", true, false, false, false, amqp.Table{ "x-delayed-type": "direct", }, ) if err != nil { return err } _, err = ch.QueueDeclare("delay_queue", true, false, false, false, nil) if err != nil { return err } return ch.QueueBind("delay_queue", "delay_key", dq.delayExchange, false, nil) } func (dq *DelayQueue) PublishDelay(ctx context.Context, routingKey string, body []byte, delay time.Duration) error { headers := map[string]interface{}{ "x-delay": delay.Milliseconds(), } return dq.conn.Channel().PublishWithContext( ctx, dq.delayExchange, routingKey, false, false, amqp.Publishing{ ContentType: "application/json", DeliveryMode: amqp.Persistent, Body: body, Headers: headers, }, ) }9. 总结
RabbitMQ是功能丰富的企业级消息队列,本文介绍了Go语言中使用amqp091-go库操作RabbitMQ的方法,包括连接管理、消息发布、消息消费、确认机制、延迟队列等核心功能。
