0%

Kafka快速上手

Kafka头图

Apache Kafka 是一个开源的分布式事件流平台,它是一款基于发布与订阅的消息队列系统,采用生产者、消费者模式,该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的消息平台。


概述

Kafka 是一款基于发布与订阅的消息队列系统,采用生产者、消费者模式,该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的消息平台。

为什么需要消息队列

消息队列系统的核心作用就是三点:缓冲、异构、解耦。以用户注册案例来说明消息队列的作用。用户注册的一般流程:

  1. 用户输入注册信息,等待结果。
  2. 系统进行合法性验证。
  3. 初始化账号信息。
  4. 其他账户配套系统,如:账号绑定、多账号数据合并、通讯录用户关联、新用户礼包发放。
  5. 通知用户注册结果。

这样就面临一个问题:用户注册流程包含多个步骤,注册高峰期每一步都可能需要等待一段时间,用户等待时间是所有步骤的累计时间(单线程)/ 耗时最长的那个步骤时间(多线程)。

我们需要减少用户的等待时间,解决方法就是使用消息队列:

  • 系统验证用户注册信息合法后,提示注册成功。生产者生产注册消息到消息平台。
  • 与注册流程相关的程序作为消费者来消费注册消息,完成对应的操作。

发布与订阅

数据(消息)的发送者(发布者)不会直接把消息发送给接收者,这是发布与订阅消息系统的一个特点。发布者以某种方式对消息进行分类,接收者(订阅者)订阅它们,以便接收特定类型的消息。发布与订阅系统一般会有一个 broker,也就是发布消息的中心点。

消息和批次

Kafka 的数据单元被称为消息。可以把消息看成是数据库里的一个“数据行”或一条“记录”。消息由字节数组组成,所以对于 Kafka 来说,消息里的数据没有特别的格式或含义。消息可以有一个可选的元数据,也就是键。键也是一个字节数组,与消息一样,对于 Kafka 来说也没有特殊的含义。当消息以一种可控的方式写入不同的分区时,会用到键。最简单的例子就是为键生成一个一致性散列值,然后使用散列值对主题分区数进行取模,为消息选取分区。这样可以保证具有相同键的消息总是被写到相同的分区上。第 3 章将详细介绍键的用法。

为了提高效率,消息被分批次写入 Kafka。批次就是一组消息,这些消息属于同一个主题和分区。如果每一个消息都单独穿行于网络,会导致大量的网络开销,把消息分成批次传输可以减少网络开销。不过,这要在时间延迟和吞吐量之间作出权衡:批次越大,单位时间内处理的消息就越多,单个消息的传输时间就越长。批次数据会被压缩,这样可以提升数据的传输和存储能力,但要做更多的计算处理。

模式

使用一些额外的结构来定义消息内容,可以让人们更易于理解。根据应用程序的需求,消息模式(schema)有许多可用的选项:

  • JSON/XML:易用,而且可读性好。但是缺乏强类型处理能力,不同版本之间的兼容性也不是很好。
  • Avro:它最初是为 Hadoop 开发的一款序列化框架。Avro 提供了一种紧凑的序列化格式,模式和消息体是分开的,当模式发生变化时,不需要重新生成代码;它还支持强类型和模式进化,其版本既向前兼容,也向后兼容。

数据格式的一致性对于 Kafka 来说很重要,它消除了消息读写操作之间的耦合性。如果读写操作紧密地耦合在一起,消息订阅者需要升级应用程序才能同时处理新旧两种数据格式。在消息订阅者升级了之后,消息发布者才能跟着升级,以便使用新的数据格式。

主题和分区

Kafka 的消息通过主题(Topic)进行分类。主题可以被分为若干个分区(Partition),一个分区就是一个提交日志。Kafka 通过分区来实现数据冗余和伸缩性。分区可以分布在不同的服务器上,也就是说,一个主题可以横跨多个服务器,以此来提供比单个服务器更强大的性能。

消息以追加的方式写入分区,然后以先入先出的顺序读取。但要注意,由于一个主题一般包含几个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序。如下图所示的主题有 4 个分区,消息被追加写入每个分区的尾部。

包含多个主题的分区

生产者和消费者

Kafka 的客户端就是 Kafka 系统的用户,它们被分为两种基本类型:生产者消费者

生产者创建消息。一般情况下,一个消息会被发布到一个特定的主题上。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上。生产者也可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。

消费者读取消息。消费者订阅一个或多个主题,并按照消息生成的顺序读取它们。消费者通过检查消息的偏移量来区分已经读取过的消息。偏移量是另一种元数据,它是一个不断递增的整数值,在创建消息时,Kafka 会把它添加到消息里。在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息偏移量保存在 Zookeeper 或 Kafka 上,如果消费者关闭或重启,它的读取状态不会丢失。

多个消费者可以组成一个消费者组,消费者组下的消费者协同消费一个主题的所有分区,群组保证主题的每个分区只能被一个消费者使用。通过这种方式,消费者可以消费包含大量消息的主题。而且,如果一个消费者失效,群组里的其他消费者可以接管失效消费者的工作。

关于“每个分区只能被一个消费者使用”的理解,可以参考下面的例子:

消费者从主题读取消息

  1. 消费者少于或等于分区数:如上图所示的群组中,两个消费者各自读取一个分区,另外一个消费者读取其他两个分区。
  2. 消费者多于分区数:如果一个主题只有 3 个分区,但消费者组内包含 5 个消费者,这时会有 3 个消费者工作,各自消费一个分区,另外 2 个消费者空闲。

broker和集群

一个独立的 Kafka 服务器被称为 broker。broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker 为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。根据特定的硬件及其性能特征,单个 broker 可以轻松处理数千个分区以及每秒百万级的消息量。

broker 是集群的组成部分。每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。控制器负责管理工作,包括将分区分配给 broker 和监控broker。在集群中,一个分区从属于一个 broker,该 broker 被称为分区的首领。一个分区可以分配给多个 broker,这个时候会发生分区复制(见图 1-7)。这种复制机制为分区提供了消息冗余,如果有一个 broker 失效,其他 broker 可以接管领导权。不过,相关的消费者和生产者都要重新连接到新的首领。第 6 章将详细介绍集群的操作,包括分区复制。

基本架构

Kafka基本架构

  • Cluster:集群由多个服务器组成。每个服务器有单独的名字 broker.id
  • Producer:生产者、负责生产数据。
  • Consumer:消费者、负责消费数据。
  • Topic:一类消息的名称,存储数据时将一类数据存放在某个 Topic 下。

搭建集群

开发环境

  1. 三台虚拟机
1
2
3
192.168.153.100 node01
192.168.153.101 node02
192.168.153.102 node03
  1. 目录结构
1
2
3
/export/software:存放安装包
/export/servers:存放安装程序
/export/data:存放数据和日志
  1. 软件环境
1
2
jdk:java version "1.8.0_141"
zookeeper:zookeeper-3.4.9

安装

  1. 上传压缩包到 /export/software

  2. 解压:

/export/software/
1
tar -zxvf kafka_2.11-1.0.0.tgz -C /export/servers/
  1. 修改配置文件,每台机器的 broker.idlistenershost.name 不同:
1
2
3
4
5
6
7
# vim /export/servers/kafka_2.11-1.0.0/config/server.properties
21 broker.id=0
32 listeners=PLAINTEXT://192.168.153.100:9092
61 log.dirs=/export/data/kafka
124 zookeeper.connect=node01:2181,node02:2181,node03:2181
138 host.name=node01
139 delete.topic.enable=true
  1. 分发配置文件到 node02 和 node03,记得在 node02 和 node03 上按照第 3 步修改配置文件:
1
2
scp -r /export/servers/kafka_2.11-1.0.0/ node02:/export/servers/
scp -r /export/servers/kafka_2.11-1.0.0/ node03:/export/servers/

