kafka教程1 - kafka概述

2019/05/08

kafka概述

本篇博文主要是对kafka概念以及应用进行介绍,通过本篇博文你将了解到:

  • kafka的结构
  • 发布-订阅的实现
  • kafka进阶

代码demo:github


1 kafka概述

1.1 kafka的概念和用途

简单来说,kafka就是一个中间件,两个服务A/B之间用于传输数据的工具。

那么可以理解为kafka就是一个消息系统,就像服务之间传递数据的一个管道。

对比消息队列 和 kafka

消息系统负责将数据从一个应用程序传输到另一个应用程序,因此业务服务可以专注于数据,不必担心如何共享数据。

两种消息系统模式

  • 点对点
  • 发布-订阅(pub-sub)消息系统, 大多数消息模式遵循 pub-sub 。

1 消息队列(点对点消息系统):

点对点,即消息队列中的一条数据只能消费一次(出队操作),一旦消费者读取队列中的消息,它就从该队列中消失。

所以,消息队列是一个可以多服务写入,多服务消费,实时的消息结构实现。

message_queue

2 pub-sub消息系统

如图,在发布-订阅系统中,消息生产者称为发布者,消息使用者称为订阅者, 消息系统是一个中间件。

publish_subscribe_messaging_system

综上,kafka较之消息队列,实现一条数据供多个消费者消费,并可既可以实时消费又允许延迟消费,分布式数据支持高吞吐。

1.2 kafka结构以及组件术语

kafka_struct

如图,kafka的四个主要组成部分是producer、consumer、topic、broker;

主要结构 说明
producer kafka数据(消息)提供生产方
consumer 消费kafka数据(消息)
topic 消息类别,一个topic是一组消息
broker kafka 集群中包含的每一台服务器

1.2.1 kafka消息传递流程

如上图,简言之,producer、consumer有多个,producer向多个topic中推送数据(push模式),consumer消费topic中数据(pull模式)。

pub-sub模式,consumer订阅了topic,说明使用了观察者模式,主动pull消息。

问题:

consumer消费完topic一条消息后,消息会删吗?

猜测:不会,kafka就指定每一条message的过期时间,到期删除;而消费之后,该consumer只会记录自己在该topic消费的最新的offset,不必删除消息。

1.2.2 其他术语定义

kafka角色 说明
consumer group 每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 的一个 Consumer 消费,但可以被多个 consumer group 消费。
partition topic的物理存储分配单元, 每个topic包含一个或多个partition。
replica partition 的副本,保障 partition 的高可用。
leader replica 中的一个角色, producer 和 consumer 只跟 leader 交互
follower replica 中的一个角色,从 leader 中复制数据。
controller kafka 集群中的其中一个服务器,用来进行 leader election(选举) 以及各种failover(容错/故障分析)。
zookeeper kafka 通过 zookeeper 来存储集群的 meta 信息。meta(元语言、元数据)
1、元数据:
  kafka中消息就是一个字节数组,对kafka而言他就是一个字节数据,而不考虑消息内容或者其意义,
         也就是每一条消息对于kafka而言没有特别的含义或者格式,只是一组字节而已。

2、broker:

broker是kafka集群中包含的一台服务器。

1) broker生产者服务:broker接收生产者的消息,为消息设置偏移量,将消息存入磁盘中(partition)。
   理解:offset、partition都是物理存储记录,即broker物理机器上的磁盘的位置标记;

2) broker消费者服务:broker接收消费者请求,broker响应读取请求,返回磁盘上的消息。

3、controller:

每个broker集群都有一个broker同时充当了controller的角色,
  即既是broker又是controller,是自动从集群的活跃成员中选举出来的。

controller负责管理工作,包括将partition分配给broker以及监控broker;
  分区复制提高容错,如果leader挂了,其他broker会接管leader。

4、kafka保留消息

kafka保留消息有两种类型: 保留一段时间 和 保留消息达到一定的大小。
每个topic可以设置自己的保留设置。
如果两个同时指定,满足其中一个就会删除。

1.2.3 kafka详细流程

为了形象,按照生产消费的逻辑顺序,结合上图producer1、topic1说明:

  1. producer1 将主题为topic1的一组message发布到kafka集群上,kafka汲取将消息数据分为part1、part2两部分;
  2. 如图,part1、part2都保存了三份,随机存放到三个partition下;
  3. controller对part1、part2数据所在的partition进行leader选举;
  4. kafka上新增消息,定义topic1的consumer group1/2收到新增通知;
  5. consumer group1/2 中个选出一个消费者对part1、part2所在的leader partition进行消费;consumer group 消费offset标记进行更新;

上述过程顺序不一定准确,只是为了对整体的消息数据流动以及数据状态变化作一介绍。

几点总结

  1. kafka是按批次(一组消息)写入的,并且批次数据会被压缩; 目的是为了减少数据传递的网络开销;
  2. Kafka 有四个核心的 API。客户端和服务器端的通信,是基于TCP 协议;
  3. 一个topic有多个partition(不是leader和follower,是多个leader partition), producer均衡写入,consumer消费时是均衡选取,所以partition之间数据消费是无序的
  4. 消费一个partition中的数据是有序的(队列),不过可以通过调整offset的值消费特定数据;
  5. consumer会把每个partition最后读取的消息偏移量保存在zookeeper或者kafka上,如果consumer关闭或重启,其读取状态也不会丢失;
  6. kafka使用zookeeper保存集群的元数据和消费者信息,0.9.00版本之后消费者的数据也能保存在broker上,减少对zk的依赖。(见图)
  7. 磁盘性能影响producer,内存影响consumer;因为consumer会将读取的消息放到系统的页面缓存中。
  8. 顺序消费的实现如下:
对于每一条消息数据,kafka提供了一个可选项,键, 对,就是hash结构k-v;其中,键也是元数据(字节数组)
此外,kafka存在一个分区器组件,用来确定消息分区;

如果一条消息存在键,那么分区器会计算键的散列值,并将其映射到指定partition;
这样,同样的key就会使得所有message写入到同一个partition中,从而实现顺序消费。

如果没有键存在,那么就均衡写入,均衡消费。

kafka_zookeeper

kafka_zookeeper2

kafka 在 zookeeper 中的存储结构如下图所示:

zk_content

1.3 kafka多集群

为什么使用多集群?

对着kafka部署数量的增加,可能要考虑以下因素

  • 数据类型分离
  • 安全需求隔离
  • 多数据中心(容灾恢复)

所以,需要实现多集群。

2 集群之间的数据同步

kafka MirrorMaker工具,原理MirrorMaker的核心组件是一个生产者和一个消费者,用来同步集群间的消息。

1.4 ruby API

1.4.1 producer

如图,key可选

producer

1.4.2 consumer

2 kafka Replication(副本)设计

kafka数据的可靠性以及高可用性能,主要依赖的是副本机制。

在上面已经介绍了,Kafka的replica副本单元是topic的partition,一个partition的replica数量不能超过broker的数量,

因为一个broker最多只会存储这个partition的一个副本。

所有消息生产、消费请求都是由partition的leader replica来处理,其他follower replica负责从leader复制数据进行备份。

Replica均匀分布到整个集群,Replica的算法如下:

  1. 将所有Broker(假设共n个Broker)和待分配的Partition排序
  2. 将第i个Partition分配到第(i mod n)个Broker上
  3. 将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上

replica

如图,TopicA有三个partition:part0、part1、part2,

每个partition的replica数等于2(一个是leader,另一个是follower),按照以上算法会均匀落到三个broker上。

broker对replica管理:

选举出一个broker作为controller,由它Watch Zookeeper,负责partition的replica的集群分配,以及leader切换选举等流程。

2.1 In-Sync-Replica(ISR)

首先明确ISR列表的节点元素是broker。

kafka不是完全同步,也不是完全异步,是一种ISR机制:

  1. leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica),每个Partition都会有一个ISR,而且是由leader动态维护
  2. 如果一个flower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除
  3. 当ISR中所有Replica都向Leader发送ACK时,leader才commit

即ISR列表中的kafka节点,必须是alive状态(failure/alive),即:

  • 节点必须和Zookeeper保持心跳连接;
  • 如果节点是follower,必须从leader节点上复制数据来备份,而且备份的数据相比leader而言,不能落后太多。

kafka将alive的replica节点认为是”in sync”(同步中),所以称为In-Sync-Replica(ISR)。

这里的节点是指broker

Kafka的Zookeeper维护了每个partition的ISR信息,理想情况下,ISR包含了partition的所有replica所在的broker节点信息,

而当某些节点不满足以上条件时,ISR可能只包含部分replica。

例如,上图中的TopicA-part0的ISR列表可能是[broker1,broker2,broker3],也可能是[broker1,broker3]和[broker1]。

2.2 数据可靠性

Kafka如何保证数据可靠性?首先看下,Producer生产一条消息,该消息被认为是”committed”(即broker认为消息已经可靠存储)的过程:

1) 消息所在partition的ISR replicas会定时异步从leader上批量复制数据log

2) 当所有ISR replica都返回ack,告诉leader该消息已经写log成功后,leader认为该消息committed,并告诉Producer生产成功。

这里和以上”alive”条件的第二点是不矛盾的,因为leader有超时机制,

leader等ISR的follower复制数据,如果一定时间不返回ack(可能数据复制进度落后太多),则leader将该follower replica从ISR中剔除。

3) 消息committed之后,Consumer才能消费到。

以下是同步复制和异步复制的定义,而kafka的committed过程并不相同。

  • 同步复制: 同步复制要求所有能工作的follower都复制完,这条消息才会被认为committed
  • 异步复制: follower异步的从leader复制数据,数据只要被leader写入log就被认为已经committed

ISR机制下的数据复制,既不是完全的同步复制,也不是单纯的异步复制,这是Kafka高吞吐很重要的机制。

我理解错了,先给出同步复制和异步复制的定义,我以为ISR复制机制就是这两种定义的结合呢,no!

这里只是对同步复制、异步复制、ISR复制三种方式做一对比而已;

ISR复制方式就是上面的1、2、3点.

ISR复制和异步复制不一致的地方在于,不会一直等待,超时剔除。

单独的同步复制会极大影响kafka吞吐量;异步复制方式如果follower都复制完都落后于leader,而如果leader突然宕机,则会丢失数据。

而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐量,follower可以批量的从leader复制数据,数据复制到内存即返回ack,

这样极大的提高复制性能,当然数据仍然是有丢失风险的。

Kafka本身定位于高性能的MQ,更多注重消息吞吐量,在此基础上结合ISR的机制去尽量保证消息的可靠性,但不是绝对可靠的。

isr1 isr2 isr3

2.3 服务可用性

什么是服务可用性,就是请求能响应,producer和consumer只跟leader交互,

所以讨论的问题是: 当leader故障后会怎样呢?如何选举新的leader?

leader选举:

  • Kafka在Zookeeper存储partition的ISR信息,并且能动态调整ISR列表的成员,只有ISR里的成员replica才会被选为leader,并且ISR所有的replica都有可能成为leader;
  • leader节点宕机后,Zookeeper能监控发现,并由broker的controller节点从ISR中选举出新的leader,并通知ISR内的所有broker节点。

因此,可以看出,只要ISR中至少有一个replica,Kafka就能保证服务的可用性(但不保证网络分区下的可用性)。

2.4 容灾和数据一致性

分布式系统的容灾能力,跟其本身针对数据一致性考虑所选择的算法有关,

例如,Zookeeper的Zab算法,raft算法等。Kafka的ISR机制和这些Majority Vote算法对比如下:

  • ISR机制能容忍更多的节点失败。假如replica节点有2f+1个,每个partition最多能容忍2f个失败,且不丢失消息数据;但相对Majority Vote选举算法,只能最多容忍f个失败。
  • 在消息committed持久化上,ISR需要等2f个节点返回ack,但Majority Vote只需等f+1个节点返回ack,且不依赖处理最慢的follower节点,因此Majority Vote有优势
  • ISR机制能节省更多replica节点数。例如,要保证f个节点可用,ISR方式至少要f个节点,而Majority Vote至少需要2f+1个节点。

如果所有replica都宕机了,有两种方式恢复服务:

  1. 等ISR任一节点恢复,并选举为leader;
  2. 选择第一个恢复的节点(不一定是ISR中的节点)为leader

第一种方式消息不会丢失(只能说这种方式最有可能不丢而已),第二种方式可能会丢消息,但能尽快恢复服务可用。

这是可用性和一致性场景的两种考虑,Kafka默认选择第二种,用户也可以自主配置。

大部分考虑CP的分布式系统(假设2f+1个节点),为了保证数据一致性,最多只能容忍f个节点的失败,而Kafka为了兼顾可用性,允许最多2f个节点失败,因此是无法保证数据强一致的。

error

如图所示,一开始ISR数量等于3,正常同步数据,红色部分开始,leader发现其他两个follower复制进度太慢或者其他原因(网络分区、节点故障等),
将其从ISR剔除后,leader单节点存储数据;然后,leader宕机,触发重新选举第二节点为leader,重新开始同步数据,
但红色部分的数据在新leader上是没有的;最后原leader节点恢复服务后,重新从新leader上复制数据,而红色部分的数据已经消费不到了。

因此,为了减少数据丢失的概率,可以设置Kafka的ISR最小replica数,低于该值后直接返回不可用,当然是以牺牲一定可用性和吞吐量为前提了。

2.5 重复消息

消息传输有三种方式:

  1. At most once:消息可能会丢失,但不会重复传输
  2. At least once:消息不会丢失,但可能重复传输
  3. Exactly once:消息保证会被传输一次且仅传输一次

Kafka实现了第二种方式,即,可能存在重复消息,需要业务自己保证消息幂等性处理。

