Kafka简介

分布式基于发布/订阅的消息系统

kafka


简介

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等

特点:

  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。
  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输。
  • 同时支持离线数据处理和实时数据处理。
  • Scale out:支持在线水平扩展。

基本架构

一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

名词定义

  • Broker(缓存代理):已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个Broker。
  • Topic:Kafka将消息分类,每一类的消息称之为一个主题。每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)。
  • Partition(分区):topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。
  • Offset(偏移量):一个主题下的每个分区中的消息是顺序写入磁盘的,而偏移量是一个连续的用于定位被追加到分区的每一个消息的序列号,最大值为64位的long大小,它是唯一标记一条消息。
  • Producer(生产者):发布消息的对象称为生产者,负责发布消息到Kafka broker。
  • Consumer(消费者):订阅消息并处理发布的消息的对象称之为主题消费者,向Kafka broker读取消息的客户端。在kafka中,一个分区中的消息对于同一个消费组只有一个consumer在消费。当消息被consumer接收之后,consumer可以在本地保存最后消息的offset。间歇性的向zookeeper(老版本)/coordinator(新版本)注册offset。
  • Consumer Group(消费组):既然是一个组,那么组内必然可以有多个消费者或消费者实例,它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题的所有分区。每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

消息存储策略

每个partition在存储层面是append log文件,任何发布到此partition的消息都会被直接追加到log文件的尾部。
kafka-log

  • kafka中支持消息持久化的,生产者生产消息后,kafka不会直接把消息传递给消费者,而是先要在broker中进行存储,持久化是保存在kafka的日志文件中。
  • Message在Broker中通过Log追加(即新的消息保存在文件的最后面,是有序的)的方式进行持久化存储。并进行分区(patitions)
  • 为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数。

生产者交互

生产者在向kafka集群发送消息的时候,可以通过指定分区来发送到指定的分区中
也可以通过指定均衡策略来将消息发送到不同的分区中
如果不指定,就会采用默认的随机均衡策略,将消息随机的存储到不同的分区中
producer

  • 向 Kafka 的一个 topic 发布消息的过程叫做 produces
  • 可指定消息的partition:Producer将消息发布到指定的Topic中,同时Producer也能决定将此消息归属于哪个partition(即:生产者可以指定将发送的消息放在一个topic中的partition1,还是partition2中)(注:这种机制可以理解为一种变相的负载均衡,轮转;比如基于”round-robin”方式或者通过其他的一些算法等)
  • 异步发送:kafka支持异步批量发送消息。批量发送可以很有效的提高发送效率。Kafka producer的异步发送模式允许进行批量发送,先将消息缓存在内存中,然后一次请求批量发送出去。

消费者交互

在消费者消费消息时,kafka使用offset来记录当前消费的位置

在kafka的设计中,可以有多个不同的group来同时消费同一个topic下的消息,如图,我们有两个不同的group同时消费,他们的的消费的记录位置offset各不项目,不互相干扰。

对于一个group而言,消费者的数量不应该多余分区的数量,因为在一个group中,每个分区至多只能绑定到一个消费者上,即一个消费者可以消费多个分区,一个分区只能给一个消费者消费

因此,若一个group中的消费者数量大于分区数量的话,多余的消费者将不会收到任何消息
consumer


Broker的无状态机制

  • Broker没有副本机制,一旦broker宕机,该broker的消息将都不可用。虽然broker没有副本,但是消息本身是有副本的,不会丢失。Broker只要在宕机后再读取消息的日志就行了
  • Broker不保存订阅者的状态,由订阅者自己保存。
  • 无状态导致消息的删除成为难题(可能删除的消息正在被订阅),kafka采用基于时间的SLA(服务水平保证),消息保存一定时间(通常为7天)后会被删除。
  • 消息订阅者可以rewind back到任意位置重新进行消费,当订阅者故障时,可以选择最小的offset(id,即偏移量)进行重新读取消费消息。

  • 消费者通过kafka中的“稀疏索引”快速的找到它没有消费的消息

  • Zookeeper会帮助记录哪条消息已经消费了,哪条消息没有消费。因此消费者通过zookeeper确定消息的状态。

Message的组成

  • Message消息:是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息
  • Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。
  • partition中的每条Message包含了以下三个属性:
    offset(偏移量,即消息的唯一标示,通过它才能找到唯一的一条消息) 对应类型:long
    MessageSize 对应类型:int32
    data 是message的具体内容
  • 消息是无状态的,消息的消费先后顺序是没有关系的
  • 每一个partition只能由一个consumer来进行消费,但是一个consumer是可 以消费多个partition,是一对多的关系

Partition的分区目的

  • kafka基于文件存储.通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存
  • 可以将一个topic切分多任意多个partitions,来消息保存/消费的效率
  • 越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力

Kafka文件存储机制

topic中partition存储分布

假设实验环境中Kafka集群只有一个broker,xxx/message-folder为数据文件存储根目录,在Kafka broker中server.properties文件配置(参数log.dirs=xxx/message-folder),例如创建2个topic名称分别为report_push、launch_info, partitions数量都为partitions=4
存储路径和目录规则为:
xxx/message-folder

1
2
3
4
5
6
7
8
|--report_push-0
|--report_push-1
|--report_push-2
|--report_push-3
|--launch_info-0
|--launch_info-1
|--launch_info-2
|--launch_info-3

在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。

partiton中文件存储方式

kafka-partition

  • 每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。
  • 每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。

这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。

partiton中segment文件存储结构

  • segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件.
  • segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。

创建一个topicXXX包含1 partition,设置每个segment大小为500MB,并启动producer向Kafka broker写入大量数据:
segment file
以上述图中一对segment file文件为例,说明segment中index<—->data file对应关系物理结构如下:
index
索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。
其中以索引文件中元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton表示第368772个message)、以及该消息的物理偏移地址为497。
segment data file由许多message组成,下面详细说明message物理结构如下:
message

参数说明
关键字 说明
8 byte offset 在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message
4 byte message size message大小
4 byte CRC32 用crc32校验message
1 byte “magic” 表示本次发布Kafka服务程序协议版本号
1 byte “attributes” 表示为独立版本、或标识压缩类型、或编码类型。
4 byte key length 表示key的长度,当key为-1时,K byte key字段不填
K byte key 可选
value bytes payload 表示实际消息数据。

partition中通过offset查找message过程

例如读取offset=368776的message,需要通过下面2个步骤查找。

  • 第一步查找segment file 上图为例,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。
    当offset=368776时定位到00000000000000368769.index|log
  • 第二步通过segment file查找message
    通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到offset=368776为止。

从上图可知这样做的优点,segment index file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。

运行效果

Kafka运行时很少有大量读磁盘的操作,主要是定期批量写磁盘操作,因此操作磁盘很高效。这跟Kafka文件存储中读写message的设计是息息相关的。Kafka中读写message有如下特点:
写message

  • 消息从java堆转入page cache(即物理内存)。
  • 由异步线程刷盘,消息从page cache刷入磁盘。

读message

  • 消息直接从page cache转入socket发送出去。
  • 当从page cache没有找到相应数据时,此时会产生磁盘IO,从磁
    盘Load消息到page cache,然后直接从socket发出去

存储设计特点

  • Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
  • 通过索引信息可以快速定位message和确定response的最大大小。
  • 通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
  • 通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。

参考资料:

  1. kafka实战
  2. Kafka剖析(一):Kafka背景及架构介绍
  3. Kafka文件存储机制那些事
  4. Kafka学习(二):Kafka的基本结构和概念
如果文章对您有帮助,感谢您的赞助支持!