当前位置: 首页 > news >正文

Kafka——Producer/Consumer

Kafka

  • 特点:高吞吐、低延迟

Producer(生产者)

  • 定义:是向Kafka集群发送消息的客户端应用,如:日志收集器、传感器数据发送器等

一、关键特性

  • 异步发送:可提高吞吐量和降低延迟
  • 消息分区:进而实现负载均衡
  • 消息压缩:可使用gizp、snappy等压缩算法实现消息压缩,然后再传输到服务器
  • 批处理机制:可减少网络I/O操作次数,提高总体的消息吞吐量

二、优化

  • 发送缓冲区:在异步发送消息时,可以配置发送缓冲区来调整发送性能。通过调整发送缓冲区的大小,可以平衡性能和内存使用
  • 重试机制:难免出现发送消息失败的情况,建议考虑设置重试机制,确保消息的可靠性
  • 批处理大小和延迟:调整此参数可优化发送性能。较大的批处理大小和较长的延迟可减少网络开销和提高吞吐量

三、配置项

  • bootstrap.servers:指定Kafka集群所需的broker地址清单,用于建立初始连接
  • acks:指定分区中必须有多少个副本收到这条消息,才算消息发送成功。可选值:
    • 0(不等待任何相应)
    • 1(leader副本确认)
    • all/-1(ISR中所有副本确认)
  • retries:消息发送失败时的重试次数,用于处理临时故障
  • compression.type:消息压缩算法,用于减少网络传输量。可选值:
    • none
    • gzip
    • snappy
    • ls4
    • zstd
  • batch.size:每个批次的内存大小,影响消息的吞吐量和延迟
  • linger.ms:发送批次前的最大等待时间,用于平衡吞吐量和延迟
  • buffer.memory:生产者客户端中用于缓存消息的缓冲区大小
  • key.serializer和value.serializer:消息的键和值的序列化类,用于将消息转换为字节数组
  • max.request.size:生产者客户端能发送的最大消息大小
  • request.timeout.ms:生产者等待请求响应的最长时间

四、常用方法

send():

  • 定义:发送消息到kafka集群

    • 异步发送:直接调用send(record),不关心发送结果(但可通过回调函数处理结果)。
    • 同步发送:调用send(record).get(),等待Kafka响应,获取发送结果(RecordMetadata对象)。
    • 带回调的异步发送:调用send(record, new Callback() {...}),在回调函数中处理发送结果。

close():

  • 定义关闭生产者,释放资源

Consumer(消费者)

  • 定义:从kafka集群读取消息的客户端应用。如:数据分析工具、实时监控系统等

一、关键特性

  • 拉取模式
  • 消费者组:消费组->消费主题->消费分区,从而实现负载均衡。不同消费者组之间互不影响,可以独立地消费同一主题的消息
  • 偏移量管理:消费者可维护偏移量(Offset)来记录自己再分区中消费的位置。偏移量记录方式:重置到较旧的偏移量以重新处理过去的消息;跳到最近的记录从“现在”开始记录

二、优化

  • 自动提交偏移量:简化了消费者的实现,但可能导致消费消息重复和丢失。手动提交偏移量提供了更灵活的控制方式,适合需要确保消息消费成功的场景
  • 消费速率控制:消息者处理速度较慢,可能导致消息积压。即可以通过增加消费者实例、优化消费者逻辑等方式来解决问题
  • 错误处理与重试:遇到的错误如:网络错误、消息格式错误等。消费者需实现适当的错误处理机制,如重试、记录错误日志等来确保消息的可靠性

三、配置项

  • bootstrap.servers:同样用于指定kafka集群的broker地址清单
  • group.id:消费者所属消费组的唯一标识,用于协调消费者之间的消息分配
  • auto.offset.reset:当分区没有初始偏移量或偏移量无效时,消费者的行为。可选值:earliest(从最早的消息开始消费)、latest(从最新的消息开始消费)和none(抛出异常)
  • enable.auto.commit:是否开启自动提交消费位移的功能。
    • true(定期自动提交消费位移)
    • false(需要手动提交)
  • anto.commit.interval.ms:自动提交消费位移的时间间隔
  • fetch.min.bytes:消费者客户端一次请求从kafka拉取消息的最小数据量
  • fetch.max.bytes:消费者客户端一次请求从kafka拉取消息的最大数据量
  • max.poll.records:一次拉取请求的最大消息数
  • max.poll.interval.ms:指定拉取消息线程最长空闲时间
  • session.timeout.ms:检测消费者是否失效的超时时间
  • heartbeat.interval.ms:消费者心跳时间,用于维持与消费者协调器的连接
  • key.deserializer和value.deserializer:消息的键和值的反序列化类,用于将字节数据转换为消息对象

