当前位置: 首页 > news >正文

Go语言Kafka实战:高性能消息队列开发指南

Go语言Kafka实战:高性能消息队列开发指南

1. Kafka概述

Apache Kafka是分布式流处理平台,具有高吞吐量、低延迟、可持久化、可横向扩展等优点,广泛用于日志收集、实时数据管道、流处理等场景。

2. 生产者实现

package kafka import ( "context" "time" "github.com/IBM/sarama" ) type Producer struct { producer sarama.SyncProducer topic string } func NewProducer(brokers []string, topic string) (*Producer, error) { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max = 5 config.Producer.Return.Successes = true config.Producer.Compression = sarama.CompressionSnappy config.Net.DialTimeout = 10 * time.Second config.Net.ReadTimeout = 10 * time.Second config.Net.WriteTimeout = 10 * time.Second producer, err := sarama.NewSyncProducer(brokers, config) if err != nil { return nil, err } return &Producer{ producer: producer, topic: topic, }, nil } func (p *Producer) SendMessage(key, value string) error { msg := &sarama.ProducerMessage{ Topic: p.topic, Key: sarama.StringEncoder(key), Value: sarama.StringEncoder(value), } partition, offset, err := p.producer.SendMessage(msg) if err != nil { return err } fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset) return nil } func (p *Producer) SendMessageWithHeaders(key string, value []byte, headers map[string]string) error { msg := &sarama.ProducerMessage{ Topic: p.topic, Key: sarama.StringEncoder(key), Value: sarama.ByteEncoder(value), } for k, v := range headers { msg.Headers = append(msg.Headers, sarama.RecordHeader{ Key: []byte(k), Value: []byte(v), }) } _, _, err := p.producer.SendMessage(msg) return err } func (p *Producer) Close() error { return p.producer.Close() }

3. 消费者实现

type Consumer struct { consumer sarama.ConsumerGroup topic string handler *ConsumerGroupHandler } type ConsumerGroupHandler struct { ready chan bool } func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error { close(h.ready) return nil } func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil } func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for { select { case message, ok := <-claim.Messages(): if !ok { return nil } fmt.Printf("Message received: key=%s, value=%s, partition=%d, offset=%d\n", string(message.Key), string(message.Value), message.Partition, message.Offset) session.MarkMessage(message, "") case <-session.Context().Done(): return nil } } } func NewConsumer(brokers []string, groupID, topic string) (*Consumer, error) { config := sarama.NewConfig() config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()} config.Consumer.Offsets.Initial = sarama.OffsetNewest consumer, err := sarama.NewConsumerGroup(brokers, groupID, config) if err != nil { return nil, err } handler := &ConsumerGroupHandler{ ready: make(chan bool), } return &Consumer{ consumer: consumer, topic: topic, handler: handler, }, nil } func (c *Consumer) Start(ctx context.Context) error { for { if err := c.consumer.Consume(ctx, []string{c.topic}, c.handler); err != nil { return err } } } func (c *Consumer) Close() error { return c.consumer.Close() }

4. 异步生产者

type AsyncProducer struct { producer sarama.AsyncProducer topic string } func NewAsyncProducer(brokers []string, topic string) (*AsyncProducer, error) { config := sarama.NewConfig() config.Producer.Compression = sarama.CompressionSnappy config.Producer.RequiredAcks = sarama.WaitForLocal config.Producer.Flush.Frequency = 500 * time.Millisecond producer, err := sarama.NewAsyncProducer(brokers, config) if err != nil { return nil, err } ap := &AsyncProducer{ producer: producer, topic: topic, } go ap.handleErrors() go ap.handleSuccesses() return ap, nil } func (a *AsyncProducer) handleErrors() { for err := range a.producer.Errors() { fmt.Printf("Producer error: %v\n", err) } } func (a *AsyncProducer) handleSuccesses() { for success := range a.producer.Successes() { fmt.Printf("Message sent to partition %d at offset %d\n", success.Partition, success.Offset) } } func (a *AsyncProducer) SendAsync(key, value string) { a.producer.Input() <- &sarama.ProducerMessage{ Topic: a.topic, Key: sarama.StringEncoder(key), Value: sarama.StringEncoder(value), } } func (a *AsyncProducer) Close() error { return a.producer.Close() }

5. 消息可靠性保障

