最近在写种子爬虫时,磁力链抓取的速度,远快于磁力链转种子的速度,于是,我使用Kafka来解耦抓取和转换。同时,我不想丢失任何一个磁力链(里面可能有我喜欢的小姐姐 ),也不想重复将同一个磁力链转换成种子(转换过程速度有点慢 )。于是参考了各种资料,现在总结下如何使用kafka实现只消费消息一次。

Kafka的架构

JEtKwq.png

在整个架构里,有三种角色,生产者(Producer),Kafka集群(Kafka Cluster),消费者(Cusumer)。

生成者创建消息,消费者读取消息。Kafka集群作为生产者和消费者之间的桥梁,接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存,同时,为消费者提供服务,对读取消息的请求作出相应,返回已经提交到磁盘里的消息。

要做到消息不丢失,不被重复消费,只被消费一次,需要这三者的共同配合。

Kafka集群

集群由多个独立的Kafka服务器组成,每个Kafka服务器被称为Broker。如果整个集群只有一个Broker,那么就存在了单点问题,Broker所在的服务器发生崩溃后,整个Kafka集群就不可用和丢失消息了。

下图是一个由两个Broker组成的Kafka集群。

JENVN6.jpg

Kafka使用主题来组织数据,每个主题又可以被分成若干个分区,每个分区可以有多个副本。上图里,主题A有两个分区,分区0和分区1,同时每个分区都有一个跟随者副本,分布在和首领副本不同的Broker上。这样,当Broker1崩溃后,Kafka仍可以保证可用性和消息的持久性。

首领副本
    每个分区都有一个首领副本。所有生产者请求和消费者请求都会经过这个副本。
    
跟随者副本
    首领以外的副本都是跟随者副本。跟随者副本不处理来自客户端的请求,它们的唯一任务就是从首领那里复制消息,保持自己的消息与首领一致。
    为了跟首领副本保持一致,跟随者副本会向首领副本发送获取消息的请求,这种请求与消费者为了读取消息而发送的请求是一样的。

同步副本
    由于各种原因,例如网络拥塞,跟随者副本消息的最新偏移量(消息在分区里的位置)会落后于首领副本的最新偏移量,也就是跟随者副本里没有跟首领副本保持一致,没有包含全部的消息。这样的副本是不同步副本。
    相反,持续请求得到首领副本最新消息的副本,被称为同步副本。首领发生崩溃,只有同步副本才会被提升为新的首领。

了解了以上的概念,我们知道,为了保证Kafka的可靠性,不丢失消息,多Broker,分区多副本,同时确保同步副本不是1,是可行的方法。下面来了解下如何配置。

配置

以下的配置参数可以应用于broker级别,控制所有主题的行为。也可以应用于主题级别,用于控制个别主题的行为。

复制系数
    主题级别:replication.factor
    broker级别:default.replication.factor
    
    复制系数决定了每个分区的副本数,默认是1(我用的kafka版本是2.4.1)。
    (PS: 《Kafka权威指南》里,写着默认的复制系数是3,但是在搭建集群成功后,我发现用来保存消费者群组偏移量的内部主题__consumer_offsets,每个分区只有一个副本,查阅了官方文档,2.4.1的默认复制系数是1)
    (PS:实践后证明,内部主题__consumer_offsets的分区数和每个分区的副本数不是由num.partitions和default.replication.factor控制,而是由offsets.topic.num.partitions和offsets.topic.replication.factor控制)
    复制系数的配置,需要在可用性和存储硬件之间做出权衡。更高的复制系数会带来更高的可用性和可靠性,但同时也占用更多的磁盘空间。

Jd9QMT.jpg

Jd8SxK.jpg

看完这个,小姐姐突然不香了,因为我只有一台腾讯云主机。如果你是土豪,你可以购买3台服务器部署Kafka组成集群,使用复制系数3,Kafka会将分区的3个副本分布到不同的broker。