四、常用方法

poll()

含义:消费者向kafka服务器发起一次拉取请求

参数

  • 超时时间
    • 含义:控制消费者在单次poll循环中,如果没拿到数据,最多能等多久
    • 目的:避免忙循环、控制循环速度

subscribe()

  • 用于订阅一个或多个主题

commitSync()或commitAsync()

  • 手动提交偏移量

java和python开发的区别

1、Java

  • consumer-poll()方法
    • 返回结构:一组消息的集合-ConsumerRecords对象
    • 优点:提高吞吐量,批量效率更高

2、python

  • consumer-poll()方法
    • 返回结构:单条消息或None,但内部有批量预取机制
    • 批量预取机制:在后台批量拉取消息存在内部缓冲区中,当循环调用poll()时,从内部缓冲区依次取出单条消息,也相当于一次拉取了多条消息
http://www.jsqmd.com/news/520489/

相关文章:

  • 黑马头条日记 | 微服务项目MinIO与业务代码耦合度过高?耐心看完这篇你就知道如何从零构建MinIO起步依赖!
  • YOLO12实战体验:上传图片秒出结果,可视化标注超简单
  • Docker和K8S
  • 基于Simulink的自适应反步法(Adaptive Backstepping)控制​
  • MinIO Windows版保姆级教程:用NSSM实现服务化部署+多磁盘挂载
  • 解锁《原神》60帧限制:从硬件封印到视觉自由的进阶指南
  • Chandra OCR入门指南:从HuggingFace加载权重到vLLM推理服务的完整迁移路径
  • Cloudchip嵌入式物联网接入库深度解析
  • 避坑指南:不用图传,搞定大华/海康摄像头与Win10/Ubuntu网线直连的IP配置玄学
  • C语言学习文档(六)
  • AVR硬件PWM深度解析:定时器资源管理与跨平台实践
  • LIS302加速度传感器SPI驱动开发与嵌入式集成
  • Cosmos-Reason1-7B自动化运维报告生成:分析系统日志与性能指标
  • 为什么92%的MCP集成项目在灰度期暴雷?深度拆解状态同步的3个隐性断点与防御式编码模板
  • 告别手动添加!用Matlab脚本+IDM命令行,5分钟搞定海量文件自动下载
  • 3个核心价值:OpenLRC如何革新性突破音频转LRC效率瓶颈
  • 智慧水利建设方案(PPT文件)
  • STEP3-VL-10B WebUI使用教程:图片上传与对话功能详解
  • W7500裸机HTTP服务器:基于W5500硬件协议栈的嵌入式LED控制
  • Qwen-Image-2512像素艺术生成服务:支持中文提示词直出高质量结果
  • MogFace-large人脸检测模型Android端集成实战:移动端部署与优化
  • 学Simulink——基于Simulink的模糊滑模混合控制抗参数摄动​
  • SQLMap工具运用
  • HY-MT1.5-7B翻译模型实战:从部署到调用,新手完整操作流程
  • 2026年热门的水下振动传感器公司推荐:水下振动传感器公司选择指南 - 品牌宣传支持者
  • MCP 2.0协议签名机制失效预警:3个被92%企业忽略的证书链成本陷阱(含TLS 1.3兼容性避坑清单)
  • 基于 STM32CubeMX 的 UNIT-00:Berserk Interface 嵌入式部署指南
  • 【Ubuntu】自动化安全升级:配置 unattended-upgrades 的最佳实践
  • 3种合规访问方案:让信息工作者高效获取优质内容
  • 零基础入门学用物联网(ESP8266) 第一部分 基础知识篇(五)