在之前的文章里kafka实现只消费消息一次 - broker篇,对broker进行了可靠性配置。但是如果不对生产者进行可靠性配置,消息还是存在丢失的可能。
发送消息的主要步奏
- 创建消息ProducerRecord对象,对象包含目标主题Topic和消息的内容Value,还可以指定键Key或者分区Partition。
- 生产者使用序列化器将ProducerRecord对象里的键和值对象序列化成字节数组。
- 然后消息被传给分区器,如果之前ProducerRecord对象指定了分区Partition,那么分区器直接将消息发到对应的分区。如果没有指定分区,默认的分区器会根据ProducerRecord对象的键Key,对键进行散列,根据散列的值为消息选择一个分区(
所以拥有相同的键的消息会被写到同一个分区
)。如果键为null,默认的分区器会使用轮询算法
将消息均匀分布到各个分区。除了默认的分区器,我们可以实现自定义的分区策略
。 - 消息被加入到批次中,这个批次里的所有消息会被发送到相同的主题和消息。
- Broker收到消息后,会返回一个响应。
成功写入消息,会返回一个RecordMetaData对象,包含主题和分区,以及消息在分区里的偏移量信息。
失败会返回一个错误,生产者可以根据错误的类型,选择重试还是直接抛出异常。
生产者的三种确认模式
- ack = 0
如果生成者能够通过网络把消息发送出去,就认为消息已经成功写入到Kafka(不需要等待响应
)。 - ack = 1
只要分区首领收到消息并把它写入到分区数据文件(不一定同步到磁盘中),就会返回成功或错误响应。
如果生产者得到成功响应后,说明消息已经成功写入到分区首领。
对于错误的响应,如正在发生正常的首领选举,生产者也可以重发消息,最终消息也会安全到达新的首领。 - ack = all
首领分区在返回成功或错误响应时,会等待所有同步副本
收到消息。
如果生成者收到成功的响应,说明消息已经成功写到到所有同步副本
中。
如果生产者收到错误的响应,生成值可以重试,知道消息被成功写入到所有同步副本
。
ack = all, 配合最小同步副本参数min.insync.replicas
,可以确保消息被写入不少于min.insync.replicas
个的分区。ack = 0 发送消息最快,ack = all 发送消息最慢。
丢失消息的场景
下面的场景都是分区复制系数=3,禁用了不完全首领选举。
- ack = 0,这种确认模式是一定会丢失一些消息的。只要分区或集群不可用,或者分区在进行首领选举,消息就会丢失。
- ack = 1,当消息被写入到首领分区后,生成者就得到成功的响应,但是如果此时首领崩溃了,但消息还没有被其他的副本复制过去,而其他的副本此时还是被认为是同步副本(
replica.lag.time.max.ms
参数决定)。进行完全的首领选举后,其中一个副本成为新的首领,这种场景下,刚才那条消息就丢失了。 - 即时是ack = all,生产者也需要正确地处理错误,错误一般被分为两类,一类是
可重试错误
,可以通过重发消息来解决,比如分区正在进行首领选举(我们使用的生产者客户端一般都内置了重试机制,重试次数可以通过retries来配置
);一类是不可重试错误
,无法通过重发消息来解决,比如“消息太大”。
重试带来的问题
生产者客户端内置的重试机制,可以处理大部分的错误,不丢失数据。
但是重试也会导致消息的重复,生产者因为网络问题没有收到broker的确认,但实际上消息已经写入成功,生产者会认为网络出现了临时故障,就重试发送该消息(因为它不知道消息已经写入成功)。在这种情况下,broker会收到两个相同的消息。
- 重试和恰当的错误处理可以保证消息
至少被保存一次
- Kafka
无法
保证每个消息只被保存一次
。 - 消息中加入
唯一标识符
,来检测重复消息,如账单的流水号;或者使用使用"幂等消息"(多次消费结果都一样的消息,例如"这个账号只有100元"的消息。
代码
使用go来编写生产者发送消息
注意:confluent-kafka-go只是librdkafka的go语言封装,所以需要先安装librdkafka。安装教程
package msq
import (
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
"log"
"os"
)
const broker = "0.0.0.0:9093"
const InfoHashTopic = "info-hash"
var Producer *kafka.Producer = nil
func init() {
//https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
var err error
Producer, err = kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": broker,
"retries": 10, // 重试次数
"acks": -1, // 确认模式,ack=all
//"partitioner": "consistent_random" // 默认分区器
})
if err != nil {
log.Printf("Failed to create producer: %s\n", err)
os.Exit(1)
}
log.Printf("%v", Producer)
}
func Send(topic, msg string) {
deliveryChan := make(chan kafka.Event)
Producer.Produce(&kafka.Message{
//TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
TopicPartition: kafka.TopicPartition{Topic: &topic},
Value: []byte(msg),
Key: []byte(msg),
}, deliveryChan)
// 下面的代码可以放到一个协程中处理,做到异步发送
e := <-deliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
log.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
log.Printf("Delivered message to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}
close(deliveryChan)
}