当前位置: 首页 > news >正文

Kafka 消息推拉

1. 背景与需求

1.1 业务场景

阶段 场景 说明
当前 DLQ 预览 + 选择性重投 从不同业务的死信 Topic 拉取消息,人工预览后,将选中消息重投到原业务 Topic
后续 定时任务批量重投 按已提交 offset 增量拉取,自动重投
后续 多业务多 Topic Topic 名从数据库读取,本轮由调用方传入

1.2 本轮范围

做:

  • 引入 org.apache.kafka:kafka-clients 依赖
  • KafkaMessageUtils 推拉能力(拉取预览、发送重投、offset 提交)
  • 配置类、业务对象、错误码、单元测试

不做:

  • Controller / API 接口
  • 定时任务
  • 从数据库查 Topic 名
  • 消息业务解析与转换

2. 设计思想

2.1 方案选型

方案 描述 优点 缺点 结论
A(采用) KafkaMessageUtils + 短生命周期 Consumer + 单例 Producer 简单、预览不污染 offset、符合项目 XxxUtils 规范 高频拉取有连接开销 ✅ 推荐
B 长连接池化 Consumer/Producer 性能好 生命周期与并发复杂,本轮过度设计
C 引入 spring-kafka Spring 生态完整 与用户要求的原生 kafka-clients 不符,依赖更重

2.2 核心设计原则

(1)预览与消费分离

DLQ 预览的核心诉求是「看得到、不丢、可重复看」:

  • Consumer 设置 enable.auto.commit = false
  • pullMessages 只 poll、不 commit
  • 重投成功后由调用方显式调用 commitOffsets,避免误消费

(2)推拉职责清晰

pullMessages(dlqTopic)  →  业务筛选  →  sendMessages(originalTopic, selected)  →  commitOffsets(dlqTopic, selected)
  • :只读,返回带 partition + offsetKafkaMessageBO,供选择性重投定位
  • :保留原 keyheadersvalue 原样转发,不做反序列化
  • 提交:按分区取最大 offset + 1 提交,支持批量重投后一次性推进

(3)Client 生命周期

组件 策略 原因
KafkaConsumer 每次 pull / commitOffsets 临时创建,try-with-resources 关闭 预览为低频操作,避免长连接状态管理
KafkaProducer 应用内懒加载单例,@PreDestroy 关闭 重投可能批量发生,复用连接更高效

(4)多业务 Topic 隔离

  • Topic 名由调用方传入(后续从 DB 查)
  • Consumer Group ID 规则:{groupIdPrefix}-{topic},不同 DLQ Topic 的 offset 互不影响

(5)增量拉取预留

KafkaPullOptionsBO.fromBeginning

  • true(默认):seekToBeginning,适合人工全量预览
  • false:从当前 group 已提交 offset 继续读,适合后续定时任务增量重投

(6)异常与日志

遵循项目 BizException 规范:

场景 处理方式
参数非法(topic 为空、消息列表为空等) new BizException(PARAM_ERROR, msg),无 cause
Kafka 连接/拉取/发送/提交失败 BizException.wrap(KAFKA_OPERATION_FAILED, 固定文案, e)
中间层 log.warn 一行场景日志,不打堆栈

新增错误码:KAFKA_OPERATION_FAILED(2004, "Kafka操作失败")

(7)命名与分层

类型 类名 包路径
Spring 组件工具类 KafkaMessageUtils com.liang.learn.util
配置 KafkaProperties com.liang.learn.properties
业务中间对象 KafkaMessageBOKafkaPullOptionsBO com.liang.learn.model.bo

3. 架构与数据流

sequenceDiagramparticipant Caller as 调用方 Serviceparticipant Utils as KafkaMessageUtilsparticipant DLQ as DLQ Topicparticipant Biz as 业务 TopicCaller->>Utils: pullMessages(dlqTopic, options)Utils->>DLQ: poll(auto.commit=false)DLQ-->>Utils: ConsumerRecordsUtils-->>Caller: List<KafkaMessageBO>Note over Caller: 人工筛选待重投消息Caller->>Utils: sendMessages(originalTopic, selected)Utils->>Biz: Producer send(保留 key/headers)Caller->>Utils: commitOffsets(dlqTopic, selected)Utils->>DLQ: commitSync(offset+1)