2.6 高吞吐设计

  1. 对于partition,顺序读写磁盘数据,以时间复杂度O(1)方式提供消息持久化能力。
  2. Producer批量向broker写数据
  3. Consumer批量从broker拉数据
  4. 日志压缩
  5. Topic分多个partition,提高并发
  6. broker零拷贝(Zero Copy),使用sendfile系统调用,将数据直接从page cache发送到socket上
  7. Producer可配置是否等待消息committed。

    如果Producer生产消息,每次都必须等ISR存储后才返回,时延会很高,进而影响整体消息的吞吐量。

    为了解决这个问题,一方面Producer可以配置减少partition的副本数,例如,ISR大小为1;

    另一方面,在不太关注消息可靠存储的场景下,Producer可以通过配置选择是否等待消息committed,如下:

Acks 持久性等级 返回响应时机 写延时
all 全部ISR成员返回ACK
1 ISR中任一成员返回ACK
0 不等待ISR成员返回ACK

这是用户在消息吞吐量和持久化之间做的权衡选择,持久化等级越高,生产消息吞吐量越小,反之,持久化等级越低,吞吐量越高。

2.7 HA(高可用)基本原理

2.7.1 broker HA

broker集群信息由Zookeeper维护,并选举出一个controller。

所有partition的leader选举都由controller决定,将leader的变更直接通过rpc方式通知需要为此做出响应的brokers;

controller也负责增删topic以及partition replica的重新分配。

controller在Zookeeper上注册watch,

一旦有broker宕机,其对应在Zookeeper的临时节点自动被删除,controller对宕机broker上的所有partition重新分配新leader;

如果controller宕机,其他broker通过Zookeeper选举出新的controller,然后同样对宕机broker上的所有partition重新分配新leader。

2.7.2 partition HA

partition leader所在的broker宕机,如上所述,broker controller根据动态维护的ISR,会重新在剩下的broker机器中选出ISR里面的一个成员成为新的leader。

如果ISR中至少有一个follower,则可以确保已经committed的数据不丢失;否则选择任意一个replica作为leader,该场景可能会有潜在的数据丢失;

如果partition所有的replica都宕机了,就无法保证数据不丢失了,有两种恢复方案,上文已介绍过。

3 kafka性能调优

3.1 虚拟内存

对于高吞吐量的应用,要尽量避免内存交换,因为内存页和磁盘之间的交换会对kafka的性能有很大影响。

内存交换: 

内存交换(对换)的基本思想是,把处于等待状态(或在CPU调度原则下被剥夺运行权利) 的程序从内存移到辅存,把内存空间腾出来,这一过程又叫换出;把准备好竞争CPU运行的程序从辅存移到内存,这一过程又称为换入。

打个比方:你的内存(主仓库)放满了,就临时把内存(主仓库)暂时不用的东西放到硬盘里(副仓库),这样内存(主仓库)可以放新的东西。如果要用旧的东西再从硬盘里(副仓库)搬回来。

有关交换需要注意以下几个问题:
  1、交换需要备份存储,通常是快速磁盘。它必须足够大,并且提供对这些内存映像的直接访问。
  2、为了有效使用CPU,需要每个进程的执行时间比交换时间长,而影响交换时间的主要是转移时间。转移时间与所交换的内存空间成正比。
  3、如果换出进程,必须确保该进程是完全处于空闲状态。
  4、交换空间通常作为磁盘的一整块,且独立于文件系统,因此使用就可能很快。
  5、交换通常在有许多进程运行且内存空间吃紧时开始启动,而系统负荷降低就暂停。
  6、普通的交换使用不多,但交换策略的某些变种在许多系统中(如UNIX系统)仍发挥作用。

优化方案:

  • 尽量避免内存交换, vm.swappiness参数值尽量小(1,新版本Red Hat内核0是任何情况下都不交换)

    内存交换有容错的作用,比如内存不足进程会中断,所以完全拒绝内存交换。

    上文中的零拷贝

  • 优化脏页和磁盘数据同步,vm.dirty_background_ratio(脏页占系统内存百分比,5,达到5刷新) 和 vm.dirty_ration参数(60~80)

脏页:

因为硬盘的读写速度远赶不上内存的速度,系统就把读写比较频繁的数据事先放到内存中,以提高读写速度,这就叫高速缓存。
linux是以页作为高速缓存的单位,当进程修改了高速缓存里的数据时,该页就被内核标记为脏页。

内核将会在合适的时间把脏页的数据写到磁盘中去,以保持高速缓存中的数据和磁盘中的数据是一致的。

3.2 磁盘

1 磁盘选择(忽略)

2 一般日志片段存储在磁盘, 包括3个时间戳:创建时间(ctime) 最后修改时间(mtime) 最后访问时间(atime)

最后访问时间用处不大,可以忽略更新

日志片段

3.3 网络

1 调整socket读写缓冲区的大小

2 调整TCP socket缓冲区的大小


Show Disqus Comments

Post Directory