NSQ in Practice with Go: A Lightweight, High-Performance Messaging System
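1. NSQ Overview
NSQ is a real-time distributed messaging platform open-sourced by Bitly. It has no single point of failure, is highly available, and is easy to deploy, which makes it a good fit for building high-throughput, real-time messaging systems.
2. Producer Implementation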
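
    package nsq

    import (
        "time"

        "github.com/nsqio/go-nsq"
    )

    // Producer wraps an nsq.Producer bound to a single nsqd address.
    type Producer struct {
        producer *nsq.Producer
        address  string
    }

    // NewProducer creates a producer for the given nsqd address (host:port).
    func NewProducer(address string) (*Producer, error) {
        config := nsq.NewConfig()
        producer, err := nsq.NewProducer(address, config)
        if err != nil {
            return nil, err
        }
        return &Producer{
            producer: producer,
            address:  address,
        }, nil
    }

    // Publish sends a single message to the topic.
    func (p *Producer) Publish(topic string, message []byte) error {
        return p.producer.Publish(topic, message)
    }

    // DeferredPublish sends a message that is delivered only after delay has elapsed.
    // Note that go-nsq takes the delay before the body.
    func (p *Producer) DeferredPublish(topic string, message []byte, delay time.Duration) error {
        return p.producer.DeferredPublish(topic, delay, message)
    }

    // MultiPublish sends a batch of messages to the topic in one call.
    func (p *Producer) MultiPublish(topic string, messages [][]byte) error {
        return p.producer.MultiPublish(topic, messages)
    }

    // Close stops the underlying producer. It always returns nil.
    func (p *Producer) Close() error {
        p.producer.Stop()
        return nil
    }

3. Consumer Implementation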
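The consumer in this section takes a plain function over the message body and adapts it to the handler interface that go-nsq expects. With go-nsq's default settings, a nil return from the handler acknowledges the message (FIN) and a non-nil error requeues it (REQ). The adapter idea can be sketched with the standard library alone. In the sketch, `Message`, `Handler`, `BodyHandlerFunc`, `dispatch`, and `rejectEmpty` are hypothetical stand-ins written for illustration, not go-nsq types.

```go
package main

import "errors"

// Message is a hypothetical stand-in for nsq.Message; only Body is modeled.
type Message struct {
	Body []byte
}

// Handler mirrors the shape of a message handler interface:
// one method that receives a message and reports success or failure.
type Handler interface {
	HandleMessage(msg *Message) error
}

// BodyHandlerFunc adapts a plain function over the body to Handler,
// so callers never have to touch the Message type directly.
type BodyHandlerFunc func(body []byte) error

// HandleMessage unwraps the body and forwards it to the function.
func (f BodyHandlerFunc) HandleMessage(msg *Message) error {
	return f(msg.Body)
}

// dispatch is a hypothetical helper that imitates what a client library
// does with the handler result: "FIN" on success, "REQ" on error.
func dispatch(h Handler, msg *Message) string {
	if err := h.HandleMessage(msg); err != nil {
		return "REQ"
	}
	return "FIN"
}

// rejectEmpty is a sample handler that fails on an empty body.
func rejectEmpty(body []byte) error {
	if len(body) == 0 {
		return errors.New("empty message body")
	}
	return nil
}
```

Calling `dispatch(BodyHandlerFunc(rejectEmpty), &Message{Body: []byte("hello")})` yields "FIN", while an empty body yields "REQ". Handlers should therefore return an error only for failures that are worth retrying.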
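
    // Consumer wraps an nsq.Consumer together with the callback that processes messages.
    // This code lives in the same package as the producer and reuses its imports.
    type Consumer struct {
        consumer *nsq.Consumer
        handler  MessageHandler
    }

    // MessageHandler processes one message body.
    // Returning a non-nil error causes go-nsq to requeue the message.
    type MessageHandler func([]byte) error

    // NewConsumer creates a consumer for topic/channel and registers handler.
    func NewConsumer(topic, channel string, handler MessageHandler) (*Consumer, error) {
        config := nsq.NewConfig()
        config.MaxInFlight = 100 // allow up to 100 unacknowledged messages at once
        consumer, err := nsq.NewConsumer(topic, channel, config)
        if err != nil {
            return nil, err
        }
        // Handlers must be added before connecting.
        h := &handlerWrapper{handler: handler}
        consumer.AddHandler(h)
        return &Consumer{
            consumer: consumer,
            handler:  handler,
        }, nil
    }

    // handlerWrapper adapts a MessageHandler to the nsq.Handler interface.
    type handlerWrapper struct {
        handler MessageHandler
    }

    func (h *handlerWrapper) HandleMessage(msg *nsq.Message) error {
        return h.handler(msg.Body)
    }

    // ConnectToNSQD connects directly to a single nsqd instance.
    func (c *Consumer) ConnectToNSQD(address string) error {
        return c.consumer.ConnectToNSQD(address)
    }

    // ConnectToNSQLookupd discovers nsqd instances through nsqlookupd.
    func (c *Consumer) ConnectToNSQLookupd(address string) error {
        return c.consumer.ConnectToNSQLookupd(address)
    }

    // Start is a no-op: messages begin to flow as soon as the consumer connects.
    func (c *Consumer) Start() error {
        return nil
    }

    // Stop initiates a graceful shutdown and waits for it to complete.
    func (c *Consumer) Stop() {
        c.consumer.Stop()
        <-c.consumer.StopChan
    }

4. Summary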
NSQ is a lightweight, high-performance messaging system. This article showed how to publish and consume messages in Go with the go-nsq library.
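Both wrappers above deal only in `[]byte`, so the encoding of the message body is left to the caller. As a minimal sketch, assuming JSON is the chosen format, the helpers below build the body passed to `Publish` and decode it again inside a `MessageHandler`. `OrderEvent`, `encodeEvent`, and `decodeEvent` are hypothetical names introduced for illustration and are not part of go-nsq.

```go
package main

import "encoding/json"

// OrderEvent is a hypothetical payload type used only for illustration.
type OrderEvent struct {
	ID     string `json:"id"`
	Amount int    `json:"amount"`
}

// encodeEvent produces the []byte body that would be handed to Publish.
func encodeEvent(e OrderEvent) ([]byte, error) {
	return json.Marshal(e)
}

// decodeEvent parses a raw message body; a handler could call it first
// and return the error so that a malformed message is not silently dropped.
func decodeEvent(body []byte) (OrderEvent, error) {
	var e OrderEvent
	err := json.Unmarshal(body, &e)
	return e, err
}
```

One design point to weigh: a body that can never be decoded will fail on every retry, so a handler may prefer to log and acknowledge such messages rather than requeue them indefinitely.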
