前言

我记录分析问题的过程,并没有直指问题本身,着急的同学,可以直接看结论。

项目场景:

Spring boot v2.0.2 release + kakfa + JavaMail 

我们使用JavaMail群发邮件,有的时候邮件的数量很大,我们还要在生成的邮件中加入临时生成的图片,这使得一封邮件的发送时间在3~4s,一旦数据量大的时候,比如500封邮件,那么发送完整个邮件时间估算是33.33分钟。


问题描述

当线上使用这个功能的时候,出现了严重bug,有很多客户没有收到邮件,有很多客户重复收到了多封邮件,有的甚至多达12封邮件。

客户反馈过来,需要快速解决。

邮件重复的规律是这样的

2025-09-24 17:26:33 开始发送第一封邮件 一切正常

2025-09-24 17:31:59 出现重复邮件,后边邮件日志显示都是发送2次

2025-09-24 17:37:09 出现重复3封,之后都是重复3次

2025-09-24 17:42:14 出现重复4封,之后都是重复4次

2025-09-24 17:47:14 出现重复5封。。。

2025-09-24 17:52:19 出现重复6封。。。

我的kafka部分配置如下
kafka.consumer.auto.commit.interval=100
kafka.consumer.concurrency=3
max.poll.interval.ms没有配置,使用默认值5分钟
kafka消费者实现:
@KafkaListener(topics = Constant.TOPIC_XXX_XXX)
public void consumerProcess(ConsumerRecord<?, ?> record) {
        // 。。。业务逻辑。。。
}

业务过程

1.为了避免阻塞邮件批量提交,我向kafka中发送一条触发消息,来异步触发消费者执行这个任务。

2.@KafkaListener接收到触发消息,开始执行这个任务

日志中比较奇怪的点:

1.我在日志文件中,找到了问题相关的三条相同消息的消费记录,三条日志的时间跨度没有规律,但是能看出是3条不同线程在执行任务

2025-09-24 17:31:58.665  INFO 23 --- [ntainer#9-0-C-1] c.w.m.c.q.XXX : [xxx]接收kafka消息,主题:TOPIC_ECARD_DISTRIBUTE,消息:["EC-202509241725528587158211"]
2025-09-24 17:42:10.013  INFO 23 --- [ntainer#9-1-C-1] c.w.m.c.q.XXX : [xxx]接收kafka消息,主题:TOPIC_ECARD_DISTRIBUTE,消息:["EC-202509241725528587158211"]
2025-09-24 17:47:13.082  INFO 23 --- [ntainer#9-2-C-1] c.w.m.c.q.XXX : [xxx]接收kafka消息,主题:TOPIC_ECARD_DISTRIBUTE,消息:["EC-202509241725528587158211"]

2.比较奇怪的另一个点是,我去日志中查找对应的邮件发送记录,和数据库的日志发送记录并匹配不起来,例如:有的日志只有1次发送记录,但数据库发送记录是2条,有的3条发送记录,数据是6条记录


原因分析:

日志中的记录很难找出规律了,甚至于有点混乱。

于是我从数据库发送邮件记录来找规律,我相信敏感的同学已经发现了,之前的重复邮件出现的频率,基本在5分钟一次。也就是说,每5分钟,kafka消费者这边就会启动一个新的线程(或新的业务逻辑)来消费消息。

由此可以看出3点:1.kafka每5分钟在启动线程(或启动新的业务逻辑),2.一条消息在不断被重复消费3.消息offset并没有并提交

我想熟悉kafka的同学应该能意识到,我前边提到的三个kafka的配置项了。下面解释一下:

kafka.consumer.auto.commit.interval=100  // 使用的是自动提交,每100毫秒提交一次
kafka.consumer.concurrency=3 // spring-kafka的配置,实现是通过启动3个同组消费者来实现对不同分区消费,如果超过分区则空闲。总之就是同组中增加了消费者的数量,而不需要真的创建3个消费者实例
max.poll.interval.ms //没有配置,使用默认值5分钟,表示kafka的消费者5分钟没有poll的话,kafka会认为这个消费者失联,从而发生rebalance。

原因

是的,就如你所想的那样,重复消费是kafka的rebalance造成的。那么rebalance是什么原因引起的呢?

是我将业务处理过程放在了kafka的消费者处理消息的逻辑当中,业务逻辑处理时间为33分钟左右,这样max.poll.interval.ms=5分钟情况下,每5分钟kafka就会判断这个因业务逻辑执行时间长而没有poll的消费者失联,然后重新rebalance,分区重新分配给别的消费者,这样重启的业务逻辑就会重新搂数据(已发送的不包含),从而开始每5分钟递增一封的情况,直至将线上两个实例的6个消费者全部占掉(6个消费者由kafka.consumer.concurrency=3决定 ),造成了如下现象:

2025-09-24 17:26:33 开始发送第一封邮件 一切正常

2025-09-24 17:31:59 出现重复邮件,后边邮件日志显示都是发送2次

2025-09-24 17:37:09 出现重复3封,之后都是重复3次

2025-09-24 17:42:14 出现重复4封,之后都是重复4次

2025-09-24 17:47:14 出现重复5封。。。

2025-09-24 17:52:19 出现重复6封。。。

至于offset没有提交,那是因为poll被阻塞了,所以offset的提交操作没有执行,虽然offset的提交是异步的,但调用依然在poll的逻辑中,而poll和消息消费逻辑在同一个线程

PS:1.日志中出现混乱,是因为我们的两个线上实例同时向一个文件中写日志,经常造成日志被覆盖丢失的问题,这是一个优化的点。这样的话,日志已经参考价值不大了。2.很多客户没有收到邮件的原因,是重复发送邮件,导致客户的邮件服务器屏蔽了该邮件 3.出现12封的是有的客户本身有2封邮件,重复6次就是12封


解决方案:

1.将业务逻辑从消费者的消息消费逻辑中剥离出来(启用线程池)

2.对消费业务做幂等操作(兜底)

3.其实可以向kafka中发送500条消息,慢慢消费,这样时效性可能会差一点,因为可能会消息积压。

4.最重要的:跟客户道歉,安抚客户。


日志层面的rebalance证据

2025-09-24 17:59:13.969  WARN 23 --- [ntainer#9-1-C-1] o.a.k.c.c.i.ConsumerCoordinator          : [Consumer clientId=consumer-11, groupId=consumer-tutorial] Synchronous auto-commit of offsets {TOPIC_ECARD_DISTRIBUTE-0=OffsetAndMetadata{offset=31, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
2025-09-24 17:59:13.969  INFO 23 --- [ntainer#9-1-C-1] o.a.k.c.c.i.ConsumerCoordinator          : [Consumer clientId=consumer-11, groupId=consumer-tutorial] Revoking previously assigned partitions [TOPIC_ECARD_DISTRIBUTE-0]
2025-09-24 17:59:13.969  INFO 23 --- [ntainer#9-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [TOPIC_ECARD_DISTRIBUTE-0]
2025-09-24 17:59:13.969  INFO 23 --- [ntainer#9-1-C-1] o.a.k.c.c.i.AbstractCoordinator          : [Consumer clientId=consumer-11, groupId=consumer-tutorial] (Re-)joining group
2025-09-24 17:59:13.970  INFO 23 --- [ntainer#9-1-C-1] o.a.k.c.c.i.AbstractCoordinator          : [Consumer clientId=consumer-11, groupId=consumer-tutorial] Marking the coordinator 123.123.123.123:9092 (id: 2147482645 rack: null) dead

2025-09-24 17:59:14.037  WARN 23 --- [ntainer#9-2-C-1] o.a.k.c.c.i.ConsumerCoordinator          : [Consumer clientId=consumer-12, groupId=consumer-tutorial] Synchronous auto-commit of offsets {TOPIC_ECARD_DISTRIBUTE-0=OffsetAndMetadata{offset=31, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
2025-09-24 17:59:14.037  INFO 23 --- [ntainer#9-2-C-1] o.a.k.c.c.i.ConsumerCoordinator          : [Consumer clientId=consumer-12, groupId=consumer-tutorial] Revoking previously assigned partitions [TOPIC_ECARD_DISTRIBUTE-0]
2025-09-24 17:59:14.037  INFO 23 --- [ntainer#9-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [TOPIC_ECARD_DISTRIBUTE-0]
2025-09-24 17:59:14.037  INFO 23 --- [ntainer#9-2-C-1] o.a.k.c.c.i.AbstractCoordinator          : [Consumer clientId=consumer-12, groupId=consumer-tutorial] (Re-)joining group

2025-09-24 17:59:14.057  WARN 23 --- [ntainer#9-0-C-1] o.a.k.c.c.i.ConsumerCoordinator          : [Consumer clientId=consumer-10, groupId=consumer-tutorial] Synchronous auto-commit of offsets {TOPIC_ECARD_DISTRIBUTE-0=OffsetAndMetadata{offset=31, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
2025-09-24 17:59:14.057  INFO 23 --- [ntainer#9-0-C-1] o.a.k.c.c.i.ConsumerCoordinator          : [Consumer clientId=consumer-10, groupId=consumer-tutorial] Revoking previously assigned partitions [TOPIC_ECARD_DISTRIBUTE-0]

结语

1.别管你使用的是什么技术,该做好你的幂等,就做好你的幂等!你不一定驾驭的了你使用的技术,踩坑之前做好防护!

2.当事情发生的时候,如果整个过程的影响因素很多,例如网络,客户操作,客户邮件服务器的屏蔽,日志被同时写入的覆盖,等等。我们去假设各种情况是不可靠的,你的脑力模拟不了那么多,去还原过程,是一条捷径。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