3.1 类职责

职责
KafkaProperties Broker 地址、Consumer/Producer 默认参数
KafkaMessageBO 单条消息载体(含 partition/offset 供定位与提交)
KafkaPullOptionsBO 拉取条数、超时、是否从头预览
KafkaMessageUtils 推拉与 offset 提交的具体实现

4. 配置说明

4.1 Maven 依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>

版本由 Spring Boot BOM 管理,无需显式指定。

4.2 application.yaml

app:kafka:bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:127.0.0.1:9092}consumer:group-id-prefix: learn-dlq-previewauto-offset-reset: earliestmax-poll-records: 100poll-timeout-ms: 3000producer:acks: allretries: 3
配置项 默认值 说明
bootstrap-servers 127.0.0.1:9092 Broker 地址,支持环境变量覆盖
consumer.group-id-prefix learn-dlq-preview 预览 Consumer Group 前缀
consumer.auto-offset-reset earliest 无已提交 offset 时从最早可读位置开始
consumer.max-poll-records 100 单次拉取上限
consumer.poll-timeout-ms 3000 poll 超时(毫秒)
producer.acks all 重投可靠性
producer.retries 3 发送重试次数

5. API 说明

5.1 KafkaMessageUtils

// 预览 DLQ,不提交 offset
List<KafkaMessageBO> pullMessages(String topic, KafkaPullOptionsBO options);// 重投单条 / 批量(保留 key、headers)
void sendMessage(String targetTopic, KafkaMessageBO message);
void sendMessages(String targetTopic, List<KafkaMessageBO> messages);// 重投成功后提交 DLQ offset
void commitOffsets(String dlqTopic, List<KafkaMessageBO> messages);

5.2 典型用法

@Autowired
private KafkaMessageUtils kafkaMessageUtils;public void replaySelectedDlq(String dlqTopic, String originalTopic) {// 1. 预览List<KafkaMessageBO> all = kafkaMessageUtils.pullMessages(dlqTopic, null);// 2. 业务筛选(示例:按 key 过滤)List<KafkaMessageBO> selected = all.stream().filter(m -> "retry-me".equals(m.getKey())).toList();if (selected.isEmpty()) {return;}// 3. 重投kafkaMessageUtils.sendMessages(originalTopic, selected);// 4. 提交 offsetkafkaMessageUtils.commitOffsets(dlqTopic, selected);
}

5.3 增量拉取(定时任务预留)

KafkaPullOptionsBO options = new KafkaPullOptionsBO();
options.setFromBeginning(false);
List<KafkaMessageBO> incremental = kafkaMessageUtils.pullMessages(dlqTopic, options);

6. 文件清单

文件 说明
pom.xml 新增 kafka-clients 依赖
application.yaml 新增 app.kafka 配置
ErrorCode.java 新增 KAFKA_OPERATION_FAILED
KafkaProperties.java Kafka 配置属性
KafkaMessageBO.java 消息业务对象
KafkaPullOptionsBO.java 拉取参数
KafkaMessageUtils.java 推拉工具类
KafkaMessageUtilsTest.java 单元测试

7. 源码

7.1 KafkaProperties

package com.liang.learn.properties;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;/*** Kafka 连接与推拉行为配置** @author liang* @since 2026-06-10 15:02:18*/
@Data
@Component
@ConfigurationProperties(prefix = "app.kafka")
public class KafkaProperties {/*** Kafka Broker 地址,多个以逗号分隔*/private String bootstrapServers;/*** Consumer 相关配置*/private Consumer consumer = new Consumer();/*** Producer 相关配置*/private Producer producer = new Producer();/*** Kafka Consumer 配置项*/@Datapublic static class Consumer {/*** DLQ 预览 Consumer Group 前缀,实际 groupId 为 prefix-topic*/private String groupIdPrefix = "learn-dlq-preview";/*** 无已提交 offset 时的起始策略,预览场景默认 earliest*/private String autoOffsetReset = "earliest";/*** 单次 poll 最大记录数*/private int maxPollRecords = 100;/*** poll 等待超时(毫秒)*/private long pollTimeoutMs = 3000L;}/*** Kafka Producer 配置项*/@Datapublic static class Producer {/*** 发送确认级别,重投场景默认 all*/private String acks = "all";/*** 发送失败重试次数*/private int retries = 3;}
}

