![Kafka头图]()
Apache Kafka 是一个开源的分布式事件流平台,它是一款基于发布与订阅的消息队列系统,采用生产者、消费者模式,该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的消息平台。
概述
Kafka 是一款基于发布与订阅的消息队列系统,采用生产者、消费者模式,该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的消息平台。
为什么需要消息队列
消息队列系统的核心作用就是三点:缓冲、异构、解耦。以用户注册案例来说明消息队列的作用。用户注册的一般流程:
- 用户输入注册信息,等待结果。
- 系统进行合法性验证。
- 初始化账号信息。
- 其他账户配套系统,如:账号绑定、多账号数据合并、通讯录用户关联、新用户礼包发放。
- 通知用户注册结果。
这样就面临一个问题:用户注册流程包含多个步骤,注册高峰期每一步都可能需要等待一段时间,用户等待时间是所有步骤的累计时间(单线程)/ 耗时最长的那个步骤时间(多线程)。
我们需要减少用户的等待时间,解决方法就是使用消息队列:
- 系统验证用户注册信息合法后,提示注册成功。生产者生产注册消息到消息平台。
- 与注册流程相关的程序作为消费者来消费注册消息,完成对应的操作。
发布与订阅
数据(消息)的发送者(发布者)不会直接把消息发送给接收者,这是发布与订阅消息系统的一个特点。发布者以某种方式对消息进行分类,接收者(订阅者)订阅它们,以便接收特定类型的消息。发布与订阅系统一般会有一个 broker,也就是发布消息的中心点。
消息和批次
Kafka 的数据单元被称为消息。可以把消息看成是数据库里的一个“数据行”或一条“记录”。消息由字节数组组成,所以对于 Kafka 来说,消息里的数据没有特别的格式或含义。消息可以有一个可选的元数据,也就是键。键也是一个字节数组,与消息一样,对于 Kafka 来说也没有特殊的含义。当消息以一种可控的方式写入不同的分区时,会用到键。最简单的例子就是为键生成一个一致性散列值,然后使用散列值对主题分区数进行取模,为消息选取分区。这样可以保证具有相同键的消息总是被写到相同的分区上。第 3 章将详细介绍键的用法。
为了提高效率,消息被分批次写入 Kafka。批次就是一组消息,这些消息属于同一个主题和分区。如果每一个消息都单独穿行于网络,会导致大量的网络开销,把消息分成批次传输可以减少网络开销。不过,这要在时间延迟和吞吐量之间作出权衡:批次越大,单位时间内处理的消息就越多,单个消息的传输时间就越长。批次数据会被压缩,这样可以提升数据的传输和存储能力,但要做更多的计算处理。
模式
使用一些额外的结构来定义消息内容,可以让人们更易于理解。根据应用程序的需求,消息模式(schema)有许多可用的选项:
- JSON/XML:易用,而且可读性好。但是缺乏强类型处理能力,不同版本之间的兼容性也不是很好。
- Avro:它最初是为 Hadoop 开发的一款序列化框架。Avro 提供了一种紧凑的序列化格式,模式和消息体是分开的,当模式发生变化时,不需要重新生成代码;它还支持强类型和模式进化,其版本既向前兼容,也向后兼容。
数据格式的一致性对于 Kafka 来说很重要,它消除了消息读写操作之间的耦合性。如果读写操作紧密地耦合在一起,消息订阅者需要升级应用程序才能同时处理新旧两种数据格式。在消息订阅者升级了之后,消息发布者才能跟着升级,以便使用新的数据格式。
主题和分区
Kafka 的消息通过主题(Topic)进行分类。主题可以被分为若干个分区(Partition),一个分区就是一个提交日志。Kafka 通过分区来实现数据冗余和伸缩性。分区可以分布在不同的服务器上,也就是说,一个主题可以横跨多个服务器,以此来提供比单个服务器更强大的性能。
消息以追加的方式写入分区,然后以先入先出的顺序读取。但要注意,由于一个主题一般包含几个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序。如下图所示的主题有 4 个分区,消息被追加写入每个分区的尾部。
![包含多个主题的分区]()
生产者和消费者
Kafka 的客户端就是 Kafka 系统的用户,它们被分为两种基本类型:生产者和消费者。
生产者创建消息。一般情况下,一个消息会被发布到一个特定的主题上。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上。生产者也可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。
消费者读取消息。消费者订阅一个或多个主题,并按照消息生成的顺序读取它们。消费者通过检查消息的偏移量来区分已经读取过的消息。偏移量是另一种元数据,它是一个不断递增的整数值,在创建消息时,Kafka 会把它添加到消息里。在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息偏移量保存在 Zookeeper 或 Kafka 上,如果消费者关闭或重启,它的读取状态不会丢失。
多个消费者可以组成一个消费者组,消费者组下的消费者协同消费一个主题的所有分区,群组保证主题的每个分区只能被一个消费者使用。通过这种方式,消费者可以消费包含大量消息的主题。而且,如果一个消费者失效,群组里的其他消费者可以接管失效消费者的工作。
关于“每个分区只能被一个消费者使用”的理解,可以参考下面的例子:
![消费者从主题读取消息]()
- 消费者少于或等于分区数:如上图所示的群组中,两个消费者各自读取一个分区,另外一个消费者读取其他两个分区。
- 消费者多于分区数:如果一个主题只有 3 个分区,但消费者组内包含 5 个消费者,这时会有 3 个消费者工作,各自消费一个分区,另外 2 个消费者空闲。
broker和集群
一个独立的 Kafka 服务器被称为 broker。broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker 为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。根据特定的硬件及其性能特征,单个 broker 可以轻松处理数千个分区以及每秒百万级的消息量。
broker 是集群的组成部分。每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。控制器负责管理工作,包括将分区分配给 broker 和监控broker。在集群中,一个分区从属于一个 broker,该 broker 被称为分区的首领。一个分区可以分配给多个 broker,这个时候会发生分区复制(见图 1-7)。这种复制机制为分区提供了消息冗余,如果有一个 broker 失效,其他 broker 可以接管领导权。不过,相关的消费者和生产者都要重新连接到新的首领。第 6 章将详细介绍集群的操作,包括分区复制。
基本架构
![Kafka基本架构]()
- Cluster:集群由多个服务器组成。每个服务器有单独的名字
broker.id
。
- Producer:生产者、负责生产数据。
- Consumer:消费者、负责消费数据。
- Topic:一类消息的名称,存储数据时将一类数据存放在某个 Topic 下。
搭建集群
开发环境
- 三台虚拟机
1 2 3
| 192.168.153.100 node01 192.168.153.101 node02 192.168.153.102 node03
|
- 目录结构
1 2 3
| /export/software:存放安装包 /export/servers:存放安装程序 /export/data:存放数据和日志
|
- 软件环境
1 2
| jdk:java version "1.8.0_141" zookeeper:zookeeper-3.4.9
|
安装
上传压缩包到 /export/software
。
解压:
/export/software/1
| tar -zxvf kafka_2.11-1.0.0.tgz -C /export/servers/
|
- 修改配置文件,每台机器的
broker.id
,listeners
和 host.name
不同:
1 2 3 4 5 6 7
| 21 broker.id=0 32 listeners=PLAINTEXT://192.168.153.100:9092 61 log.dirs=/export/data/kafka 124 zookeeper.connect=node01:2181,node02:2181,node03:2181 138 host.name=node01 139 delete.topic.enable=true
|
- 分发配置文件到 node02 和 node03,记得在 node02 和 node03 上按照第 3 步修改配置文件:
1 2
| scp -r /export/servers/kafka_2.11-1.0.0/ node02:/export/servers/ scp -r /export/servers/kafka_2.11-1.0.0/ node03:/export/servers/
|
配置详解:
broker.id
:唯一标识 kafka 集群中的 broker,三台机器分别为 0,1,2
listeners
:远程客户端连接监听端口,需要修改为本机的 IP
log.dirs
:存放生产者生产的数据,需创建该目录
zookeeper.connect
:要连接的 zookeeper 的地址信息
host.name
:本机的名称
delete.topic.enable
:可选配置,是否允许删除 topic,重启后生效
- 为
false
时,执行删除命令仅将 topic 标记为被删除
启动集群
三台虚拟机依次启动 kafka:
/export/servers/kafka_2.11-1.0.0/1
| nohup bin/kafka-server-start.sh /export/servers/kafka_2.11-1.0.0/config/server.properties 2>&1 &
|
或者编写 shell 脚本一键启停三台虚拟机中的zookeeper服务:
1 2 3 4 5 6 7
| #!/bin/bash echo "启动Kafka集群中..." for host in node01 node02 node03 do ssh -q $host "source /etc/profile; nohup /export/servers/kafka_2.11-1.0.0/bin/kafka-server-start.sh /export/servers/kafka_2.11-1.0.0/config/server.properties 2>&1 &" done echo "启动完成"
|
查看集群
Kafka 集群没有为查看提供 UI 界面,可以在 Windows 运行外部工具 ZooInspector 进行查看,这个工具是一个 java 程序。
![ZooInspector查看集群]()
或者使用 Yahoo! 开源的 Kafka Manager 在 Web 页面管理 Kafka,配置和使用详见 使用Kafka Manager管理Kafka集群 。
操作集群
需求:订单系统,创建名为 order 的 topic,3 个分片 3 个副本,生产者发送订单消息,消费者消费订单消息。
控制台运行
在控制台使用命令操作 Kafka,常用于测试。
创建 topic
创建一个名为 order
的 topic,需要执行 kafka-topics.sh
:
/export/servers/kafka/1 2 3 4 5 6 7 8
| $ bin/kafka-topics.sh --create --partitions 3 --replication-factor 3 --zookeeper node01:2181 --topic order Created topic "order".
$ kafka-topics --create --partitions 3 --replication-factor 3 --zookeeper cdh1:2181 --topic test01 Created topic "test01".
|
常用参数:
--create
:创建一个新的 topic
--delete
:删除一个 topic
--describe
:查看 topic 的信息
--list
:列出所有可用的 topic
--partitions n
:对该 topic 分片,分为 n 片
--replication-factor n
:为该 topic 的每个分片创建 n 个副本
--topic topicName
:创建的 topic 的名称
--zookeeper host:port
:必需,创建 topic 到哪个 zookeeper
创建生产者
创建一个生产者,在控制台输入要生产的消息,需要执行 kafka-console-producer.sh
:
/export/servers/kafka/1 2 3 4 5 6 7 8
| $ bin/kafka-console-producer.sh --broker-list node01:9092 --topic order > order 1
$ kafka-console-producer --broker-list cdh1:9092 --topic test01 >helloworld 22/07/12 08:35:38 INFO clients.Metadata: Cluster ID: 7D3HnI2STrOEgH4Y-c6eJQ >goodbye
|
常用参数:
--broker-list host:port
:必需,要连接的 kafka
--topic topicName
:必需,生产的 topic 的名称
创建消费者
创建一个消费者,将消费的数据显示在控制台,需要执行 kafka-console-consumer.sh
:
/export/servers/kafka/1 2 3 4 5 6 7 8 9 10 11
| $ bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic order order 1
$ bin/kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic order order 1
$ kafka-console-consumer --bootstrap-server cdh1:9092 --topic test01 helloworld goodbye
|
常用参数:
--bootstrap-server
:或必需,要连接的 kafka 集群
--zookeeper host:port
:或必需,过时的,要连接的 zookeeper 集群
--topic topicName
:必需,生产的 topic 的名称
--from-beginning
:从头进行消费,能够获取到消费者启动前生产者生产的消息
其他常用命令:
/export/servers/kafka/1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| $ bin/kafka-topics.sh --list --zookeeper node01:2181 order
$ bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic order Topic:order PartitionCount:3 ReplicationFactor:3 Configs: Topic: order Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: order Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: order Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
topic 的名称,分片的数量,副本的数量 分片、Leader、副本、Isr 所在机器的`broker.id`
Kafka不是完全同步,也不是完全异步,是一种ISR机制 leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica) 每个Partition都会有一个由leader动态维护的ISR
|
API 运行
导入依赖包
1 2 3 4 5
| <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.1</version> </dependency>
|
创建生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class OrderProducterTest { public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "node01:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props); for (int i = 0; i < 1000; i++) { kafkaProducer.send(new ProducerRecord<>("order", null, "订单编号:" + i)); Thread.sleep(100); } } }
|
创建消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class OrderConsumerTest { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "node01:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props); kafkaConsumer.subscribe(Arrays.asList("order")); while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100); for (ConsumerRecord<String, String> record : consumerRecords) { System.out.println("消费的数据为:" + record.value()); } } } }
|
Spring 整合 Kafka
导入依赖包
1 2 3 4 5
| <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.5.RELEASE</version> </dependency>
|
配置生产者
resources/applicationContext-kafka-productor.xml1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:property-placeholder location="classpath:*.properties"/>
<bean id="producerProperties" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="${kafka.servers}"/> <entry key="acks" value="all"/> <entry key="retries" value="0"/> <entry key="batch.size" value="16384"/> <entry key="linger.ms" value="1"/> <entry key="buffer.memory" value="33554432"/> <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/> <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/> </map> </constructor-arg> </bean>
<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> <constructor-arg> <ref bean="producerProperties"/> </constructor-arg> </bean>
<bean id="orderKafkaProducer" class="org.springframework.kafka.core.KafkaTemplate"> <constructor-arg ref="producerFactory"/> <constructor-arg name="autoFlush" value="true"/> <property name="defaultTopic" value="${kafka.topic}"/> </bean> </beans>
|
配置消费者
resources/applicationContext-kafka-consumer.xml1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:property-placeholder location="classpath:*.properties"/>
<bean id="consumerProperties" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="${kafka.servers}"/> <entry key="group.id" value="0"/> <entry key="enable.auto.commit" value="true"/> <entry key="auto.commit.interval.ms" value="1000"/> <entry key="session.timeout.ms" value="15000"/> <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> </map> </constructor-arg> </bean>
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> <constructor-arg> <ref bean="consumerProperties"/> </constructor-arg> </bean>
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties"> <constructor-arg value="${kafka.topic}"/> <property name="messageListener" ref="orderKafkaConsumer"/> </bean>
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart"> <constructor-arg ref="consumerFactory"/> <constructor-arg ref="containerProperties"/> </bean> </beans>
|
外部配置文件
resources/init.properties1 2
| kafka.servers=node01:9092 kafka.topic=keywords
|
注入生产者
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class OrderProducer { @Autowired private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping(value = "/orderProduce.do") public void orderProduce() throws InterruptedException { for (int i = 0; i < 1000; i++) { kafkaTemplate.sendDefault("订单编号:" + i); Thread.sleep(100); } } }
|
注入消费者
1 2 3 4 5 6 7 8 9
| public class OrderConsumer implements MessageListener<String, String> { @Autowired private OrderService orderService;
@Override public void onMessage(ConsumerRecord<String, String> record) { System.out.println(record.value()); } }
|