我决定在我的腾讯云主机上,部署三个kafka的docker容器,来组成一个kafka集群,这样一个容器挂了后,集群还可用。当然主机崩溃的情况,就没办法了。

不完全的首领选举
    主题级别:不可配置
    broker级别:unclean.leader.election(默认是false)

    默认情况下,当分区首领不可用时,一个同步副本会被选举为新首领。
    但是,如果首领不可用时,其他的副本都是不同步的,这时候就要做出权衡选择。
    
    - 如果不同步的副本不能被选举为新首领,那么整个分区在旧首领恢复之前,都是不可用的。
    - 如果不同步的副本可以被选举为新首领,那么在这个副本成为不同步之后,写入到旧首领的消息会全部丢失,导致数据不一致。

    简而言之,就是要在可用性和可靠性之间做出选择。
   
    对于数据质量和数据一致性要求较高的系统会禁用不完全的首领选举。银行这种设计到金钱交易的系统就是最好的例子。

为了不丢失一个“小姐姐”,我决定禁用掉不完全的首领选举。

但是,即时是同步副本,和首领副本也不是完全同步的,同步副本成为不同步副本之前的时间是通过replica.lag.time.max.ms来配置的,这就会存在一个时间差,消息写入到首领副本后,这段时间内,同步副本还没有复制到消息,但仍被认为是同步副本。

replica.lag.time.max.ms

Configuration parameter replica.lag.time.max.ms now refers not just to the time passed since last fetch request from the replica, but also to time since the replica last caught up. Replicas that are still fetching messages from leaders but did not catch up to the latest messages in replica.lag.time.max.ms will be considered out of sync.

如果在这段时间里,首领副本所在的机器崩溃了,消息还没被复制到同步副本,但是选举还是正常进行,其中一个同步副本会成为新的首领副本,这种情况下,消息就丢失了。

所以就需要最小同步副本和和对生产者进行可靠性的配置,先了解最小同步副本,生产者的配置下篇再讲下。

最小同步副本
    主题级别:min.insync.replicas
    broker级别:min.insync.replicas

    这个参数和生产者相关,生产者写消息时,只有在消息被写入到所有同步副本之后才会被认为是已提交的。
    最小同步副本配置,决定了生产者的消息,要被写入到多少个同步副本后,才算是已提交。
    在一个复制系数是3的分区,如果最小同步副本配置是1,那么当消息被写入首领副本后,就会被认为是已提交的了。但如果此时其他副本还没同步这个消息,首领副本就崩溃了,那么这个消息就丢失了。
    所以最小同步副本可以配置为2,这样只要写入到两个同步副本,就会被认为已提交。只有在两个副本不同步后,生成者发送消息时会接收到NotEnoughReplicasException异常,这时候就需要生产者来决定如何处理,可以尝试重发消息,知道发送成功。

为了确保不丢失“小姐姐”,在复制系数是3的情况下,最小同步副本数配置为2。

部署

因为Kafka使用Zookeeper来保存Broker的元数据,所以同时部署一个Zookeeper集群。
ubuntu使用docker-compose搭建zookeeper集群
Ubuntu使用docker compose部署kafka集群

部署成功后,可以执行docker exec -it kafka1 /bin/bash进入到容器里,通过命令行创建一个分区数是3,分区系数是3,不允许不完全首领选举,最少同步副本数是2的主题。

./bin/kafka-topics.sh --create --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --replication-factor 3 --partitions 3 min-insync-replicas 2 --topic test

Jay9c4.jpg

总结

这一篇文章,主要是从broker角度出发,确保不会因为服务器崩溃后,导致消息丢失。后面再分别从生产者和消费者来确保消息不丢失和不重复消费。

参考资料

《kafka权威指南》

Last modification:April 23rd, 2020 at 09:00 pm
如果觉得我的文章对你有用,请尽情赞赏 🐶