7.2 KafkaMessageBO

package com.liang.learn.model.bo;import lombok.Data;import java.util.Collections;
import java.util.HashMap;
import java.util.Map;/*** Kafka 单条消息业务对象,用于 DLQ 预览与选择性重投** @author liang* @since 2026-06-10 15:02:18*/
@Data
public class KafkaMessageBO {/*** 消息所在 Topic(预览时为 DLQ Topic)*/private String topic;/*** 分区号*/private int partition;/*** 分区内 offset*/private long offset;/*** 消息 Key,可为 null*/private String key;/*** 消息体*/private String value;/*** 消息头,重投时原样保留*/private Map<String, String> headers = new HashMap<>();/*** Broker 记录时间戳(毫秒)*/private long timestamp;/*** 返回不可变 headers 视图,避免外部修改内部状态** @return headers 只读副本*/public Map<String, String> getHeaders() {if (headers == null || headers.isEmpty()) {return Collections.emptyMap();}return Collections.unmodifiableMap(headers);}/*** 设置消息头,null 时置为空 Map** @param headers 消息头*/public void setHeaders(Map<String, String> headers) {if (headers == null || headers.isEmpty()) {this.headers = new HashMap<>();return;}this.headers = new HashMap<>(headers);}
}

7.3 KafkaPullOptionsBO

package com.liang.learn.model.bo;import lombok.Data;/*** Kafka 拉取(预览)参数** @author liang* @since 2026-06-10 15:02:18*/
@Data
public class KafkaPullOptionsBO {/*** 本次最多拉取条数,null 时使用配置默认值*/private Integer maxRecords;/*** poll 超时(毫秒),null 时使用配置默认值*/private Long pollTimeoutMs;/*** true 从各分区最早可读 offset 预览;false 从当前 group 已提交 offset 继续读*/private Boolean fromBeginning = Boolean.TRUE;
}

7.4 KafkaMessageUtils

