Kafka重复消费场景及解决方案

Dcr 1年前 ⋅ 1087 阅读

Kafka消费者以消费者组(Consumer Group)的形式消费一个topic,发布到topic中的每个记录将传递到每个订阅消费者者组中的一个消费者实例。Consumer Group 之间彼此独立,互不影响,它们能够订阅相同的一组主题而互不干涉。生产环境中消费者在消费消息的时候若不考虑消费者的相关特性可能会出现重复消费的问题。

讨论重复消费之前先看看KafKa中跟消费者有关的几个配置参数.

enable.auto.commit 默认值true,表示消费者会周期性自动提交消费的offset
auto.commit.interval.ms在enable.auto.commit为true的情况下,自动提交的间隔,默认值5000ms
max.poll.records单次消费者拉取的最大数据条数,默认值500
max.poll.interval.ms默认值5分钟,若5分钟内消费者没有消费完上一次poll的消息,那么consumer会主动发起离开group的请求

kafka重复消费问题分析:
导致重复消费问题原因在于,已经消费了的数据,偏移量没来得及提交

在配置自动提交enable.auto.commit默认值为true的情况下,出现重复消费的场景有以下几种:
1.Consumer在消费过程中,应用进程被强制kill掉或者发生异常退出
例如在一次poll500条消息后,消费到200条时,进程被强制kill消费导致offset 未提交,或出现异常退出导致消费到offset未提交。下次重启时,依然会重新拉取这500消息,这样就造成之前消费到200条消息重复消费了两次。因此在有消费者线程的应用中,应尽量避免使用kill -9这样强制杀进程的命令。
2.消费者消费时间过长
max.poll.interval.ms参数定义了两次poll的最大间隔,它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。若消费者消费的消息比较耗时,那么这种情况可能就会出现。

为了避免因重复消费导致的问题,以下提供了两种解决重复消费的思路。
第一种思路是提高消费能力,提高单条消息的处理速度,例如对消息处理中比 较耗时的步骤可通过异步的方式进行处理、利用多线程处理等。在缩短单条消息消费时常的同时,根据实际场景可将max.poll.interval.ms值设置大一点,避免不 必要的rebalance,此外可适当减小max.poll.records的值,默认值是500,可根 据实际消息速率适当调小。这种思路可解决因消费时间过长导致的重复消费问题, 对代码改动较小,但无法绝对避免重复消费问题。

第二种思路是引入单独去重机制,例如生成消息时,在消息中加入唯一标识符如消息id等。在消费端,我们可以保存最近的1000条消息id到redis或mysql表中,配置max.poll.records的值小于1000。在消费消息时先通过前置表去重后再进行消息的处理。

此外,在一些消费场景中,我们可以将消费的接口幂等处理,例如数据库的查 询操作天然具有幂等性,这时候可不用考虑重复消费的问题。对于例如新增数据的操作,可通过设置唯一键等方式以达到单次与多次操作对系统的影响相同,从而使接口具有幂等性。

全部评论: 0

    我有话说: