kafka教程2 - kafka存储结构

2019/05/10

kafka概述

本篇博文主要是对kafka文件存储系统进行介绍,通过本篇博文你将了解到:

  • kafka的存储结构
  • kafka查找过程
  • 面试点

1 kafka存储结构

1.1 涉及概念

  • topic 主题,特指kafka处理的消息源的不同分类
  • partition topic物理上的分组,一个topic可以分为多个partition
  • message 消息,是通信的基本单位

1.2 存储结构

1.2.1 topic结构

  • topic包含多个partition,
  • topic只是逻辑概念,不涉及到存储,partition才是物理概念
  • 同一topic的不同partition可能分布在不同机器上

1.2.2 partition结构

  • partition是一个文件夹,其中包含多个segment
  • 如果其中有n个segment,则共有 2 x n 个文件
  • 每个partition是一个有序的队列
  • partition中的每条消息都会分配一个有序的id,即offset

partition

1.2.3 segment结构

  • 由一对文件组成,一个索引文件,一个数据文件
  • 文件命名规则:
    • 第一个segment从0开始
    • 新建一个segment, 取当前partition中的最大offset数值,即上一个segment的最后一条消息的offset值,比如00000000000000345678.log文件中的第一条消息的offset为345679
    • 最大为64位的long,不足位数补0,如00000000000000345678.log
    • 索引文件后缀.index,日志数据文件后缀.log

segment

1.2.3.1 segment索引文件结构
  • 存储一系列的元数据,每条元数据就是一条索引
  • 元数据条数小于消息条数,是稀疏索引
  • 元数据(即索引)构成
    • 序号:索引所指向的message在数据日志文件中的位置,即文件游标,从0开始,方便直接指定游标打开数据文件
    • 相对offset:索引所指向的message在数据日志文件中的相对序号,即相对的offset,从1开始,该相对offset加上文件名当中的值就是该message在整个partition中的绝对offset了

如图,稀疏索引,以索引文件里元数据3,497为例

依次在数据文件里表示第3个message(在全局partiton表示第368772个message)、以及该消息的物理偏移地址为497(offset)。

segment_index

1.2.3.2 segment日志数据文件结构
  • 存储一系列的message

1.2.4 message结构

  • offset 偏移量,消息的唯一标识,通过offset能找到唯一消息,类型long 8bytes
  • MessageSize 消息长度,类型int32 4bytes
  • crc32校验码, 4bytes,校验message
  • magic, 表示本次发布kafka服务程序协议版本号 1byte
  • attributes 独立版本,标识压缩类型,编码类型 1byte
  • key length 4bytes 当key length=-1时,key字段可不写
  • key 可选
  • payload 实际消息内容

message特点

  • message是无状态的,即不会标识是否已被消费过
  • message不能同时被多个consumer来消费,可以等前一个消费完成,下一个继续消费。
  • consumer可以同时消费多条message

2 查找流程

前提是已经确定了partition,因为随机的:

注意:

  • 可以利用offset在partition中查找
  • 不能在整个topic中查找的,因为offset只保证在partition中唯一,有序

流程如下(以查找offset=368776的message为例),两个步骤:

  • 第一步查找segment file(二分查找)
当中00000000000000000000.index表示最開始的文件,起始偏移量(offset)为0.
第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.
第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1。其它兴许文件依次类推。
以起始偏移量命名并排序这些文件,仅仅要依据offset 二分查找文件列表,就能够高速定位到具体文件。

当offset=368776时定位到00000000000000368769.index|log
  • 第二步通过segment file查找message(顺序查找)
通过第一步定位到segment file,当offset=368776时。
依次定位到00000000000000368769.

index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log

顺序查找直到offset=368776为止。

3 面试点

  • 为什么有segment,而不是把partition直接设计成单个文件?

    方便消费后删除,可以节约空间,如果是单个文件,该文件由于会被不断写入,无法删除,则会无限增加。当需要清理时,则需要在保证写入的同时,清理该文件的前面已经过期的消息,效率十分低下。

  • 为什么有partition?

    方便水平扩展broker,如果不设计多个partition,那么当部署完成之时,topic就会被限定在一台机器上了,随着业务增加,最终会陷入瓶颈

  • 索引文件的作用

    使查找效率为O(1),即与文件大小无关,与查找的位置无关

  • Push vs. Pull

    push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。

  • partition如何同步副本?

ISR (In-Sync Replicas),这个是指副本同步队列。 副本不宜过多,因为同步影响kafka性能。

所有的副本(replicas)统称为Assigned Replicas,即AR。

ISR是AR中的一个子集,由leader维护ISR列表,
follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),

任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。

Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的follower都复制完,这条消息才会被commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种情况下如果follower都还没有复制完,落后于leader时,突然leader宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。

传送门

  • 数据可靠性和持久性保证
当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:

1(默认):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果leader宕机了,则会丢失数据。

0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。

-1:producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当ISR中只有leader时(前面ISR那一节讲到,ISR中的成员由于某些情况会增加也会减少,最少就只剩一个leader),这样就变成了acks=1的情况。

如果要提高数据的可靠性,在设置request.required.acks=-1的同时,也要min.insync.replicas这个参数(可以在broker或者topic层面进行设置)的配合,这样才能发挥最大的功效。

min.insync.replicas这个参数设定ISR中的最小副本数是多少,默认值为1,当且仅当request.required.acks参数设置为-1时,此参数才生效。
如果ISR中的副本数少于min.insync.replicas配置的数量时,客户端会返回异常:
org.apache.kafka.common.errors .NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。


Show Disqus Comments

Post Directory