package com.liang.learn.util;import com.liang.learn.exception.BizException;
import com.liang.learn.model.bo.KafkaMessageBO;
import com.liang.learn.model.bo.KafkaPullOptionsBO;
import com.liang.learn.model.enums.ErrorCode;
import com.liang.learn.properties.KafkaProperties;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** Kafka 消息推拉工具类,面向 DLQ 预览与选择性重投场景** @author liang* @since 2026-06-10 15:02:18*/
@Slf4j
@Component
public class KafkaMessageUtils {private static final String STRING_SERIALIZER = StringSerializer.class.getName();private static final String STRING_DESERIALIZER = StringDeserializer.class.getName();private static final String GROUP_TOPIC_SEPARATOR = "-";private final KafkaProperties kafkaProperties;private volatile KafkaProducer<String, String> producer;public KafkaMessageUtils(KafkaProperties kafkaProperties) {this.kafkaProperties = kafkaProperties;}public List<KafkaMessageBO> pullMessages(String topic, KafkaPullOptionsBO options) {checkTopic(topic);checkBootstrapServers();KafkaPullOptionsBO resolvedOptions = resolvePullOptions(options);Properties consumerProps = buildConsumerProperties(topic);try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {List<TopicPartition> partitions = listTopicPartitions(consumer, topic);if (partitions.isEmpty()) {return Collections.emptyList();}return pollMessages(consumer, partitions, topic, resolvedOptions);} catch (BizException e) {throw e;} catch (Exception e) {log.warn("Kafka 消息拉取失败, topic={}", topic);throw BizException.wrap(ErrorCode.KAFKA_OPERATION_FAILED, "Kafka消息拉取失败", e);}}public void sendMessage(String targetTopic, KafkaMessageBO message) {checkTopic(targetTopic);checkMessage(message);sendMessages(targetTopic, List.of(message));}public void sendMessages(String targetTopic, List<KafkaMessageBO> messages) {checkTopic(targetTopic);checkMessagesNotEmpty(messages);checkBootstrapServers();KafkaProducer<String, String> activeProducer = getOrCreateProducer();try {for (KafkaMessageBO message : messages) {checkMessage(message);ProducerRecord<String, String> record = buildProducerRecord(targetTopic, message);activeProducer.send(record).get();}activeProducer.flush();} catch (InterruptedException e) {Thread.currentThread().interrupt();log.warn("Kafka 消息发送被中断, targetTopic={}", targetTopic);throw BizException.wrap(ErrorCode.KAFKA_OPERATION_FAILED, "Kafka消息发送失败", e);} catch (ExecutionException e) {log.warn("Kafka 消息发送失败, targetTopic={}", targetTopic);throw BizException.wrap(ErrorCode.KAFKA_OPERATION_FAILED, "Kafka消息发送失败", e.getCause());}}public void commitOffsets(String dlqTopic, List<KafkaMessageBO> messages) {checkTopic(dlqTopic);checkMessagesNotEmpty(messages);checkBootstrapServers();Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = buildOffsetsToCommit(dlqTopic, messages);Properties consumerProps = buildConsumerProperties(dlqTopic);try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {consumer.assign(new ArrayList<>(offsetsToCommit.keySet()));consumer.commitSync(offsetsToCommit);} catch (BizException e) {throw e;} catch (Exception e) {log.warn("Kafka offset 提交失败, dlqTopic={}", dlqTopic);throw BizException.wrap(ErrorCode.KAFKA_OPERATION_FAILED, "Kafka offset 提交失败", e);}}@PreDestroypublic void destroy() {if (producer != null) {producer.close();producer = null;}}private KafkaPullOptionsBO resolvePullOptions(KafkaPullOptionsBO options) {KafkaPullOptionsBO resolved = options == null ? new KafkaPullOptionsBO() : options;if (resolved.getMaxRecords() == null) {resolved.setMaxRecords(kafkaProperties.getConsumer().getMaxPollRecords());}if (resolved.getPollTimeoutMs() == null) {resolved.setPollTimeoutMs(kafkaProperties.getConsumer().getPollTimeoutMs());}if (resolved.getFromBeginning() == null) {resolved.setFromBeginning(Boolean.TRUE);}if (resolved.getMaxRecords() <= 0) {throw new BizException(ErrorCode.PARAM_ERROR, "maxRecords 必须大于 0");}if (resolved.getPollTimeoutMs() <= 0) {throw new BizException(ErrorCode.PARAM_ERROR, "pollTimeoutMs 必须大于 0");}return resolved;}private List<TopicPartition> listTopicPartitions(KafkaConsumer<String, String> consumer, String topic) {List<org.apache.kafka.common.PartitionInfo> partitionInfos = consumer.partitionsFor(topic);if (partitionInfos == null || partitionInfos.isEmpty()) {return Collections.emptyList();}List<TopicPartition> partitions = new ArrayList<>(partitionInfos.size());for (org.apache.kafka.common.PartitionInfo partitionInfo : partitionInfos) {partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));}return partitions;}private List<KafkaMessageBO> pollMessages(KafkaConsumer<String, String> consumer,List<TopicPartition> partitions,String topic,KafkaPullOptionsBO options) {consumer.assign(partitions);if (Boolean.TRUE.equals(options.getFromBeginning())) {consumer.seekToBeginning(partitions);}List<KafkaMessageBO> result = new ArrayList<>();long deadline = System.currentTimeMillis() + options.getPollTimeoutMs();while (result.size() < options.getMaxRecords() && System.currentTimeMillis() < deadline) {long remaining = deadline - System.currentTimeMillis();if (remaining <= 0) {break;}ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(remaining));if (records.isEmpty()) {break;}for (ConsumerRecord<String, String> record : records) {result.add(toMessageBO(record));if (result.size() >= options.getMaxRecords()) {return result;}}}return result;}private KafkaMessageBO toMessageBO(ConsumerRecord<String, String> record) {KafkaMessageBO message = new KafkaMessageBO();message.setTopic(record.topic());message.setPartition(record.partition());message.setOffset(record.offset());message.setKey(record.key());message.setValue(record.value());message.setHeaders(extractHeaders(record.headers()));message.setTimestamp(record.timestamp());return message;}private ProducerRecord<String, String> buildProducerRecord(String targetTopic, KafkaMessageBO message) {ProducerRecord<String, String> record =new ProducerRecord<>(targetTopic, null, message.getKey(), message.getValue());applyHeaders(record, message.getHeaders());return record;}private Map<TopicPartition, OffsetAndMetadata> buildOffsetsToCommit(String dlqTopic,List<KafkaMessageBO> messages) {Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();for (KafkaMessageBO message : messages) {if (message.getPartition() < 0) {throw new BizException(ErrorCode.PARAM_ERROR, "消息 partition 非法");}if (message.getOffset() < 0) {throw new BizException(ErrorCode.PARAM_ERROR, "消息 offset 非法");}TopicPartition topicPartition = new TopicPartition(dlqTopic, message.getPartition());OffsetAndMetadata nextOffset = new OffsetAndMetadata(message.getOffset() + 1);offsets.merge(topicPartition, nextOffset, (existing, incoming) ->existing.offset() >= incoming.offset() ? existing : incoming);}return offsets;}private Map<String, String> extractHeaders(Headers headers) {if (headers == null) {return Collections.emptyMap();}Map<String, String> headerMap = new LinkedHashMap<>();for (Header header : headers) {byte[] value = header.value();String text = value == null ? null : new String(value, StandardCharsets.UTF_8);headerMap.put(header.key(), text);}return headerMap;}private void applyHeaders(ProducerRecord<String, String> record, Map<String, String> headers) {if (headers == null || headers.isEmpty()) {return;}for (Map.Entry<String, String> entry : headers.entrySet()) {String value = entry.getValue();byte[] bytes = value == null ? null : value.getBytes(StandardCharsets.UTF_8);record.headers().add(entry.getKey(), bytes);}}private KafkaProducer<String, String> getOrCreateProducer() {if (producer == null) {synchronized (this) {if (producer == null) {producer = new KafkaProducer<>(buildProducerProperties());}}}return producer;}private Properties buildConsumerProperties(String topic) {KafkaProperties.Consumer consumerConfig = kafkaProperties.getConsumer();Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());props.put(ConsumerConfig.GROUP_ID_CONFIG, buildConsumerGroupId(topic));props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerConfig.getAutoOffsetReset());props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumerConfig.getMaxPollRecords());return props;}private Properties buildProducerProperties() {KafkaProperties.Producer producerConfig = kafkaProperties.getProducer();Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);props.put(ProducerConfig.ACKS_CONFIG, producerConfig.getAcks());props.put(ProducerConfig.RETRIES_CONFIG, producerConfig.getRetries());return props;}private String buildConsumerGroupId(String topic) {return kafkaProperties.getConsumer().getGroupIdPrefix() + GROUP_TOPIC_SEPARATOR + topic;}private void checkTopic(String topic) {if (!StringUtils.hasText(topic)) {throw new BizException(ErrorCode.PARAM_ERROR, "Topic 不能为空");}}private void checkBootstrapServers() {if (!StringUtils.hasText(kafkaProperties.getBootstrapServers())) {throw new BizException(ErrorCode.PARAM_ERROR, "Kafka bootstrap-servers 未配置");}}private void checkMessage(KafkaMessageBO message) {if (message == null) {throw new BizException(ErrorCode.PARAM_ERROR, "消息不能为空");}}private void checkMessagesNotEmpty(List<KafkaMessageBO> messages) {if (messages == null || messages.isEmpty()) {throw new BizException(ErrorCode.PARAM_ERROR, "消息列表不能为空");}}
}

7.5 ErrorCode 新增项

/*** Kafka 操作失败(拉取、发送、offset 提交等)*/
KAFKA_OPERATION_FAILED(2004, "Kafka操作失败"),

7.6 KafkaMessageUtilsTest

package com.liang.learn.util;import com.liang.learn.exception.BizException;
import com.liang.learn.model.bo.KafkaMessageBO;
import com.liang.learn.model.bo.KafkaPullOptionsBO;
import com.liang.learn.model.enums.ErrorCode;
import com.liang.learn.properties.KafkaProperties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;import java.util.Collections;
import java.util.List;
import java.util.Map;import static org.junit.jupiter.api.Assertions.*;class KafkaMessageUtilsTest {private KafkaMessageUtils kafkaMessageUtils;@BeforeEachvoid setUp() {KafkaProperties kafkaProperties = new KafkaProperties();kafkaProperties.setBootstrapServers("127.0.0.1:9092");kafkaMessageUtils = new KafkaMessageUtils(kafkaProperties);}@Testvoid pullMessages_blankTopic_shouldThrowParamError() {BizException exception = assertThrows(BizException.class,() -> kafkaMessageUtils.pullMessages(" ", null));assertEquals(ErrorCode.PARAM_ERROR.getCode(), exception.getCode());assertTrue(exception.getMessage().contains("Topic"));}@Testvoid pullMessages_missingBootstrapServers_shouldThrowParamError() {KafkaProperties kafkaProperties = new KafkaProperties();KafkaMessageUtils utils = new KafkaMessageUtils(kafkaProperties);BizException exception = assertThrows(BizException.class,() -> utils.pullMessages("dlq-topic", null));assertEquals(ErrorCode.PARAM_ERROR.getCode(), exception.getCode());assertTrue(exception.getMessage().contains("bootstrap-servers"));}@Testvoid pullOptions_invalidMaxRecords_shouldThrowParamError() {KafkaPullOptionsBO options = new KafkaPullOptionsBO();options.setMaxRecords(0);BizException exception = assertThrows(BizException.class,() -> kafkaMessageUtils.pullMessages("dlq-topic", options));assertEquals(ErrorCode.PARAM_ERROR.getCode(), exception.getCode());assertTrue(exception.getMessage().contains("maxRecords"));}@Testvoid sendMessage_nullMessage_shouldThrowParamError() {BizException exception = assertThrows(BizException.class,() -> kafkaMessageUtils.sendMessage("target-topic", null));assertEquals(ErrorCode.PARAM_ERROR.getCode(), exception.getCode());assertTrue(exception.getMessage().contains("消息"));}@Testvoid sendMessages_emptyList_shouldThrowParamError() {BizException exception = assertThrows(BizException.class,() -> kafkaMessageUtils.sendMessages("target-topic", Collections.emptyList()));assertEquals(ErrorCode.PARAM_ERROR.getCode(), exception.getCode());assertTrue(exception.getMessage().contains("消息列表"));}@Testvoid commitOffsets_emptyMessages_shouldThrowParamError() {BizException exception = assertThrows(BizException.class,() -> kafkaMessageUtils.commitOffsets("dlq-topic", List.of()));assertEquals(ErrorCode.PARAM_ERROR.getCode(), exception.getCode());assertTrue(exception.getMessage().contains("消息列表"));}@Testvoid commitOffsets_invalidOffset_shouldThrowParamError() {KafkaMessageBO message = new KafkaMessageBO();message.setPartition(0);message.setOffset(-1L);BizException exception = assertThrows(BizException.class,() -> kafkaMessageUtils.commitOffsets("dlq-topic", List.of(message)));assertEquals(ErrorCode.PARAM_ERROR.getCode(), exception.getCode());assertTrue(exception.getMessage().contains("offset"));}@Testvoid kafkaMessageBO_headers_shouldBeImmutableView() {KafkaMessageBO message = new KafkaMessageBO();message.setHeaders(Map.of("original-topic", "biz-topic"));Map<String, String> headers = message.getHeaders();assertEquals("biz-topic", headers.get("original-topic"));assertThrows(UnsupportedOperationException.class, () -> headers.put("k", "v"));}
}

