Kafka - 基于Spring Boot实战:消息生产与消费完整实现
event) .addCallback( result -> log.info("Message sent: {}", result.getRecordMetadata()), ex -> log.error("Failed to send message", ex) ); } // 同步发送(阻塞直到确认) public void sendSy

👋 大家好,欢迎来到我的技术博客!
💻 作为一名热爱 Java 与软件开发的程序员,我始终相信:清晰的逻辑 + 持续的积累 = 稳健的成长。
📚 在这里,我会分享学习笔记、实战经验与技术思考,力求用简单的方式讲清楚复杂的问题。
🎯 本文将围绕Kafka这个话题展开,希望能为你带来一些启发或实用的参考。
🌱 无论你是刚入门的新手,还是正在进阶的开发者,希望你都能有所收获!
文章目录
Kafka - 基于Spring Boot实战:消息生产与消费完整实现 🚀
在现代微服务架构中,异步通信已成为解耦系统、提升吞吐、增强弹性的核心手段。而 Apache Kafka 凭借其高吞吐、持久化、分布式和可扩展的特性,成为众多企业首选的消息中间件。
Spring Boot 作为 Java 生态中最流行的开发框架,通过 Spring for Apache Kafka(spring-kafka) 提供了对 Kafka 的深度集成,极大简化了消息生产与消费的开发流程。
本文将带你从零开始,基于 Spring Boot 完整实现 Kafka 消息的生产与消费,涵盖:
- 环境搭建与依赖配置
- 消息序列化/反序列化
- 同步/异步发送与回调处理
- 批量消费与手动提交
- 错误处理与死信队列(DLQ)
- 消费者组与分区分配策略
- 监控与测试技巧
全文包含 大量可运行的 Java 代码示例、真实业务场景模拟、Mermaid 架构图,并附带 可访问的官方文档链接,助你快速掌握 Kafka + Spring Boot 实战能力。
💡 前提知识:熟悉 Java、Maven、Spring Boot 基础。无需 Kafka 深度经验,但建议了解基本概念(Topic、Producer、Consumer、Partition 等)。
🔗 官方入门:Apache Kafka Documentation | Spring for Apache Kafka
一、环境准备:启动 Kafka 本地集群 🐳
1.1 使用 Docker 快速启动 Kafka
我们使用 docker-compose 启动一个包含 ZooKeeper + Kafka Broker 的本地环境。
# docker-compose.yml
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.4.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
启动命令:
docker-compose up -d
✅ 验证:访问
http://localhost:9092(Kafka 无 HTTP 接口,但可通过客户端连接验证)
🔗 镜像来源:Confluent Docker Hub(官方维护,稳定可用)
1.2 创建测试 Topic
使用 Kafka 自带脚本创建 Topic:
# 进入 Kafka 容器
docker exec -it <kafka-container-id> bash
# 创建 Topic
kafka-topics.sh --create \
--topic user-events \
--partitions 3 \
--replication-factor 1 \
--bootstrap-server localhost:9092
💡 分区数设为 3,便于后续演示消费者组并行消费。
二、Spring Boot 项目初始化 🛠️
2.1 Maven 依赖
<!-- pom.xml -->
<dependencies>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring for Apache Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Lombok(简化代码) -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Jackson(JSON 序列化) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
✅ Spring Boot 3.x 用户请确保使用 spring-kafka 3.x+,兼容 Jakarta EE 9+。
2.2 配置文件 application.yml
# application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all
retries: 3
batch-size: 16384
linger-ms: 5
consumer:
group-id: user-event-consumer-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
auto-offset-reset: earliest
enable-auto-commit: false # 手动提交更安全
properties:
spring.json.trusted.packages: "*"
⚠️ 注意:
spring.json.trusted.packages在生产环境应指定具体包名,避免反序列化漏洞。
三、定义消息模型 📦
我们以“用户行为事件”为例,定义一个 POJO:
// UserEvent.java
package com.example.kafkademo.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserEvent {
private String userId;
private String eventType; // e.g., "login", "purchase"
private Long timestamp;
private String payload;
}
✅ 使用 Lombok 自动生成 getter/setter/toString,减少样板代码。
四、消息生产者:同步、异步与回调 📤
4.1 注入 KafkaTemplate
Spring Boot 自动配置 KafkaTemplate,直接注入即可:
// KafkaProducerService.java
package com.example.kafkademo.service;
import com.example.kafkademo.model.UserEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, UserEvent> kafkaTemplate;
public void sendUserEvent(String key, UserEvent event) {
kafkaTemplate.send("user-events", key, event);
log.info("✅ Message sent: key={}, event={}", key, event);
}
}
💡
KafkaTemplate是线程安全的,可在多线程环境中共享使用。
4.2 异步发送 + 成功/失败回调
生产环境必须处理发送失败!使用 ListenableFuture 添加回调:
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
public void sendWithCallback(String key, UserEvent event) {
ListenableFuture<SendResult<String, UserEvent>> future =
kafkaTemplate.send("user-events", key, event);
future.addCallback(new ListenableFutureCallback<>() {
@Override
public void onSuccess(Send)))) {
log.info("📬 Message delivered to partition {} with offset {}",
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
log.error("❌ Failed to send message: {}", ex.getMessage());
// 可在此处记录日志、发告警、存 DB 重试等
}
});
}
✅ 最佳实践:所有关键消息都应添加失败回调,避免静默丢失。
4.3 同步发送(阻塞等待)
某些场景需确保消息已落盘(如金融交易),可使用 get() 阻塞:
public boolean sendSync(String key, UserEvent event) {
try {
SendResult<String, UserEvent> result =
kafkaTemplate.send("user-events", key, event).get();
log.info("✅ Sync send success: offset={}", result.getRecordMetadata().offset());
return true;
} catch (Exception e) {
log.error("❌ Sync send failed", e);
return false;
}
}
⚠️ 谨慎使用:同步发送会降低吞吐,仅用于强一致性场景。
五、消息消费者:从基础到高级 📥
5.1 基础消费:@KafkaListener
最简单的消费方式:
// KafkaConsumerService.java
package com.example.kafkademo.service;
import com.example.kafkademo.model.UserEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "user-events", groupId = "user-event-consumer-group")
public void consume(UserEvent event) {
log.info("🔍 Received event: {}", event);
// 业务逻辑:更新用户状态、写入 DB、触发通知等
processEvent(event);
}
private void processEvent(UserEvent event) {
// 模拟处理耗时
try { Thread.sleep(100); } catch (InterruptedException e) { }
log.info("✅ Processed event for user: {}", event.getUserId());
}
}
✅ Spring 自动完成:
- 反序列化 JSON → UserEvent
- 管理 Consumer Group
- 自动提交 Offset(若启用 auto-commit)
5.2 手动提交 Offset(推荐!)
自动提交可能导致消息丢失(处理中宕机)。手动提交更安全:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
@KafkaListener(topics = "user-events", containerFactory = "kafkaListenerContainerFactory")
public void consumeWithManualAck(UserEvent event, Acknowledgment ack) {
try {
processEvent(event);
ack.acknowledge(); // 成功后才提交 Offset
} catch (Exception e) {
log.error("💥 Error processing event, NOT committing offset", e);
// 不提交 Offset,下次重启会重试
}
}
配套配置(关闭自动提交):
// KafkaConfig.java
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, UserEvent> kafkaListenerContainerFactory(
ConsumerFactory<String, UserEvent> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, UserEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
✅ 优势:确保“至少一次”语义(At-Least-Once),避免数据丢失。
5.3 批量消费(提升吞吐)
当消息量大且处理逻辑轻量时,批量消费可显著提升性能:
@KafkaListener(
topics = "user-events",
containerFactory = "batchKafkaListenerContainerFactory"
)
public void consumeBatch(List<UserEvent> events) {
log.info("📦 Received batch of {} events", events.size());
events.forEach(this::processEvent);
// 批量提交(由容器自动处理)
}
配置批量消费工厂:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, UserEvent> batchKafkaListenerContainerFactory(
ConsumerFactory<String, UserEvent> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, UserEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true); // 启用批量
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
return factory;
}
配合 Producer 批处理配置(见 application.yml 中的 batch-size 和 linger-ms),可实现高效吞吐。
🔗 性能参考:Kafka Batch Processing Guide
六、错误处理与死信队列(DLQ) 💀
6.1 为什么需要 DLQ?
当某条消息因数据格式错误、业务异常等原因无法处理时:
- 若直接抛异常 → Consumer 停止或 Rebalance;
- 若跳过 → 数据丢失。
死信队列(Dead Letter Queue, DLQ) 提供第三种选择:将坏消息转发至专用 Topic,供人工排查或重试。
6.2 配置 SeekToCurrentErrorHandler + DLQ
Spring Kafka 提供内置错误处理器:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, UserEvent> kafkaListenerContainerFactory(
ConsumerFactory<String, UserEvent> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, UserEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// 配置错误处理器
var errorHandler = new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate), // 发送到 DLQ
new FixedBackOff(1000L, 3) // 重试 3 次,间隔 1 秒
);
factory.setErrorHandler(errorHandler);
return factory;
}
✅ 效果:
- 消息处理失败 → 重试 3 次;
- 仍失败 → 自动发送到
user-events.DLT(默认 DLQ Topic 名);- 原 Consumer 继续处理下一条消息。
6.3 自定义 DLQ Topic 名称
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, ex) -> new TopicPartition("user-events-dlq", -1) // 自定义 DLQ 名
);
创建 DLQ Topic:
kafka-topics.sh --create --topic user-events-dlq --partitions 3 --bootstrap-server localhost:9092
6.4 监控 DLQ 并告警
可单独消费 DLQ Topic 进行告警:
@KafkaListener(topics = "user-events-dlq")
public void handleDlq(ConsumerRecord<String, UserEvent> record) {
log.error("🚨 DLQ Message: key={}, value={}, offset={}",
record.key(), record.value(), record.offset());
// 发送企业微信/钉钉告警
alertService.sendAlert("Kafka DLQ detected!");
}
✅ 运维建议:将 DLQ 消费纳入监控体系,设置告警阈值。
七、消费者组与分区分配策略 👥
7.1 多实例并行消费
启动多个 Spring Boot 应用实例(相同 group-id),Kafka 会自动分配 Partition:
✅ 效果:3 个实例并行消费,吞吐提升 3 倍。
7.2 自定义分区分配策略
默认使用 RangeAssignor,可改为 StickyAssignor 减少 Rebalance 影响:
# application.yml
spring:
kafka:
consumer:
properties:
partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
🔗 策略对比:Kafka Partition Assignors
八、消息序列化深度定制 🧩
8.1 为什么需要自定义 Serializer?
- 默认
JsonSerializer依赖类全限定名,不利于多语言; - 需要兼容 Avro、Protobuf 等二进制格式;
- 需要加密/压缩消息体。
8.2 示例:自定义 JSON 序列化(不带类型信息)
// CustomJsonSerializer.java
public class CustomJsonSerializer<T> implements Serializer<T> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public byte[] serialize(String topic, T data) {
if (data == null) return null;
try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error serializing JSON", e);
}
}
}
配置使用:
spring:
kafka:
producer:
value-serializer: com.example.kafkademo.serializer.CustomJsonSerializer
✅ 优势:消息体更小,兼容非 Java 消费者。
九、端到端测试:确保消息可靠传递 🧪
9.1 单元测试 Producer
@SpringBootTest
class KafkaProducerServiceTest {
@Autowired
private KafkaProducerService producerService;
@Test
void testSendUserEvent() {
UserEvent event = new UserEvent("user123", "login", System.currentTimeMillis(), "test");
producerService.sendWithCallback("user123", event);
// 验证日志或使用 EmbeddedKafka(见下文)
await().atMost(5, SECONDS).untilAsserted(() -> {
// 断言消费成功
});
}
}
9.2 使用 EmbeddedKafka 进行集成测试
Spring Kafka 提供 @EmbeddedKafka 注解,启动内存 Kafka:
@EmbeddedKafka(partitions = 1, topics = {"test-topic"})
@SpringBootTest
@TestPropertySource(properties = {
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"
})
class KafkaIntegrationTest {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Test
void testMessageFlow() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
// 发送消息
kafkaTemplate.send("test-topic", "key1", "hello");
// 消费验证(需配合 @KafkaListener)
assertTrue(latch.await(10, TimeUnit.SECONDS));
}
}
🔗 官方指南:Testing with Embedded Kafka
十、生产环境最佳实践清单 ✅
| 项目 | 建议 |
|---|---|
| 序列化 | 使用 JSON/Avro,避免 Java 序列化 |
| Offset 提交 | 手动提交(MANUAL_IMMEDIATE) |
| 错误处理 | 配置 DLQ + 重试机制 |
| 消费者 | 关闭 auto-commit,合理设置 max.poll.records |
| 监控 | 监控 Lag、消费速率、DLQ 积压 |
| Topic 设计 | 按业务域划分,合理设置分区数 |
| 安全 | 启用 SASL/SSL(生产环境必需) |
🔗 安全配置:Kafka Security Guide
十一、常见问题与解决方案 ❓
Q1:消息发送成功但消费者收不到?
- 检查 Topic 名称是否拼写错误;
- 确认 Consumer Group ID 是否正确;
- 查看
auto-offset-reset配置:若为latest且消息已存在,则不会消费历史消息。
Q2:消费者频繁 Rebalance?
- 原因:处理时间过长导致心跳超时;
- 解决:
- 降低
max.poll.records(如 100 → 10); - 增加
max.poll.interval.ms(默认 5 分钟); - 优化业务逻辑,避免长时间阻塞。
- 降低
spring:
kafka:
consumer:
properties:
max.poll.interval.ms: 300000 # 5分钟
Q3:如何保证消息顺序?
Kafka 仅保证单 Partition 内顺序。若需全局顺序:
- 方案1:所有消息使用相同 Key(牺牲并行度);
- 方案2:按业务维度分 Topic(如
orders-user123)。
⚠️ 现实建议:接受“局部顺序”,设计幂等消费逻辑。
十二、总结 🎯
通过本文,你已掌握:
- ✅ 使用 Spring Boot 快速集成 Kafka;
- ✅ 实现可靠的消息生产(同步/异步/回调);
- ✅ 构建健壮的消费者(手动提交、批量消费、错误处理);
- ✅ 设计 DLQ 机制应对异常消息;
- ✅ 编写端到端测试保障质量;
- ✅ 避开生产环境常见陷阱。
Kafka + Spring Boot 的组合,让你既能享受 Kafka 的高性能,又能利用 Spring 的开发效率。消息驱动架构的大门,就此打开!
🌐 延伸学习:
现在,就去你的项目中实践吧!🚀
🙌 感谢你读到这里!
🔍 技术之路没有捷径,但每一次阅读、思考和实践,都在悄悄拉近你与目标的距离。
💡 如果本文对你有帮助,不妨 👍 点赞、📌 收藏、📤 分享 给更多需要的朋友!
💬 欢迎在评论区留下你的想法、疑问或建议,我会一一回复,我们一起交流、共同成长 🌿
🔔 关注我,不错过下一篇干货!我们下期再见!✨
更多推荐



所有评论(0)