配置详解:

  • broker.id:唯一标识 kafka 集群中的 broker,三台机器分别为 0,1,2
  • listeners:远程客户端连接监听端口,需要修改为本机的 IP
  • log.dirs:存放生产者生产的数据,需创建该目录
  • zookeeper.connect:要连接的 zookeeper 的地址信息
  • host.name:本机的名称
  • delete.topic.enable:可选配置,是否允许删除 topic,重启后生效
  • false 时,执行删除命令仅将 topic 标记为被删除

启动集群

三台虚拟机依次启动 kafka:

/export/servers/kafka_2.11-1.0.0/
1
nohup bin/kafka-server-start.sh /export/servers/kafka_2.11-1.0.0/config/server.properties 2>&1 &

或者编写 shell 脚本一键启停三台虚拟机中的zookeeper服务:

1
2
3
4
5
6
7
#!/bin/bash
echo "启动Kafka集群中..."
for host in node01 node02 node03
do
ssh -q $host "source /etc/profile; nohup /export/servers/kafka_2.11-1.0.0/bin/kafka-server-start.sh /export/servers/kafka_2.11-1.0.0/config/server.properties 2>&1 &"
done
echo "启动完成"

查看集群

Kafka 集群没有为查看提供 UI 界面,可以在 Windows 运行外部工具 ZooInspector 进行查看,这个工具是一个 java 程序。

ZooInspector查看集群

或者使用 Yahoo! 开源的 Kafka Manager 在 Web 页面管理 Kafka,配置和使用详见 使用Kafka Manager管理Kafka集群

操作集群

需求:订单系统,创建名为 order 的 topic,3 个分片 3 个副本,生产者发送订单消息,消费者消费订单消息。

控制台运行

在控制台使用命令操作 Kafka,常用于测试。

创建 topic

创建一个名为 order 的 topic,需要执行 kafka-topics.sh

/export/servers/kafka/
1
2
bin/kafka-topics.sh --create --partitions 3 --replication-factor 3 --zookeeper node01:2181 --topic order   
Created topic "order".

常用参数:

--create:创建一个新的 topic
--delete:删除一个 topic
--describe:查看 topic 的信息
--list:列出所有可用的 topic
--partitions n:对该 topic 分片,分为 n 片
--replication-factor n:为该 topic 的每个分片创建 n 个副本
--topic topicName:创建的 topic 的名称
--zookeeper host:port:必需,创建 topic 到哪个 zookeeper

创建生产者

创建一个生产者,在控制台输入要生产的消息,需要执行 kafka-console-producer.sh

/export/servers/kafka/
1
2
bin/kafka-console-producer.sh --broker-list node01:9092 --topic order
> order 1

常用参数:

--broker-list host:port必需,要连接的 kafka
--topic topicName必需,生产的 topic 的名称

创建消费者

创建一个消费者,将消费的数据显示在控制台,需要执行 kafka-console-consumer.sh

/export/servers/kafka/
1
2
3
4
5
6
# bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic order
order 1

# 或使用下面这个过时的方法
# bin/kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic order
order 1

常用参数:

--bootstrap-server或必需,要连接的 kafka 集群
--zookeeper host:port或必需过时的,要连接的 zookeeper 集群
--topic topicName必需,生产的 topic 的名称
--from-beginning:从头进行消费,能够获取到消费者启动前生产者生产的消息

其他常用命令:

/export/servers/kafka/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 列出所有的 topic
# bin/kafka-topics.sh --list --zookeeper node01:2181
order

# 查看指定 topic 的详细信息
# bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic order
Topic:order PartitionCount:3 ReplicationFactor:3 Configs:
Topic: order Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: order Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: order Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

# 详细信息对应的含义
topic 的名称,分片的数量,副本的数量
分片、Leader、副本、Isr 所在机器的`broker.id`

# 什么是 Isr ?
Kafka不是完全同步,也不是完全异步,是一种ISR机制
leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica)
每个Partition都会有一个由leader动态维护的ISR

API 运行

导入依赖包

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.1</version>
</dependency>