8. 测试与验证

mvn test -Dtest=KafkaMessageUtilsTest
mvn test

当前测试覆盖:

  • Topic / bootstrap-servers 参数校验
  • maxRecords 非法值校验
  • 发送 / 提交时空消息列表校验
  • offset 非法值校验
  • KafkaMessageBO.headers 不可变视图

集成测试(连接真实 Kafka)未纳入本轮,可在后续接 Testcontainers 或内嵌 Kafka 补充。


9. 后续扩展建议

方向 说明
DLQ 预览 API 增加 Controller:/api/kafka/dlq/preview/api/kafka/dlq/replay
DB 查 Topic Service 层按业务 ID 查 DLQ / 原 Topic 映射,再调 KafkaMessageUtils
定时重投 @Scheduled + fromBeginning=false 增量拉取 + 全量重投 + commit
消息转换 按业务需要增加 JSON 反序列化、过滤、脱敏等中间层,不侵入 Utils
安全认证 KafkaProperties 扩展 SASL/SSL 配置项

10. 讨论记录摘要

问题 结论
主要场景 DLQ 预览 → 选择性重投;后续定时重投
重投方式 重投原 Topic,保留 key/headers
DLQ Topic 多业务多 Topic,Topic 名由调用方传入(后续 DB)
本轮范围 仅基础设施,不含 API / 定时任务 / DB
依赖选型 原生 kafka-clients,不用 spring-kafka
工具类命名 KafkaMessageUtils(Spring 组件,后缀 Utils
http://www.jsqmd.com/news/988037/

相关文章:

  • YouTube首帧3秒背后的工程真相:ABR、QUIC与分片优化实战
  • 2026年6月最新版济南第三方CMACNAS甲醛检测治理机构口碑名单:万清CMA检测中心等5家公司深度测评万清CMA检测中心TOP1推荐 - 一修哥咨询
  • 2026年 医药品牌升级推荐榜:聚焦战略、视觉与信任重塑的全案解析及优质服务商盘点 - 品牌发掘
  • Obsidian AI搜索:Claudian插件的10个高级搜索技巧终极指南
  • 武汉电机回收公司排行:合规性与服务能力实测对比 - 起跑123
  • LocalSend Linux AppImage制作:跨发行版兼容性解决方案终极指南
  • GEE 时间序列合成、时序线性插值与SG滤波
  • 青岛正规靠谱的防水修缮公司有哪些? - 青岛防水品牌推荐
  • 2026年国内发光竹蜻蜓厂家盘点(附部分企业介绍) - 企师傅推荐官
  • 远郊覆盖榜:北京远郊收酒不额外收费六家 - 光耀华夏品牌榜
  • Mist:告别繁琐,三步搞定macOS系统安装与固件管理
  • VSCode配置STM32开发环境避坑指南:从编译报错到调试成功,我踩过的那些坑
  • WAN2.2 All In One终极指南:8GB显存快速生成AI视频的完整教程
  • 2026年6月最新版惠州第三方CMACNAS甲醛检测治理机构口碑名单:万清CMA检测中心等5家公司深度测评万清CMA检测中心TOP1推荐 - 一修哥咨询
  • CosyVoice2流式语音合成音色一致性技术深度解析与架构优化方案
  • 2026年西安装修公司推荐:基于全案能力与施工管控的综合实力测评 - 科技焦点
  • DNS有关知识(根域名服务器、顶级域名服务器、权威域名服务器)
  • 2026北京公司注册代办机构专业度排行:基于10000+案例的实测对比 - 互联网科技品牌测评
  • AMD GPU终极指南:stable-diffusion-webui-directml如何释放你的显卡潜能
  • 项目三简易计算器 任务3-5六位密码锁
  • 2026年6月最新版葫芦岛第三方CMACNAS甲醛检测治理机构口碑名单:万清CMA检测中心等5家公司深度测评万清CMA检测中心TOP1推荐 - 一修哥咨询
  • 2026深圳家庭/企业/长途搬迁全场景正规靠谱搬家机构名单,让搬家更省心 - 从来都是英雄出少年
  • Nex-N2-mini 智能体思维框架深度解析:自适应思维与连贯性思维原理
  • 武汉空调回收厂家排行 5家合规服务商实测对比 - 起跑123
  • Home Assistant区域管理终极指南:按房间智能控制你的家居设备
  • 2026年6月最新版毫州第三方CMACNAS甲醛检测治理机构口碑名单:万清CMA检测中心等5家公司深度测评万清CMA检测中心TOP1推荐 - 一修哥咨询
  • 2026年6月最新版呼伦贝尔第三方CMACNAS甲醛检测治理机构口碑名单:万清CMA检测中心等5家公司深度测评万清CMA检测中心TOP1推荐 - 一修哥咨询
  • 2026年6月最新版广安第三方CMACNAS甲醛检测治理机构口碑名单:万清CMA检测中心等5家公司深度测评万清CMA检测中心TOP1推荐 - 一修哥咨询
  • CH341A/B USB转USART/I2C/SPI介绍
  • 界面控件DevExpress WPF中文教程:Data Grid - 绑定数据