对broker和生产者进行可靠性配置后,最后就剩下对消费者进行可靠性配置了,这样整个消息队列系统就可以确保实现只消费一次消息了。
先了解消费者相关的一些概念。
消费者和消费者组
kafka使用消费者组
来将多个消费者
组织在一起,一个组内的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。
注意:如果我们往群组里添加更多的消费者,超过主题的分区数量
,那么有一部分消费者就会被闲置,不会接收到任何消息。
使用这样的方式,使得kafka消费者在执行高延迟操作
,单个消费者无法跟上生产者发送消息的速度时,可以通过为主题创建更多的分区,并在同一个消费者群组里增加更多的消费者,来横向扩展消费能力。
同时,订阅同一主题的不同消费者组,不同的消费者组互不干扰,可以分别获取到主题的所有消息
。
注意:对于同一个消费者组,主题的某一个分区,只能被组里的某一个消费者独自读取。
分区再均衡
场景
分区所有权(分区由组里的哪个消费者读取)发生转移的场景:
- 新的消费者加入消费者组,它读取的分区是原本由组内其他消费者读取的分区。
- 消费者关闭或发生崩溃,离开消费者组,原本由它读取的分区将由其他消费者来读取。
- 主题发生变化,管理员添加新的分区或删除旧的分区等。
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡
。
好处和坏处
再均衡
为消费者组带来了高可用
和伸缩性
,但同时也带来了一些不好的影响。
- 再均衡期间,整个组内的消费者都无法读取消息,整个组会有小段时间的不可用。
- 消费者当前读取到的消息在分区中里的位置(偏移量)信息可能会丢失,造成重复消费消息。
具体实现
- 消费者通过向被指派为
群组协调器
的 broker(不同的群组可以有不同的协调器)发送心跳
,来维持它们和群组的从属关系以及它们对分区的所有权关系。每个消费者都有一个独立的
心跳线程
来发送心跳
,发送间隔由heartbeat.interval.ms
控制。session.timeout.ms
指定了群组协调器
多久没有收到消费者
的心跳
,会判定消费者死亡,然后触发再均衡,把死亡消费者之前负责的分区分配给其他消费者。heartbeat.interval.ms
一定要比session.timeout.ms
小,一般是后者的1/3。 - 当消费者要加入群组时,它会向
群组协调器
发送一个JoinGroup
请求。第一个加入群组的消费者将成为“群主”
。群主从协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的),并负责给每一个消费者分配分区。分配完毕后,群组把分配情况发给群组协调器,协调器再把这些信息发送给所有消费者。
提交和偏移量
提交和偏移量是确保不丢失消息
和不重复消费消息
的关键。
偏移量是什么:
偏移量是一种元数据,它是一个`不断递增`的整数值,在创建消息时,kafka会把它添加到消息里。在给定的分区里,每个消息的偏移量都是`唯一`的。
分区里的消息是按照偏移量递增的顺序排序的。
消费者可以把自己对分区当前的读取位置,通过发送位移消息
的方法,保存到kafka内部主题_customer_offset(位移主题
)里。这种操作叫做提交
。
(默认情况,kafka会创建分区数是50,副本数是3的_customer_offset主题)
位移主题其实就是一个普通的kafka主题,但它的消息格式是kafka自己定义的。位移消息的格式是一个KV对。
位移消息的key保存了3部分内容<Group ID,主题名,分区号>
,value保存着最后一次提交的偏移量
等信息。
(key决定了位移消息,会被存储到位移主题的那个分区)
这样的设计,在消费者发生崩溃或者有新的消费者加入群组,完成再均衡后,每个消费者可能会分配到新的分区。消费者可以<Group ID,主题名,新的分区号>
,去读取__customer_offset主题对应的分区,获取到对应的位移消息,得到新分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。
所以正确地提交偏移量就很重要了。
重复处理和丢失
如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。
提交方式
为了不重复消费和丢失消息,正确提交偏移量很重要。
自动提交
使用confluent-kafka-go来编写一个消费者程序。
import (
"fmt"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
func main() {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost",
"group.id": "myGroup",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)
for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
} else {
// The client will automatically try to recover from all errors.
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
}
}
c.Close()
}
confluent-kafka-go
从消费者读取一个批次的消息到本地队列里,但应用调用ReadMessage
,每次只从队列里读取一个消息。官方开发者认为这是最好的流处理方法。
No such configuration property: "max.poll.records"
confluent-kafka-go
使用本地内存offset store
来保存下一次要提交的偏移量。
配置enable.auto.offset.store
,默认是true
,应用每次调用ReadMessage
,会自动将消息的偏移量存储到offset store
。
最后的提交偏移量到位移主题,还是由自动提交或者调用commit()
来触发。
enable.auto.commit
- 自动和定期地在后台提交偏移量到位移主题。默认true
。
auto.commit.interval.ms
- 后台提交偏移量到位移主题的时间间隔。默认5000ms
。
Update documentation (auto commit issue) #1829
之前看《kafka权威指南》,官方的java消费者,使用自动提交,在发生再均衡时,后续可能会出现重复消费消息。
假设我们仍然使用默认的5s提交时间间隔,在最近一次提交之后的3s发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了3s,所以在这 3s内到达的消息会被重复处理。
confluent-kafka-go
的自动提交,不仅会定期提交,在再均衡时和关闭也会触发提交。
手动提交
当然,可以设置enable.auto.commit=false
,关闭自动提交,使用commit()
来手动提交偏移量。
(commit()是一个同步的方法,查阅了文档,confluent-kafka-go
里没有异步的提交偏移量方法,官方开发者推荐使用go协议调用commit()方法来实现异步提交)
consumer: Async offset commits #64
总结
个人觉得,使用confluent-kafka-go
编写消费者程序,比较好的方法是,保留enable.auto.commit=true
,因为自动提交不仅会定期地提交偏移量,在再均衡和关闭的情况,也能帮我们提交偏移量,这一点还是很方便的,这样我们就不需要自己实现再均衡监听器
,不需要在再均衡开始之前和消费者停止读取消息之后,编写提交偏移量的代码。
同时可以设置enable.auto.offset.store=false
,因为默认是每次调用ReadMessage
就会更新offset store
里要提交的偏移量,但对于耗时的操作(例如我的将磁力链转换成种子,一般都要1,2分钟),默认情况下,操作还没完成,偏移量已经提交了。如果这时候发生再均衡或者消费者崩溃重启了,这条消息就是丢失了。
设置enable.auto.offset.store=false
,操作完成后,调用offset_store()
来更新offset store
要提交的偏移量。
继续提升消费者的可靠性
通过使用confluent-kafka-go
,使用自动提交enable.auto.commit=true
,关闭enable.auto.offset.store=false
,在处理完消息后而不是读取到消息时更新offset store
,已经可以确保消费者不会丢失消息,也不会重复消息消息。
但消费者在处理消息时,有一些情况,还需要我们考虑如何来处理
- 长时间处理
有时候处理数据需要很长的时间,你可以会从发生阻塞的外部系统获取信息(我的磁力链转种子就是这种请求),或者把数据写到外部系统,或者进行一个非常复杂的计算。虽然后台的心跳线程,会保持发送心跳,这样就不会触发再均衡。但是还有一个配置
max.poll.interval.ms
(上一张图),当两次调用ReadMessage
的时间间隔大于max.poll.interval.ms
时,消费者会被认为已崩溃,就会触发再均衡。我们可以使用线程池来处理,这样可以并行处理,加快处理速度,把数据交给线程池去处理之后,可以调用
Pause
方法来暂停消费者,然后保持调用ReadMessage
,工作线程处理完成后,调用Resume
方法恢复正常获取消息。go一般都是使用协程,搞个协程池?
Golang 开发需要协程池吗?
我想到的是限制并发数
Limit the maximum number of goroutines running at the same time
但是顺序性无法保证的,可能有些任务没有执行完毕,后面的任务就已经把它的offset给提交了。这就要好好设计下,之前公司的大佬,提出了一个方案,对每一个分区,自己维护一个偏移量的bitmap数据结构,任务完成了,对应的位置为1。同时开启一个协程,定期去扫描这个bitmap。从上一次提交的偏移量位置开始,扫描到第一个0的位置,前一个位置的偏移量就是我们可以提交的偏移量。 - 在处理消息过程中,出现错误,需要重试
有时候,在进行轮询之后,有些消息不会被完全处理,你想稍后再来处理。例如,假设要把 Kafka 的数据写到数据库里,不过那个时候数据库不可用,于是你想稍后重试。解决方法:
1,在遇到可重试错误时,提交最后一个处理成功的偏移量,然后把还没有处理出错的消息保存到程序的变量里,还是调用Pause
方法来暂停消费者,保持调用ReadMessage
(这样就不会触发再均衡)。尝试重新处理出错的消息(go将处理交给协程),重试成功,或者达到上限决定放弃后,使用数据库或其他存储把错误记录下来(当然也可以丢失,但如果是涉及到金钱交易,记录下来,便于以后处理)。然后调用Resume
恢复正常的获取消息。2,在遇到可重试错误时,把错误写入一个独立的主题,然后继续。一个独立的消费者去负责从该主题获取消息,并进行重试。我比较偏向于使用这种方式。这样耦合性比较低,还可以增加额外的逻辑,对这些错误进行其他的操作,如记录出错原因等。
消息只消费一次
经过对生产者,broker和消费者进行了可靠性配置,我们知道kafka可以保证不丢失消息,但是生成者可能会将同一个消息,写入到broker两次(生产者重试带来的问题),同时消费者也可能会重复消费消息(偏移量提交带来的问题),即使在处理完消息后再提交偏移量,也有可能出现,刚好处理完消息,消费者就崩溃了,还没有提交偏移量的情况。
《kafka权威指南》里,有讲到利用mysql来存储偏移量,将处理结果插入数据库和保存偏移量这两个操作,在一个事务
里完成。感兴趣的可以去看下。
实现仅一次消费最简单且最常用的办法是把结果写到一个支持唯一键的系统里
,比如关系型数据库,在我的种子爬虫程序里,我可以使用种子的info-hash作为唯一键,每次消费处理前,都可以判断下这个info-hash是否已经处理过。
参考资料
《kafka权威指南》
《极客时间-揭开位移主题的神秘面纱》