0%

Kafka快速上手

Kafka头图

Apache Kafka 是一款基于发布与订阅的消息队列系统,采用生产者、消费者模式,该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的消息平台。


消息队列

为什么需要消息队列

消息队列的核心作用就是三点:

  1. 缓冲:消息的生产和消费速度很可能不一致,Kafka 可以削峰填谷,当业务高峰来临时将消息暂存在队列中,下游根据实际处理能力进行消费,并在业务低峰时处理完积压的数据;
  2. 解耦:允许独立地扩展或修改消息生产者和消费者两边的处理过程,只要确保它们遵守同样的接口约束;
  3. 异步:消息队列提供了异步处理机制,允许将消息放入队列中进行搁置,在需要时再进行处理。消息生产者只需把消息发送到队列,不用关心消息是如何被消费的。

消息中间件的设计理念

消息中间件简易模型

消息中间件都是基于生产者、消费者模型来设计的。在业务简单时,可以通过内存队列的方式,非常方便地实现这个生产者-消费者模型。

但随着业务的发展,就不能再通过内存队列的方式来实现这种模型了。将之前内存队列中的成员抽取出来,设计成一个个子系统单独部署,提供 API 给各个系统去调用,实现多个进程之间的生产者-消费者模型,就得到了消息队列中的各个组件。

消息中间件模型

Kafka的主要角色

Kafka基本架构

  • Kafka Cluster:Kafka 集群,由多台 Kafka 服务器组成,每台 Kafka 服务器被称为一个 broker;
  • Producer:生产者,负责生产数据;
  • Consumer:消费者,负责消费数据;
  • Topic:主题,为了区分消息用途而起的名称,是一类消息在逻辑上的集合,在生产和消费消息时需要指定 Topic。
  • Partition:分区,存储消息的真正实体,一个 Topic 一般包含多个 Partition。生产或消费消息时,实际上到该 Topic 的某个 Partition 中进行写入或读取。
  • Replica:复本,是针对 Partition 的数据冗余,分区数据会被拷贝若干份并分散到不同的服务器中存储,避免因为某服务器宕机导致数据丢失。

集群和broker

一个 Kafka 集群(Kafka Cluster)由多台被称为 broker 的 Kafka 服务器组成,每个 broker 配置有单独的 broker.id

CDH-Kafka配置-broker.id

broker.id 可以在 Kafka 的配置中找到,可以看到该 Kafka 集群由 cdh1/cdh2/cdh3 三台机器组成,对应的broker.id 为 42/41/40。

Broker 负责接收来自生产者的消息,为消息设置偏移量,并将消息保存到磁盘。Broker 也为消费者提供服务,对分区的读取请求作出响应,返回已经提交到磁盘上的消息。单个 broker 可以轻松处理数千个分区以及每秒百万级的消息量。

Controller

为了集群的容灾和管理,Kafka 集群中还存在着一个被称为集群控制器(Controller)的角色。Controller 由集群中的一个 broker 担任,负责以下工作:

  1. 管理集群:包括 Broker 管理、Topic 管理、ISR 变更管理等等。其中 Broker 管理里面就有一项当某个 Broker 宕机以后该更新集群状态让剩余存活的 Broker 正确提供服务。
  2. 更新元数据:一旦集群 Controller 感知到了集群的状态变化,会把最新的元数据信息发送给还存活的 Broker,以更新整个集群的元数据。

借助 Zookeeper,Controller 的选举过程比较简单。Broker 在启动的时候会尝试去 zk 上注册一个 /controller 临时节点,写入自己的 brokerid 和时间戳——如果成功就说明自己选上了,如果失败则获取当前已经当选 Controller 的 brokerid。所以 Controller 通常就是最先启动的那一个 broker。

/opt/cloudera/parcels/CDH-6.1.1-1.cdh6.1.1.p0.875250/lib/zookeeper/bin
1
2
3
4
5
$ ./zkCli.sh -server 127.0.0.1:2181
[zk: 127.0.0.1:2181(CONNECTED)] get /controller
{"version":1,"brokerid":41,"timestamp":"1698878732484"}
...

Broker 在启动的时候如果 /controller 节点存在则会对该节点增加一个监听器,并且绑定对应的 handler;
一旦承担 Controller 角色的 broker 宕机了,对应的临时节点就会被删除,因此就会触发集群中剩余的所有 broker 的监听事件,再次执行选举动作,也就是竞争创建 /controller 节点;最终谁创建成功就成为新的 controller;最后新的 controller 会重新更新集群状态,集群继续稳定提供服务。整个 controller 切换过程的耗时,正常情况下在秒级以内。

主题、分区与复本