创建生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class OrderProducterTest {
public static void main(String[] args) throws InterruptedException {
//设置配置文件,连接集群
Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092");
//消息确认码,设置消费者收到消息后如何返回确认。0为不确认,1为leader收到消息后确认,-1/all为leader收到消息并且完成备份后确认
props.put("acks", "all");
//重试次数
props.put("retries", 0);
//生产的消息暂存在本地缓冲区,达到数量阈值后统一发送,减少网络占用
props.put("batch.size", 16384);
//达到时间阈值后统一发送
props.put("linger.ms", 1);
//本地缓冲区的大小
props.put("buffer.memory", 33554432);
//数据发送到 kafka 集群的时候需要先序列化
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
//通过配置文件方式连接集群
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
//生产数据,不指定分区编号,以轮询方式发送数据到各个分片
kafkaProducer.send(new ProducerRecord<>("order", null, "订单编号:" + i));
Thread.sleep(100);
}
}
}

创建消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class OrderConsumerTest {
public static void main(String[] args) {
//设置配置文件
Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092");
//消费者组的名称
props.put("group.id", "test");
//是否自动向 kafka 集群提交偏移量
props.put("enable.auto.commit", "true");
//多久提交一次
props.put("auto.commit.interval.ms", "1000");
//反序列化接收到的数据
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
//通过配置文件方式连接集群
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList("order"));
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100); //轮询数据间隔
for (ConsumerRecord<String, String> record : consumerRecords) {
System.out.println("消费的数据为:" + record.value());
}
}
}
}

Spring 整合 Kafka

导入依赖包

1
2
3
4
5
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>

配置生产者

resources/applicationContext-kafka-productor.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

<!--加载 Kafka 属性配置文件-->
<context:property-placeholder location="classpath:*.properties"/>

<!--定义producer的参数-->
<bean id="producerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${kafka.servers}"/>
<entry key="acks" value="all"/>
<entry key="retries" value="0"/>
<entry key="batch.size" value="16384"/>
<entry key="linger.ms" value="1"/>
<entry key="buffer.memory" value="33554432"/>
<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
</map>
</constructor-arg>
</bean>

<!--创建 producerfactory-->
<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<ref bean="producerProperties"/>
</constructor-arg>
</bean>

<!--创建 orderKafkaProducer,使用的时候只需要注入这个 bean 即可使用 send 方法生产消息-->
<bean id="orderKafkaProducer" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory"/>
<constructor-arg name="autoFlush" value="true"/>
<property name="defaultTopic" value="${kafka.topic}"/>
</bean>
</beans>

配置消费者

resources/applicationContext-kafka-consumer.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

<!--加载 Kafka 属性配置文件-->
<context:property-placeholder location="classpath:*.properties"/>

<!--定义 consumer 的参数-->
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${kafka.servers}"/>
<entry key="group.id" value="0"/>
<entry key="enable.auto.commit" value="true"/>
<entry key="auto.commit.interval.ms" value="1000"/>
<entry key="session.timeout.ms" value="15000"/>
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
</map>
</constructor-arg>
</bean>

<!--创建 consumerFactory-->
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties"/>
</constructor-arg>
</bean>

<!--设置消费者容器的配置-->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg value="${kafka.topic}"/>
<!--实际消费消息的消息监听者-->
<property name="messageListener" ref="orderKafkaConsumer"/>
</bean>

<!--创建消费者容器-->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties"/>
</bean>
</beans>

外部配置文件

resources/init.properties
1
2
kafka.servers=node01:9092
kafka.topic=keywords

注入生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
//未验证
public class OrderProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@RequestMapping(value = "/orderProduce.do")
public void orderProduce() throws InterruptedException {
for (int i = 0; i < 1000; i++) {
kafkaTemplate.sendDefault("订单编号:" + i);
Thread.sleep(100);
}
}
}

注入消费者

1
2
3
4
5
6
7
8
9
public class OrderConsumer implements MessageListener<String, String> {
@Autowired
private OrderService orderService;

@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println(record.value());
}
}
  • 本文作者: SANNAHA
  • 本文链接: https://sannaha.moe/Kafka/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!