kafka的存储单元是分区
分区是有序(按偏移量排序),不可变的消息序列。写入的消息只能被追加到分区。
每个分区被分成若干个片段
因为在一个大文件里查找和删除消息是很费事的,分区在磁盘(目录通过broker级别配置log.dirs
指定)里被分成若干个片段文件。
当生产者向kafka某分区写入消息时,kafka把消息写入到分区的活跃片段
。如果达到片段大小上限
,就会关闭当前文件,打开一个新的文件,成为新的活跃片段
。
(片段大小上限由主题级别配置log.segment.bytes
决定,默认是1GB)
片段的文件名格式是<base-offset>.log
。base-offset是该片段文件的起始偏移量的值。
在磁盘里,每个分区都有一个目录,目录里除了保存着分区的日志片段文件,还保存着日志片段文件对应的索引文件(时间索引
和偏移量索引
)。
上图里,info-hash是一个3分区的主题,可以看到每个分区都有一个对应目录info-hash-<数字>,目录里有对应的日志片段文件和索引文件。
索引文件帮助我们更快地定位到指定偏移量或者指定时间戳的消息在日志文件里的位置。
分区保留策略
在默认的保留策略(cleanup.policy
配置指定,默认是delete
),delete
策略在到达时间上限或者大小上限时,会删除旧文件,释放磁盘空间。
kafka根据主题级别配置retention.ms
来决定消息数据保留的时间,默认是1周。
另一种方式是通过保留的消息字节数来判断消息是否过期,它的值通过主题级别配置retention.bytes
来指定,作用在每一个分区
,默认是-1,不限制大小。
注意:活跃片段永远不会被删除,因为它的最后修改时间一直在被更新。
如果一个主题每天只接收100MB的消息,而log.segment.bytes使用默认设置,那么需要10天时间才能填满一个日志片段。因为在日志片段被关闭之前消息是不会过期的,所以如果log.retention.ms被设为 604800000(也就是1周),那么日志片段最多需要17天才会过期。
这是因为关闭日志片段需要10天的时间,而根据配置的过期时间,还需要再保留7天时间(要等到日志片段里的最后一个消息过期才能被删除),日志片段文件的最后修改时间
再过去7天后才能删除整个。 -- 《kafka权威指南》
实际操作
retention.ms
下面来实际操作下,观察下数据的保留策略是如何运行。设置主题的segment.bytes=1024
, retention.ms=240000ms(4分钟)
。
(注意:刚开始我将retention.ms
设置为120000ms(2分钟),但是2分钟后,非活跃的片段文件没有被删除,而是还要等待一段时间后才被删除,原因是因为每个broker的清理线程,执行间隔默认是300000ms(5分钟))
创建主题
./bin/kafka-topics.sh --create --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --replication-factor 3 --partitions 1 --topic test-clean --config retention.ms=240000 --config segment.bytes=1024
发送消息
./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic test-clean
查看目录,因为主题只有一个分区,所以只有一个目录test-clean-0
可以看到每个日志片段文件大小在1024 byte左右,到达分区片段大小限制时,旧的日志片段文件被关闭,新的日志片段文件被创建,消息追加到新的日志片段文件。
等待几分钟后,清理线程启动,会将当前时间和片段文件的最后修改时间
的差,和retention.ms
比较,决定是否删除文件。
ps:在操作的过程中,我留意到偏移量索引文件的大小都是0,查阅了资料,原来偏移量索引是稀疏索引
,它不会为每一个消息建立索引,而是每隔一定字节的数据建立一条索引,这样可以减小索引文件的大小。
retention.bytes
创建主题,将retention.bytes=10240
,retention.ms=-1
(没有时间的限制)
./bin/kafka-topics.sh --create --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --replication-factor 3 --partitions 1 --topic test-clean-two --config retention.ms=-1 --config segment.bytes=1024 --config retention.bytes=10240
发送消息
./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic test-clean-two
不断发送消息后,可以看到目录里已经有10多个日志片段文件,整个分区的大小已经超过retention.bytes
。
等待几分钟后,清理线程启动,会将最早的几个日志片段文件删除。
retention.bytes和retention.ms
如果同时指定了retention.bytes和retention.ms,只要任意一个条件得到满足,消息就会被删除。
参考资料
How Kafka’s Storage Internals Work
《Kafka权威指南》