Kafka 的消息通过主题(Topic)进行分类,一个主题可以被分为若干个分区(Partition),一个分区又有若干个复本(Replica),这些分区和复本可以分布在不同的服务器上,Kafka 通过分区和复本机制实现了数据的伸缩性和冗余。

生产者和消费者

Kafka 的用户被分为两种基本类型:生产者消费者

生产者创建消息。一般情况下,一个消息会被发布到一个特定的主题(Topic)上。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区(Partition)。不过,在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上。生产者也可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。

消费者读取消息。消费者订阅一个或多个主题,并按照消息生成的顺序读取它们。消费者通过检查消息的偏移量(Offset)来区分已经读取过的消息。偏移量是另一种元数据,它是一个不断递增的整数值,在创建消息时,Kafka 会把它添加到消息里。在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息偏移量保存在 Zookeeper(Kafka 0.9 版本之前)或 Kafka 上,如果消费者关闭或重启,它的读取状态不会丢失。

多个消费者可以组成一个消费者组,消费者组下的消费者协同消费一个主题的所有分区,消费者组保证主题的每个分区只能被一个消费者使用。通过这种方式,消费者可以消费包含大量消息的主题。而且,如果一个消费者失效,群组里的其他消费者可以接管失效消费者的工作。

关于“每个分区只能被一个消费者使用”的理解,可以参考下面的例子:

消费者从主题读取消息

  1. 消费者少于或等于分区数:如上图所示的群组中,有两个消费者各自读取一个分区,另外一个消费者读取其他两个分区。
  2. 消费者多于分区数:如果一个主题只有 3 个分区,但消费者组内包含 5 个消费者,这时会有 3 个消费者工作,各自消费一个分区,另外 2 个消费者空闲。

消息和批次

Kafka 的数据单元被称为消息。可以把消息看成是数据库里的一个“数据行”或一条“记录”。消息由字节数组组成,所以对于 Kafka 来说,消息里的数据没有特别的格式或含义。消息可以有一个可选的元数据,也就是键。键也是一个字节数组,与消息一样,对于 Kafka 来说也没有特殊的含义。当消息以一种可控的方式写入不同的分区时,会用到键。最简单的例子就是为键生成一个一致性散列值,然后使用散列值对主题分区数进行取模,为消息选取分区。这样可以保证具有相同键的消息总是被写到相同的分区上。第 3 章将详细介绍键的用法。

为了提高效率,消息被分批次写入 Kafka。批次就是一组消息,这些消息属于同一个主题和分区。如果每一个消息都单独穿行于网络,会导致大量的网络开销,把消息分成批次传输可以减少网络开销。不过,这要在时间延迟和吞吐量之间作出权衡:批次越大,单位时间内处理的消息就越多,单个消息的传输时间就越长。批次数据会被压缩,这样可以提升数据的传输和存储能力,但要做更多的计算处理。

模式

使用一些额外的结构来定义消息内容,可以让人们更易于理解。根据应用程序的需求,消息模式(schema)有许多可用的选项:

  • JSON/XML:易用,而且可读性好。但是缺乏强类型处理能力,不同版本之间的兼容性也不是很好。
  • Avro:它最初是为 Hadoop 开发的一款序列化框架。Avro 提供了一种紧凑的序列化格式,模式和消息体是分开的,当模式发生变化时,不需要重新生成代码;它还支持强类型和模式进化,其版本既向前兼容,也向后兼容。

数据格式的一致性对于 Kafka 来说很重要,它消除了消息读写操作之间的耦合性。如果读写操作紧密地耦合在一起,消息订阅者需要升级应用程序才能同时处理新旧两种数据格式。在消息订阅者升级了之后,消息发布者才能跟着升级,以便使用新的数据格式。

工作机制

消息的读写

消息以追加的方式写入分区,然后以先入先出的顺序读取。但要注意,由于一个主题一般包含多个分区,因此只能保证消息在单个分区内有顺序,无法在整个主题范围内保证消息的顺序。如下图所示的主题有 4 个分区,消息被追加写入每个分区的尾部。

包含多个主题的分区

使用 Kafka 提供的 kafka-topics 工具创建一个名为 test 的 Topic:

1
2
3
4
# 创建名为test的Topic,分区数量为3个,复本数量为2个
$ kafka-topics --zookeeper cdh1:2181 --create --topic test --partitions 3 --replication-factor 2
...
Created topic "test".

分区在服务器中就是一个存储数据的目录,通过 Kafka 配置中的 log.dirs 可以找到数据保存的位置:

CDH-Kafka配置-log.dirs

在 cdh1 服务器上找到该目录,看下文件构成:

cdh1
1
2
3
4
5
6
7
8
9
10
11
12
13
$ tree /var/local/kafka/data/
/var/local/kafka/data/
...
├── test-0
│   ├── 00000000000000000000.index
│   ├── 00000000000000000000.log
│   ├── 00000000000000000000.timeindex
│   └── leader-epoch-checkpoint
└── test-2
├── 00000000000000000000.index
├── 00000000000000000000.log
├── 00000000000000000000.timeindex
└── leader-epoch-checkpoint

可以看到有两个以 test 开头的目录,里面有一些数据文件和索引文件:

  • log 文件:进行读写的数据文件;
  • index 文件:稀疏索引文件,通过消息的 offset 进行构建的稀疏索引;
  • timeindex:稀疏索引文件,通过消息的写入、创建时间进行构建的稀疏索引;
  • checkpoint:检查点文件。

消息写入分区时,实际上是将消息写入分区所在的目录中。日志又分成多个分片(Segment),每个分片由日志文件与索引文件组成。分片使用第一个消息的偏移量作为该分片的基准偏移量,同时也将其作为索引文件以及日志文件的文件名。

每个分片大小是有限的,由 Kafka 配置中的 log.segment.bytes 控制(默认 1 GB),当分片大小超过限制则会重新创建一个新的分片,消息只会写入最新的一个分片。

CDH-Kafka配置-log.segment.bytes

为什么 cdh1 服务器上会有两个以 test 开头但编号不同的目录呢?

我们可以使用 kafka-topics 还查看 Topic 的详细信息:

1
2
3
4
5
6
7
8
# 查看test这个Topic的详细信息
# Topic名称,分区数量,复本数量
# Topic名称,分区编号,Leader、复本、Isr的broker.id
$ kafka-topics --zookeeper cdh1:2181 --topic test --describe
Topic:test PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 40 Replicas: 40,42 Isr: 40,42
Topic: test Partition: 1 Leader: 41 Replicas: 41,40 Isr: 41,40
Topic: test Partition: 2 Leader: 42 Replicas: 42,41 Isr: 42,41

可以看到除了 Topic 和 Partition 这些信息外,还有 Leader、Replicas 和 Isr 这三个属性。

复本(Replica)的存在是为了避免单点问题,将一个分区的数据多拷贝几份,因此多个复本之间如何同步数据是一个问题。为解决该问题,设计了多个复本之间存在着主从关系:负责提供读写的复本为 Leader,其余的复本为 Follower。Follower 复本只需要从 Leader 复本那里拉取数据,正常情况下不需要提供读写。

Kafka中复本的主从关系

知道了这些就可以回答上面的问题:目录中的编号代表着分区编号,cdh1 的 broker.id 为 42,该服务器是 2 号分区的 Leader,同时还是 0 号分区的 Follower,保存着 0 号分区的复本,因此有 test-0test-2 两个目录。同样的,在 cdh2 服务器能找到 test-1test-2 这两个目录,进一步验证了该结论。

ISR机制

Kafka 中复本之间的数据同步策略既非完全同步,也非完全异步,而是一种特殊的 ISR 机制。ISR 的全称为 In-Sync Replicas(同步复本集),动态维护着和 Leader 保持了同步的所有复本的集合。一个分区所有复本的集合叫作 AR(Assigned Repllicas),而未能与 Leader 保持同步的复本集合叫作 OSR(Out-Sync Relipcas),AR = ISR + OR。

ISR 列表是动态伸缩的,当复本失效后及时踢出 ISR 列表,复本赶上进度之后重新加入到 ISR 列表中。

Kafka 的 ISR 机制允许生产者在生产消息时,根据自己的业务场景自行配置 acks 以达到想要的效果:

  • acks=0:不确认。生产者只管发送消息,不管 Leader 是否收到。这种设置下消息的可靠性几乎没有保障,但是有极大的吞吐量。
  • acks=1:写入 Leader 就算成功,这种设置既可以保障一定的可靠性,也具有不错的吞吐量。
  • acks=all:写入 ISR 中所有的复本才算成功,这种设置能提供较高的可靠性,但吞吐量就相对较低。

有了 ISR,我们可以知道哪些 Follower 与 Leader 保持着同步;可以为了保证高可靠在生产消息时设置写入了处于 ISR 中所有的复本才算成功;可以在进行 Leader 切换时选择 ISR 中的 Follower 成为新的 Leader。

ISR 机制的存在是 Kafka 为了平衡可靠性和性能,将决定权交给了使用者,让使用者通过参数来控制,到底要实现什么程度的高可靠与高性能。

ISR的动态伸缩过程

Kafka生产者客户端的整体结构

Kafka生产者客户端

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程:

  • 主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator)中。
  • Sender 线程负责从 RecordAccumulator 中获取消息并将其发送到 Kafka 中。RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。

参考资料

Kafka专题 https://my.oschina.net/keepal?tab=newest&catalogId=7217403

  • 本文作者: SANNAHA
  • 本文链接: https://sannaha.moe/Kafka/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!