type ReliableProducer struct { producer *Producer retries int } func NewReliableProducer(brokers []string, topic string, retries int) (*ReliableProducer, error) { producer, err := NewProducer(brokers, topic) if err != nil { return nil, err } return &ReliableProducer{ producer: producer, retries: retries, }, nil } func (p *ReliableProducer) SendWithRetry(ctx context.Context, key, value string) error { var lastErr error for i := 0; i < p.retries; i++ { if err := p.producer.SendMessage(key, value); err != nil { lastErr = err time.Sleep(time.Duration(i+1) * 100 * time.Millisecond) continue } return nil } return lastErr }

6. 消费者组管理

type ConsumerGroupManager struct { brokers []string groups map[string]*Consumer } func NewConsumerGroupManager(brokers []string) *ConsumerGroupManager { return &ConsumerGroupManager{ brokers: brokers, groups: make(map[string]*Consumer), } } func (m *ConsumerGroupManager) RegisterConsumer(groupID, topic string) (*Consumer, error) { consumer, err := NewConsumer(m.brokers, groupID, topic) if err != nil { return nil, err } m.groups[groupID] = consumer return consumer, nil } func (m *ConsumerGroupManager) UnregisterConsumer(groupID string) error { consumer, ok := m.groups[groupID] if !ok { return nil } delete(m.groups, groupID) return consumer.Close() }

7. 分区策略

type CustomPartitioner struct{} func (p *CustomPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error) { key, err := message.Key.Encode() if err != nil || key == nil { return sarama.NewRandomPartitioner().Partition(message, numPartitions) } hash := crc32.ChecksumIEEE(key) return int32(hash % uint32(numPartitions)), nil } func (p *CustomPartitioner) RequiresConsistency() bool { return true }

8. 消息拦截器

type LoggingInterceptor struct{} func (i *LoggingInterceptor) OnSend(msg *sarama.ProducerMessage) { fmt.Printf("Sending message to topic %s, partition %d\n", msg.Topic, msg.Partition) } func (i *LoggingInterceptor) OnDeliver(msg *sarama.ProducerMessage, err error) { if err != nil { fmt.Printf("Failed to deliver message: %v\n", err) } else { fmt.Printf("Message delivered to topic %s, partition %d\n", msg.Topic, msg.Partition) } }

9. 总结

本文详细介绍了Go语言中使用Sarama库操作Kafka的方法,包括生产者、消费者、异步处理、可靠性保障等核心功能,为构建高性能消息队列系统提供了完整的解决方案。

http://www.jsqmd.com/news/788023/

相关文章:

  • Raycast MCP Server Manager:统一管理AI编辑器MCP配置
  • 眼科AI偏见陷阱全解析:从数据收集到临床部署的七步规避法
  • MiGPT小爱音箱AI改造:5分钟打造专属智能语音助手终极指南
  • 炉石传说终极模改插件HsMod:50+功能全面提升游戏体验的完整指南
  • AI赋能文献计量分析:从数据采集到主题建模的完整实践指南
  • Go语言消息队列实战案例:订单系统与秒杀系统
  • 开源统一身份认证平台Casdoor:架构解析与生产实践指南
  • 802.11p车联网技术解析与应用实践
  • ARM架构HFGRTR_EL2寄存器与虚拟化陷阱机制详解
  • CANN/metadef自动映射函数
  • 开发者如何用Markdown+Git构建高效个人知识库
  • Dify C# SDK开发指南:.NET生态AI应用集成实战
  • 深度拆解 MS09-012:从“低权访客”到“系统之神”的跨越
  • 百度网盘解析工具终极指南:告别限速,实现高速下载
  • 基于传递熵的EEG脑网络信息流分析:从原理到工程实践
  • CANN/metadef子图映射注册器
  • 矢量控制与空间矢量调制在电机驱动中的应用
  • 高斯过程回归在材料科学中的应用:预测拓扑半金属材料
  • 英雄联盟界面定制新纪元:在合规边界内重塑你的游戏身份
  • Docker化Jira部署实战:cptactionhank镜像详解与生产环境配置
  • Apache Airflow 系列教程 | 第23课:安全体系与权限管理
  • 为开源AI智能体项目Hermes Agent配置Taotoken作为自定义模型供应商
  • CANN/ascend-transformer-boost ReshapeAndCache C++示例
  • Copy4AI:智能代码复制工具,优化AI编程助手上下文交互
  • WarcraftHelper终极指南:魔兽争霸III现代化优化完整方案
  • Go语言RabbitMQ实战:企业级消息队列开发
  • WAF拦不住?一篇搞懂SQL注入绕过原理与实战
  • 2026年上饶GEO优化公司排行:本土服务商客观盘点 - 打我的的
  • 量子启发优化在信用评分模型中的应用与优化
  • CUDA内核内存安全验证:挑战与Model2Kernel解决方案