在之前的文章里kafka实现只消费消息一次 - broker篇,对broker进行了可靠性配置。但是如果不对生产者进行可靠性配置,消息还是存在丢失的可能。

发送消息的主要步奏

JsFL2F.jpg

  1. 创建消息ProducerRecord对象,对象包含目标主题Topic和消息的内容Value,还可以指定键Key或者分区Partition。
  2. 生产者使用序列化器将ProducerRecord对象里的键和值对象序列化成字节数组。
  3. 然后消息被传给分区器,如果之前ProducerRecord对象指定了分区Partition,那么分区器直接将消息发到对应的分区。如果没有指定分区,默认的分区器会根据ProducerRecord对象的键Key,对键进行散列,根据散列的值为消息选择一个分区(所以拥有相同的键的消息会被写到同一个分区)。如果键为null,默认的分区器会使用轮询算法将消息均匀分布到各个分区。除了默认的分区器,我们可以实现自定义的分区策略
  4. 消息被加入到批次中,这个批次里的所有消息会被发送到相同的主题和消息。
  5. Broker收到消息后,会返回一个响应。
    成功写入消息,会返回一个RecordMetaData对象,包含主题和分区,以及消息在分区里的偏移量信息。
    失败会返回一个错误,生产者可以根据错误的类型,选择重试还是直接抛出异常。

生产者的三种确认模式

  1. ack = 0
    如果生成者能够通过网络把消息发送出去,就认为消息已经成功写入到Kafka(不需要等待响应)。
  2. ack = 1
    只要分区首领收到消息并把它写入到分区数据文件(不一定同步到磁盘中),就会返回成功或错误响应。
    如果生产者得到成功响应后,说明消息已经成功写入到分区首领。
    对于错误的响应,如正在发生正常的首领选举,生产者也可以重发消息,最终消息也会安全到达新的首领。
  3. ack = all
    首领分区在返回成功或错误响应时,会等待所有同步副本收到消息。
    如果生成者收到成功的响应,说明消息已经成功写到到所有同步副本中。
    如果生产者收到错误的响应,生成值可以重试,知道消息被成功写入到所有同步副本
    ack = all, 配合最小同步副本参数min.insync.replicas,可以确保消息被写入不少于min.insync.replicas个的分区。

    ack = 0 发送消息最快,ack = all 发送消息最慢。

丢失消息的场景

下面的场景都是分区复制系数=3,禁用了不完全首领选举。

  1. ack = 0,这种确认模式是一定会丢失一些消息的。只要分区或集群不可用,或者分区在进行首领选举,消息就会丢失。
  2. ack = 1,当消息被写入到首领分区后,生成者就得到成功的响应,但是如果此时首领崩溃了,但消息还没有被其他的副本复制过去,而其他的副本此时还是被认为是同步副本(replica.lag.time.max.ms参数决定)。进行完全的首领选举后,其中一个副本成为新的首领,这种场景下,刚才那条消息就丢失了。
  3. 即时是ack = all,生产者也需要正确地处理错误,错误一般被分为两类,一类是可重试错误,可以通过重发消息来解决,比如分区正在进行首领选举(我们使用的生产者客户端一般都内置了重试机制,重试次数可以通过retries来配置);一类是不可重试错误,无法通过重发消息来解决,比如“消息太大”。

重试带来的问题

生产者客户端内置的重试机制,可以处理大部分的错误,不丢失数据。
但是重试也会导致消息的重复,生产者因为网络问题没有收到broker的确认,但实际上消息已经写入成功,生产者会认为网络出现了临时故障,就重试发送该消息(因为它不知道消息已经写入成功)。在这种情况下,broker会收到两个相同的消息。

  • 重试和恰当的错误处理可以保证消息至少被保存一次
  • Kafka无法保证每个消息只被保存一次
  • 消息中加入唯一标识符,来检测重复消息,如账单的流水号;或者使用使用"幂等消息"(多次消费结果都一样的消息,例如"这个账号只有100元"的消息。

代码

使用go来编写生产者发送消息

注意:confluent-kafka-go只是librdkafka的go语言封装,所以需要先安装librdkafka。安装教程

package msq

import (
    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
    "log"
    "os"
)

const broker = "0.0.0.0:9093"
const InfoHashTopic = "info-hash"
var Producer *kafka.Producer = nil

func init() {
    //https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    var err error
    Producer, err = kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": broker,
        "retries": 10,    // 重试次数
        "acks": -1,    // 确认模式,ack=all
        //"partitioner": "consistent_random"  // 默认分区器
    })

    if err != nil {
        log.Printf("Failed to create producer: %s\n", err)
        os.Exit(1)
    }
    log.Printf("%v", Producer)
}

func Send(topic, msg string) {
    deliveryChan := make(chan kafka.Event)
    Producer.Produce(&kafka.Message{
        //TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        TopicPartition: kafka.TopicPartition{Topic: &topic},
        Value:          []byte(msg),
        Key:             []byte(msg),
    }, deliveryChan)
        
        // 下面的代码可以放到一个协程中处理,做到异步发送
    e := <-deliveryChan
    m := e.(*kafka.Message)

    if m.TopicPartition.Error != nil {
        log.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
    } else {
        log.Printf("Delivered message to topic %s [%d] at offset %v\n",
            *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
    }

    close(deliveryChan)
}
Last modification:March 22nd, 2021 at 05:59 pm
如果觉得我的文章对你有用,请尽情赞赏 🐶