0%

MapReduce快速上手

MapReduce 是一个分布式计算框架,用于大规模数据集的并行运算。”Map(映射)”和”Reduce(归约)”是它的主要思想,简单来说就是“分而治之”。


概述

MapReduce 是一个分布式运算程序的编程框架,极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。

MapReduce设计构思

MapReduce构思体现在如下的三个方面:

  1. 如何处理大数据:分而治之。对相互间不具有计算依赖关系的大数据,划分计算任务或者计算数据以进行并行计算。不可拆分的计算任务或相互间有依赖关系的数据无法进行并行计算。
  2. 构建抽象模型 Map 和 Reduce:
  • Map:对一组数据元素进行某种重复式的处理。
  • Reduce:对Map的中间结果进行某种进一步的结果整理。
  1. 统一构架,隐藏系统层细节。有了 MapReduce 统一封装底层细节,那么程序员就不再需要考虑诸如数据存储、划分、分发、结果收集、错误恢复等诸多细节。程序员仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的程序代码

举例说明,现需要统计图书室中书的数量。多个人统计不同书架上书的数量,这就是“Map”;将所有人的统计结果相加得到想要的结果,这就是“Reduce”。

MapReduce编程规范

MapReduce 程序的开发分为三个阶段八个步骤:

Map 阶段

  1. 读取文件,设置 inputFormat,将数据切分成键值对(k1,v1)
  2. 自定义 map 逻辑,接收 (k1,v1),转换成 (k2,v2)

Shuffle 阶段

  1. 分区,分区的数量与 reduce 个数对应;收到的 (k2,v2) 直接匹配到对应的分区
  2. 排序,对 (k2,v2) 进行排列
  3. 规约,在 map 端对数据做一次聚合,减少输出的 (k2,v2) 的数据量
  4. 分组,将 key 相同的 value 放到一个集合当中,下游 reduce 会主动来抓取,将对应自己分组的数据拿走

Reduce 阶段

  1. 自定义 reduce 逻辑,接收 (k2,v2),转换成 (k3,v3)
  2. 指定输出,设置 outputFormat,将 reduce 处理后的数据进行输出

MapReduce程序运行模式

本地运行模式

  • MapReduce 程序被提交给LocalJobRunner在本地以单进程的形式运行
  • 处理的数据和输出结果可以在本地文件系统,也可以在 HDFS 上
  • 本地模式非常便于进行业务逻辑的 debug,可以在 IDEA 中打断点

实现本地运行,不要带集群的配置文件即可。本质是程序的 conf 中是否有以下参数:

1
2
mapreduce.framework.name=local
yarn.resourcemanager.hostname=local

集群运行模式

  • 将 MapReduce 程序提交给 YRAN 集群,分发到多个节点并行执行
  • 处理的数据和输出结果应在 HDFS 上

实现集群运行,将程序打成 jar 包,然后在集群的任意一个节点上用以下命令启动:

1
yarn jar wordcount-1.0-SNAPSHOT.jar moe.sannaha.wordcount.WordCountJobMain

序列化与反序列化

序列化(Serialization):把结构化对象转化为字节流。当要在进程间传递对象或持久化对象的时候,就需要序列化对象成字节流。
反序列化(Deserialization):是序列化的逆过程,把字节流转为结构化对象。当要将接收到或从磁盘读取的字节流转换为对象的时候,就要进行反序列化。

在 MapReduce 程序中,我们使用 Hadoop 提供的序列化机制(Writable)取代 Java 提供的(Serializable)。

为什么不使用 Java 序列化?

Java 序列化是一个重量级序列化框架,没有做一定的压缩,会附带很多额外的信息(各种校验信息,header,继承体系等),占用存储空间大,不便于在网络中高效传输;反序列化每次都要重新创建对象,内存消耗大。 Hadoop 序列化(Writable)不用像 Java 对象类一样传输多层的父子关系,需要哪个属性就传输哪个属性值,大大的减少网络传输的开销;反序列化可以重用对象,在已有对象上进行反序列化操作。

MapReduce运行机制

MapReduce 作业有以下 5 个独立的实体:

  • Client:提交 MapReduce 作业。
  • ResourceManager:协调集群上资源的分配。
  • NodeManager:启动和监视集群中机器上的计算容器(container)。
  • MapReduce 的 application master:协调运行 MapReduce 作业的任务。它和 MapReduce 任务在容器中运行,这些容器由 ResourceManager 分配并由 NodeManager 进行管理。
  • 分布式文件系统:用来与其他实体间共享作业文件。

MapReduce工作流程

一个完整的 MapReduce 程序在分布式运行时有 3 类实例进程:

  • MRAppMaster:负责整个程序的过程调度及状态协调。
  • MapTask:负责 Map 阶段的整个数据处理流程。
  • ReduceTask:负责 Reduce 阶段的整个数据处理流程。

MapReduce过程

MapReduce 工作流程概述如下:

  1. MapReduce 作业运行时最先启动 MRAppMaster,MRAppMaster 根据 job 的描述信息,计算出需要的 MapTask 实例数量,然后向集群申请机器启动相应数量的 MapTask 进程。
  2. MapTask 进程启动之后,对给定的数据切片进行处理,主要流程为:
  • 利用客户指定的 InputFormat 来获取 RecordReader 读取数据,形成输入 <key,value> 对。
  • 调用用户定义的 map() 方法处理输入的 <key,value> 对,并将 map() 方法输出的 <key,value> 对收集到缓存。
  • 将缓存中的 <key,value> 对按照 key 进行分区排序后溢写到磁盘文件。
  1. MRAppMaster 监控到所有 MapTask 进程任务完成之后,会根据用户指定的参数启动相应数量的 ReduceTask 进程,并告知 ReduceTask 进程要处理的数据范围(数据分区)。
  2. ReduceTask 进程启动之后,根据 MRAppMaster 告知的待处理数据所在位置,从运行着 MapTask 的若干个节点上获取到若干个 MapTask 输出结果文件,并在本地进行重新归并排序,然后按照相同 key 的KV为一个组,调用用户定义的 reduce() 方法进行逻辑运算,并收集运算输出的结果 <key,value>,然后调用客户指定的 OutputFormat 将结果数据输出到外部存储。

MapReduce工作流程.png

MapReduce 工作流程如上图所示,接下来分别对 MapTask 和 ReduceTask 展开详细说明。

MapTask工作流程

MapTask工作流程

MapTask 流程:

  1. Read 阶段。数据读取组件 InputFormat(默认 TextInputFormat)通过 getSplits() 方法对输入目录中的文件进行逻辑切片规划得到多个 split 文件,启动与 split 文件个数相同个 MapTask 进行处理。split 文件和 block 默认也是一对一。RecordReader 对象(默认LineRecordReader)读取 split 文件,以 \n 作为分隔符读取一行数据,返回 <key,value>key 表示每行首字符偏移值,value 表示这一行文本内容。
  2. Map 阶段。进入用户编写的 Mapper 类中,执行重写的 map() 方法处理数据。RecordReader 每读取一行,这里调用一次 map() 方法。
  3. Collect 阶段。数据被 mapper 处理结束之后交给 OutputCollector 收集器,会对其进行分区处理,默认使用 HashPartitioner。map 输出的结果会被写入内存(环形缓冲区,默认 100MB),每个 MapTask 都有缓冲区,它可以减少磁盘IO的影响。
  4. Spill 阶段。当缓冲区的数据达到阈值(100MB x 0.8),溢写线程启动,锁定这 80MB 内存,对这部分数据的 key 做排序(如果设置过 Combiner,会将有相同分区号和 key 的数据进行排序)后溢写(spill)到磁盘。因为缓冲区还保留了内存空间(100MB x 0.2),溢写过程不影响继续将 map 结果写入内存。
  5. Merge 阶段。所有数据处理结束后会对磁盘中溢写产生的临时文件做合并(merge),生成最终的文件,并为这个文件提供了一个索引文件,以记录每个 reduce 对应数据的偏移量。至此 Map 阶段结束,等待 ReduceTask 来拉取数据。

ReduceTask工作流程

ReduceTask工作流程

ReduceTask 工作流程大致分为以下几个阶段:

  1. Copy 阶段。Reduce 进程启动一些 Fetcher 线程,以 HTTP 方式从已经完成 MapTask 的 Map 端 Copy 文件,Copy 来的数据会先放入内存缓冲区,然后进行 merge 合并文件。
  2. Merge 阶段。Merge 有三种形式:内存到内存(默认不启用)、内存到磁盘、磁盘到磁盘。在此过程中会启动两个 merge 线程,分别为 inMemoryMerger(内存到磁盘) 和 onDiskMerger(磁盘到磁盘)。当内存中的数据量到达一定阈值,就执行 inMemoryMerger 将内存缓冲区数据溢写到磁盘(如果设置过Combiner,也是会启用的),产生多个临时文件。inMemoryMerger 线程一直运行到没有 Map 端数据,然后执行 onDiskMerger 将磁盘中的临时文件合并生成最终的大文件。
  3. Sort 阶段。把分散的文件合并成一个大的文件时,还会对合并的数据进行归并排序。
  4. Reduce 阶段。进入用户编写的 Reducer 类中,执行重写的 reduce() 方法,每次调用会产生零个或多个 <key, value>,最后把这些输出的键值对写入到 HDFS 文件中。

Shuffle过程优化

Map 阶段处理的数据如何传递给 Reduce 阶段,是 MapReduce 框架中最关键的一个流程,这个流程就叫 Shuffle,范围从 map() 方法之后到 reduce() 方法之前,包括了 MapTask 的 Collect 阶段、Spill 阶段、Merge 阶段和 ReduceTask 流程的 Copy 阶段、Merge 阶段、Sort 阶段。

调整溢写过程

Shuffle 的缓冲区大小会影响到 MapReduce 程序的执行效率。缓冲区越大,磁盘 IO 的次数越少,执行速度就越快。缓冲区可以通过以下参数调整:

1
2
3
4
5
6
7
8
# 调整缓冲区大小,默认为100
mapreduce.task.io.sort.mb

# 调整缓冲区开始spill的比例,默认是0.8
mapreduce.task.io.sort.spill.percent

# 每次spill最多合并多少个文件,默认为10
mapreduce.task.io.sort.factor

使用规约

对溢写文件,可以使用 Combiner 对 Map 端的输出先做一次合并,以减少在 Map 和 Reduce 之间的数据传输量。使用 Combiner 的前提是不影响业务逻辑,比如业务逻辑为求和,则可以进行 Combiner,求平均值则不行。

数据压缩

在 Shuffle 阶段,从 Map 端输出的数据都要通过网络发送到 Reduce 端,这一过程中涉及到大量的网络 IO,如果数据能够进行压缩,那么数据的发送量就会少得多。

Hadoop 对于压缩格式的是透明识别,能够自动为我们将压缩的文件解压。目前在 Hadoop 中常用的几种压缩格式:lzo,gzip,snappy,bzip2,简单做一下对比。

Hadoop 常用的压缩算法

压缩格式 算法 扩展名 多文件 splitable native hadoop自带
gzip deflate .gz
bzip2 bzip2 .bz2
lzo lzo .lzo
snappy snappy .snappy

对应的Java类

压缩格式 对应的Java类
gzip org.apache.hadoop.io.compress.GZipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
lzo com.hadoop.compression.lzo.LzopCodec
Snappy org.apache.hadoop.io.compress.SnappyCodec

压缩速率对比

压缩格式 压缩比 压缩速率 解压速率
gzip 13.4% 21 MB/s 118 MB/s
bzip2 13.2% 2.4MB/s 9.5MB/s
lzo 20.5% 135 MB/s 410 MB/s
snappy 22.2% 172 MB/s 409 MB/s

在代码中设置压缩
通过修改代码的方式来实现数据的压缩,使用灵活。

设置 Map 阶段的压缩:

1
2
3
Configuration configuration = new Configuration();
configuration.set("mapreduce.map.output.compress","true");
configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");

设置 Reduce 阶段的压缩:

1
2
3
configuration.set("mapreduce.output.fileoutputformat.compress","true");
configuration.set("mapreduce.output.fileoutputformat.compress.type","RECORD");
configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");

在配置中设置压缩
修改mapred-site.xml配置文件,然后重启集群,对所有的 MapReduce 任务启用压缩。

配置 Map 阶段的压缩:

mapred-site.xml
1
2
3
4
5
6
7
8
<property>
<name>mapreduce.map.output.compress</name>
<value>true</value>
</property>
<property>
<name>mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>

配置 Reduce 阶段的压缩:

mapred-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
<property>
<name>mapreduce.output.fileoutputformat.compress</name>
<value>true</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress.type</name>
<value>RECORD</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>

编程实例

WordCount

需求

在给定的文本文件中统计输出每个单词出现的总次数。

测试数据

存储于HDFS的 wordcount.txt

wordcount.txt
1
2
3
Hello,MapReduce,Spark
Hi,MapReduce,Flink
Hello,World

项目依赖

pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>moe.sannaha</groupId>
<artifactId>MapReduceDemo</artifactId>
<version>1.0-SNAPSHOT</version>

<name>MapReduceDemo</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.0.0-cdh6.1.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>


<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin </artifactId>
<configuration>
<descriptorRefs>
<!-- descriptorRef=jar-with-dependencies 时,可以配置 mainClass 实现在服务器运行 jar 包时无需再指定主类-->
<descriptorRef>jar-with-dependencies</descriptorRef>
<!-- descriptorRef=project时 mainClass 配置无效,运行时仍需指定主类 -->
<!-- <descriptorRef>project</descriptorRef>-->
</descriptorRefs>
<archive>
<manifest>
<mainClass>moe.sannaha.mapreduce.wordcount.WordCountJobMain</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

Mapper

定义一个 Mapper 类 WrodCountMapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package moe.sannaha.mapreduce.wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
* Map 阶段
* 自定义 Map 逻辑
* 使用 Hadoop 提供的序列化,不使用 Java 的序列化框架
*
* $ cat wordcount.txt
* Hello,MapReduce,Spark
* Hi,MapReduce,Flink
* Hello,World
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
private Text wordText = new Text();
// 用来给每个单词都标记上出现1次
private LongWritable count = new LongWritable(1);

/**
* 每接收一行数据就执行一次下面的 map() 方法
* 接收数据:
* k1:行偏移量
* v1:Hello,MapReduce,Spark
* 处理为:
* k2:Hello
* v2:1
* ...
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 将每行数据按逗号拆分为单词,给每个单词标记次数1后发送到下游
for (String word : value.toString().split(",")) {
wordText.set(word);
context.write(wordText, count);
}
}
}

Reducer

定义一个 Reducer 类 WordCountReducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package moe.sannaha.mapreduce.wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
/**
* Reduce 阶段
* 自定义 Reduce 逻辑
* 使用 Hadoop 提供的序列化,不使用 Java 的序列化框架
*/
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

private LongWritable wordCount = new LongWritable();
/**
* 每接收一行数据就执行一次下面的 reduce() 方法
* 接收数据:
* k2:Hello
* v2:{1,1}
* 处理为:
* k3:Hello
* v3:1+1=2
*/
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum = 0;
for (LongWritable value : values) {
sum += value.get();
}
wordCount.set(sum);
context.write(key, wordCount);
}
}

Main

定义一个主类 WordCountJobMain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package moe.sannaha.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
* MapReduce 程序的主类
* 作用:
* 1. 整合 MapReduce 程序的各个阶段
* 2. 作为 MapReduce 程序的执行入口
*/
public class WordCountJobMain extends Configured implements Tool {
/**
* 整合 MapReduce 程序的各个阶段
*/
@Override
public int run(String[] strings) throws Exception {
// 获取job对象
Job job = Job.getInstance(super.getConf(), "WordCount");

// 关联本程序的主类
job.setJarByClass(WordCountJobMain.class);

// 关联Mapper和Reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);

// 设置Map端输出的K,V类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);

// 设置Reduce端输出的K,V类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

// 设置程序输入和输出数据的格式
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

// 设置程序输入和输出数据所在的目录
TextInputFormat.addInputPath(job, new Path("hdfs://cdh1:8020/data/wordcount/"));
TextOutputFormat.setOutputPath(job, new Path("hdfs://cdh1:8020/data/output"));

// 设置Reduce任务数量为1,这样最终会输出一个文件
job.setNumReduceTasks(1);

// 等待执行完成,返回执行结果
boolean b = job.waitForCompletion(true);
return b ? 0 : 1;
}

/**
* 作为 MapReduce 程序的执行入口
*/
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
int run = ToolRunner.run(configuration, new WordCountJobMain(), args);
System.exit(run);
}
}

分区

在 MapReduce 中,会将同一个分区的数据发送到同一个 reduce 当中进行处理,因此分区数与 reduce 个数(输出文件个数)一致。

需求

根据大小分区统计彩票开奖结果,根据GAME_RESULT字段的值进行判断,15 以下为“小”,15 及以上为“大”。

测试数据

ID USER_ID GAME_TYPE OPEN_TIME GAME_NUM GAME_RESULT GAME_RESULT_DESC RESULT_TYPE

partition.csv
1
2
3
4
5
6
7
8
9
10
1	0	1	2017-07-31 23:10:12	837255	6	4+1+1=6	小,双
2 0 1 2017-07-31 23:15:03 837256 14 4+7+3=14 大,双
3 0 1 2017-07-31 23:20:12 837257 17 6+9+2=17 大,单
4 0 1 2017-07-31 23:25:12 837258 22 5+8+9=22 大,双
5 0 1 2017-07-31 23:30:18 837259 1 1+0+0=1 小,单
6 0 2 2017-07-31 23:17:22 2170779 4 2+0+2=4 小,双
7 0 2 2017-07-31 23:20:49 2170780 12 1+2+9=12 小,双
8 0 2 2017-07-31 23:24:18 2170781 11 6+4+1=11 小,单
9 0 2 2017-07-31 23:27:43 2170782 20 5+7+8=20 大,双
10 0 2 2017-07-31 23:31:23 2170783 12 3+1+8=12 小,双

Mapper

定义一个 Mapper 类 CustomPartitionMapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 一、Map 阶段
* 自定义 map 逻辑
*/
public class CustomPartitionMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

/**
* 每接收一行数据就执行一次下面的 map 方法
* 接收数据:第六个字段为彩票开奖结果,15 以下为小,15 及以上为大
* k1:行偏移量
* v1:1 0 1 2017-07-31 23:10:12 837255 6 4+1+1=6 小,双
* 处理为:
* k2:1 0 1 2017-07-31 23:10:12 837255 6 4+1+1=6 小,双
* v2:null
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//接收数据
//处理数据
//发送数据到下游
context.write(value,NullWritable.get());
}
}

Partitioner

定义一个 Partitioner 类 CustomPartitioner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 二、Shuffle 阶段
* 自定义分区
*/
public class CustomPartitioner extends Partitioner<Text, NullWritable> {

/**
* 接收数据:
* k2:1 0 1 2017-07-31 23:10:12 837255 6 4+1+1=6 小,双
* v2:null
*/

@Override
public int getPartition(Text text, NullWritable nullWritable, int i) {
//接收数据
String textStr = text.toString();
String[] fieldArr = textStr.split("\t");

String gameResult = fieldArr[5];

/**
* 处理数据,结果为“小”的结果打上逻辑标识“0”,结果为“大”的结果打上逻辑标识“1”
* 返回值是一个分区的标识,标识相同的数据去指定的分区
*/
if (Integer.parseInt(gameResult)< 15) {
return 0;
} else {
return 1;
}
}
}

Reducer

定义一个 Reducer 类 CustomPartitionReducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 三、Reduce 阶段
* 自定义 reduce 逻辑
*/
public class CustomPartitionReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

/**
* 接收数据:
* k2:1 0 1 2017-07-31 23:10:12 837255 6 4+1+1=6 小,双
* v2:Null
* 处理数据为:
* k3:1 0 1 2017-07-31 23:10:12 837255 6 4+1+1=6 小,双
* v3:Null
*/
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}

Main

定义一个主类 CustomPartitionJobMain。注意添加 job.setNumReduceTasks(2)设置 ReduceTask 的数量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* 目的:根据大小分区统计彩票开奖结果
* 作用:
* 1. 整合 MapReduce 程序 3 阶段 8 步骤
* 2. 作为 MapReduce 程序的执行入口
*/
public class CustomPartitionJobMain extends Configured implements Tool {

/**
* 整合 MapReduce 程序 3 阶段 8 步骤
*/
@Override
public int run(String[] args) throws Exception {
//整合的媒介
Job job = Job.getInstance(super.getConf(), "CustomPartition");
job.setJarByClass(CustomPartitionJobMain.class);
/*
一、Map 阶段
*/
//1. 读取文件,指定输入数据的格式
job.setInputFormatClass(TextInputFormat.class);
//指定文件路径,具体到所在目录
TextInputFormat.addInputPath(job, new Path(args[0]));

//2. 自定义 map 逻辑
job.setMapperClass(CustomPartitionMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);

/*
二、Shuffle 阶段
*/
//3. 分区
job.setPartitionerClass(CustomPartitioner.class);
//4. 排序
//5. 规约
//6. 分组

/*
三、Reduce 阶段
*/
//7. 自定义reduce逻辑
job.setReducerClass(CustomPartitionReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

//设置 ReduceTask 的数量,与分区数/输出文件的个数对应,设置后就只能在集群中运行
job.setNumReduceTasks(2);

//8. 指定输出
//指定输出数据的文件格式
job.setOutputFormatClass(TextOutputFormat.class);

//指定输出的路径,指定目录必须不存在,否则会报错
TextOutputFormat.setOutputPath(job, new Path(args[1]));

//等待执行完成
boolean b = job.waitForCompletion(true);

return b ? 0 : 1;
}

/**
* 作为 MapReduce 程序的执行入口
*/
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new CustomPartitionJobMain(), args);
System.exit(run);
}
}

打包运行

该 MapReduce 程序涉及到多个分区,需要在集群环境运行。

配置pom.xml

pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!--<verbal>true</verbal>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>moe.sannaha.custompartition.CustomPartitionJobMain</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

使用maven工具打包得到custompartition-1.0-SNAPSHOT-jar-with-dependencies.jar,上传到 HDFS 后运行该 jar 包:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# 上传到HDFS
# hdfs dfs -put partition.csv /test/data/

# 运行jar包
# hadoop jar custompartition-1.0-SNAPSHOT-jar-with-dependencies.jar hdfs://node01:8020/test/data/partition.csv hdfs://node01:8020/test/data/output
20/02/03 11:52:11 INFO client.RMProxy: Connecting to ResourceManager at node01/192.168.153.100:8032
20/02/03 11:52:15 INFO input.FileInputFormat: Total input paths to process : 1
20/02/03 11:52:15 INFO mapreduce.JobSubmitter: number of splits:1
20/02/03 11:52:15 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1580365594154_0002
20/02/03 11:52:15 INFO impl.YarnClientImpl: Submitted application application_1580365594154_0002
20/02/03 11:52:15 INFO mapreduce.Job: The url to track the job: http://node01:8088/proxy/application_1580365594154_0002/
20/02/03 11:52:15 INFO mapreduce.Job: Running job: job_1580365594154_0002
20/02/03 11:52:36 INFO mapreduce.Job: Job job_1580365594154_0002 running in uber mode : false
20/02/03 11:52:36 INFO mapreduce.Job: map 0% reduce 0%
20/02/03 11:52:56 INFO mapreduce.Job: map 100% reduce 0%
20/02/03 11:53:09 INFO mapreduce.Job: map 100% reduce 50%
20/02/03 11:53:19 INFO mapreduce.Job: map 100% reduce 100%
20/02/03 11:53:20 INFO mapreduce.Job: Job job_1580365594154_0002 completed successfully
20/02/03 11:53:20 INFO mapreduce.Job: Counters: 50
File System Counters
FILE: Number of bytes read=1240988
FILE: Number of bytes written=2912243
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=1210657
HDFS: Number of bytes written=1210550
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=4
Job Counters
Killed reduce tasks=1
Launched map tasks=1
Launched reduce tasks=2
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=17899
Total time spent by all reduces in occupied slots (ms)=27011
Total time spent by all map tasks (ms)=17899
Total time spent by all reduce tasks (ms)=27011
Total vcore-milliseconds taken by all map tasks=17899
Total vcore-milliseconds taken by all reduce tasks=27011
Total megabyte-milliseconds taken by all map tasks=18328576
Total megabyte-milliseconds taken by all reduce tasks=27659264
Map-Reduce Framework
Map input records=15213
Map output records=15213
Map output bytes=1210550
Map output materialized bytes=1240988
Input split bytes=107
Combine input records=0
Combine output records=0
Reduce input groups=15213
Reduce shuffle bytes=1240988
Reduce input records=15213
Reduce output records=15213
Spilled Records=30426
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=9580
CPU time spent (ms)=21870
Physical memory (bytes) snapshot=767721472
Virtual memory (bytes) snapshot=8261402624
Total committed heap usage (bytes)=588316672
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=1210550
File Output Format Counters
Bytes Written=1210550

查看输出目录,有part-r-00000part-r-00001两个结果文件:

1
2
3
4
5
# hdfs dfs -ls /test/data/output
Found 3 items
-rw-r--r-- 3 root supergroup 0 2020-02-03 11:53 /test/data/output/_SUCCESS
-rw-r--r-- 3 root supergroup 700807 2020-02-03 11:53 /test/data/output/part-r-00000
-rw-r--r-- 3 root supergroup 509743 2020-02-03 11:53 /test/data/output/part-r-00001

CustomPartitioner代码中的配置一致,结果为“小”的结果打上逻辑标识“0”,保存在part-r-00000文件;结果为“大”的结果打上逻辑标识“1”,保存在part-r-00001文件。

排序

Hadoop 的序列化机制是 Writable,Hadoop 定义了一个 Writable 接口,一个类只需实现这个接口即可支持可序列化。Writable 有一个子接口 WritableComparable,它既可实现序列化,也可以通过compareTo()方法对 key 进行比较。

1
2
3
4
5
6
7
8
9
o1.compareTo(o2); 
//调用compareTo()方法:
//如果当前对象o1与比较对象o2相等,该方法返回0
//如果当前对象o1大于比较对象o2相等,该方法返回正值
//如果当前对象o1小于比较对象o2相等,该方法返回负值

//重写compareTo()方法:
//返回正值,当前对象o1排在比较对象o2后面
//返回负值,当前对象o1排在比较对象o2前面

需求

有两列数据,对第一列按照字典顺序进行排序,第一列相同时,对第二列按照升序进行排序。

测试数据

1
2
3
4
5
6
7
8
a	1
a 9
b 3
a 7
b 8
b 10
a 5
a 9

解决思路

mapper 输出 <key, value>,将 key 和 value 组合成一个新 key(称为 newKey),value 保持不变,得到<(key,value), value>,对其根据 newKey 排序,如果 newKey 相同,再对 value 进行排序。

自定义数据类型及比较器

定义一个 WritableComparable 类 SortBean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/**
* 数据封装,实现 WritableComparable 接口
*/
public class SortBean implements WritableComparable<SortBean> {

private String firstField;
private Integer secondField;

public SortBean() {
}

public SortBean(String firstField, int secondField) {
this.firstField = firstField;
this.secondField = secondField;
}

@Override
public String toString() {
return firstField + "\t" + secondField;
}

public String getFirstField() {
return firstField;
}

public void setFirstField(String firstField) {
this.firstField = firstField;
}

public int getSecondField() {
return secondField;
}

public void setSecondField(int secondField) {
this.secondField = secondField;
}

/**
* 重写比较器
* 需求:先比较第一个字段,如果相等,再比较第二个字段
*/
@Override
public int compareTo(SortBean o) {
/*
第一个等于第二个,返回 0
第一个大于第二个,返回正值
第一个小于第二个,返回负值
*/
int compare = this.firstField.compareTo(o.getFirstField());

if (compare == 0) {
compare = this.secondField.compareTo(o.getSecondField());
}

return compare;
}

/**
* 序列化方法
*/
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(firstField);
dataOutput.writeInt(secondField);
}

/**
* 反序列化方法
*/
@Override
public void readFields(DataInput dataInput) throws IOException {
firstField = dataInput.readUTF();
secondField = dataInput.readInt();
}
}

Mapper

定义一个 Mapper 类 CustomSortMapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 一、Map 阶段
* 自定义 map 逻辑
*/
public class CustomSortMapper extends Mapper<LongWritable, Text, SortBean, IntWritable> {

private SortBean mapOutKey = new SortBean();
private IntWritable mapOutValue = new IntWritable();

/**
* 每接收一行数据就执行一次下面的 map 方法
*
* 接收数据:
* k1:行偏移量
* v1:a 1
*
* 处理为:
* k2:sortBean
* v2:1
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String lineStr = value.toString();
String[] wordArr = lineStr.split("\t");

//组合key和value为<(key,value), value>
mapOutKey.set(strs[0], Integer.valueOf(lineStr[1]));
mapOutValue.set(Integer.valueOf(lineStr[1]));
context.write(mapOutKey, mapOutValue);
}
}

Reducer

定义一个 Reducer 类 CustomSortReducer

1
2
3
4
5
6
7
8
9
10
11
/**
* 三、Reduce 阶段
* 自定义 reduce 逻辑
*/
public class CustomSortReducer extends Reducer<SortBean, IntWritable, Text, IntWritable> {

@Override
protected void reduce(SortBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
context.write(key,NullWritable.get());
}
}

Main

定义一个主类 CustomSortJobMain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* 目的:对第一列按照字典顺序进行排序,第一列相同时,对第二列按照升序进行排序。
* 作用:
* 1. 整合 MapReduce 程序 3 阶段 8 步骤
* 2. 作为 MapReduce 程序的执行入口
*/
public class CustomSortJobMain extends Configured implements Tool {

/**
* 整合 MapReduce 程序 3 阶段 8 步骤
*/
@Override
public int run(String[] strings) throws Exception {
Job job = Job.getInstance(super.getConf(), "CustomSort");
job.setJarByClass(CustomSortJobMain.class);

job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("E:\\sortdata\\input"));

job.setMapperClass(CustomSortMapper.class);
job.setMapOutputKeyClass(SortBean.class);
job.setMapOutputValueClass(IntWritable.class);

job.setReducerClass(CustomSortReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path("E:\\sortdata\\output"));
boolean b = job.waitForCompletion(true);

return b ? 0 : 1;
}

/**
* 作为 MapReduce 程序的执行入口
*/
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new CustomSortJobMain(), args);
System.exit(run);
}
}

运算结果

E:\sortdata\output\part-r-00000
1
2
3
4
5
6
7
8
a	1
a 5
a 7
a 9
a 9
b 3
b 8
b 10

计数器

计数器是收集作业统计信息的有效手段之一,用于质量控制或应用级统计。计数器还可辅助诊断系统故障。如果需要将日志信息传输到 map 或 reduce 任务,更好的方法通常是看能否用一个计数器值来记录某一特定事件的发生。对于大型分布式作业而言,使用计数器更为方便。除了因为获取计数器值比输出日志更方便,还有根据计数器值统计特定事件的发生次数要比分析一堆日志文件容易得多。

Hadoop 内置计数器
MapReduce任务计数器 org.apache.hadoop.mapreduce.TaskCounter
文件系统计数器 org.apache.hadoop.mapreduce.FileSystemCounter
FileInputFormat计数器 org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
FileOutputFormat计数器 org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter
作业计数器 org.apache.hadoop.mapreduce.JobCounter

需求

以排序程序为例,统计 map 接收到的数据记录条数。

方式一

定义计数器的第一种方式,通过 context 上下文对象获取计数器,在 Map 端进行统计。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CustomSortMapper extends Mapper<LongWritable, Text, SortBean, Text> {

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {


+ //自定义计数器(方式一),统计 Map 端数据的条数
+ Counter counter = context.getCounter("MR_COUNT", "MapRecordCounter");
+ counter.increment(1L);

String lineStr = value.toString();
String[] wordArr = lineStr.split("\t");

String firstField = wordArr[0];
Integer secondField = Integer.parseInt(wordArr[1]);

SortBean sortBean = new SortBean(firstField, secondField);

context.write(sortBean, value);
}
}

控制台打印结果如下:

计数器一

方式二

通过 enum 枚举类型来定义计数器,统计 Reduce 端输入的 key 和 value 有多少个。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class CustomSortReducer extends Reducer<SortBean, Text, SortBean, NullWritable> {

//自定义计数器(方式二),统计 Reduce 端接收数据的 key 和 value 的个数
public static enum Counter{
REDUCE_INPUT_RECORDS, REDUCE_INPUT_VAL_NUMS,
}

@Override
protected void reduce(SortBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
context.getCounter(Counter.REDUCE_INPUT_RECORDS).increment(1L);
for (Text value : values) {
context.getCounter(Counter.REDUCE_INPUT_VAL_NUMS).increment(1L);
context.write(key, NullWritable.get());
}
}
}

控制台打印结果如下:

计数器二

规约

每一个 mapper 都可能会产生大量的本地输出,Combiner 的作用就是对 Map 端的输出先做一次合并,以减少在 Map 和 Reduce 之间的数据传输量,是 MapReduce 优化手段之一。

  • Combiner 是 MapReduce 程序中 Mapper 和 Reducer 之外的又一种组件
  • Combiner 组件的父类是 Reducer
  • Combiner 和 Reducer 的区别在于运行的位置
  • Combiner 是在每一个 MapTask 所在的节点运行
  • Reducer 是接收全局所有 Mapper 的输出结果
  • Combiner 的意义就是对每一个 MapTask 的输出进行局部汇总,以减小网络传输量

实现步骤:

  1. 自定义一个 Combiner 继承 Reducer,重写 reduce() 方法
  2. 在 job 中设置 job.setCombinerClass(CustomCombiner.class)

Combiner 能够应用的前提是不影响最终的业务逻辑,而且 Combiner 的输出 (key, value) 应该跟 Reducer 的输入 (key, value) 类型对应。

数据流量分析

现有数据流量数据如下图所示,根据需求进行分析:

1580744722764

流量统计

需求

统计每个手机号的上行流量之和,下行流量之和,上行总流量之和,下行总流量之和。

解决思路

在 Map 阶段,以手机号码为 key,上行流量、下行流量、上行总流量、下行总流量作为 value,输出到 Reducer,在 Reduce 阶段进行统计求和。

自定义数据类型

定义一个 Writable 接口的实现类 DataFlowBean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class DataFlowBean implements Writable {
private Integer upFlow;
private Integer downFlow;
private Integer upFlowCount;
private Integer downFlowCount;

public DataFlowBean() {
}

public DataFlowBean(Integer upFlow, Integer downFlow, Integer upFlowCount, Integer downFlowCount) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.upFlowCount = upFlowCount;
this.downFlowCount = downFlowCount;
}

@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(upFlow);
dataOutput.writeInt(downFlow);
dataOutput.writeInt(upFlowCount);
dataOutput.writeInt(downFlowCount);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readInt();
this.downFlow = dataInput.readInt();
this.upFlowCount = dataInput.readInt();
this.downFlowCount = dataInput.readInt();
}

public Integer getUpFlow() {
return upFlow;
}

public void setUpFlow(Integer upFlow) {
this.upFlow = upFlow;
}

public Integer getDownFlow() {
return downFlow;
}

public void setDownFlow(Integer downFlow) {
this.downFlow = downFlow;
}

public Integer getUpFlowCount() {
return upFlowCount;
}

public void setUpFlowCount(Integer upFlowCount) {
this.upFlowCount = upFlowCount;
}

public Integer getDownFlowCount() {
return downFlowCount;
}

public void setDownFlowCount(Integer downFlowCount) {
this.downFlowCount = downFlowCount;
}

@Override
public String toString() {
return upFlow +
"\t" + downFlow +
"\t" + upFlowCount +
"\t" + downFlowCount;
}
}

Mapper

定义一个 Mapper 类 DataFlowMapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class DataFlowMapper extends Mapper<LongWritable, Text, Text, DataFlowBean> {
DataFlowBean dataFlowBean = new DataFlowBean();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String lineStr = value.toString();
String[] lineArr = lineStr.split("\t");
dataFlowBean.setUpFlow(Integer.parseInt(lineArr[6]));
dataFlowBean.setDownFlow(Integer.parseInt(lineArr[7]));
dataFlowBean.setUpFlowCount(Integer.parseInt(lineArr[8]));
dataFlowBean.setDownFlowCount(Integer.parseInt(lineArr[9]));
context.write(new Text(lineArr[1]), dataFlowBean);
}
}

Reducer

定义一个 Reducer 类 DataFlowReducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package moe.sannaha.dataflowstatistics;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class DataFlowReducer extends Reducer<Text, DataFlowBean, Text, DataFlowBean> {
DataFlowBean dataFlowBean = new DataFlowBean();

@Override
protected void reduce(Text key, Iterable<DataFlowBean> values, Context context) throws IOException, InterruptedException {
Integer upFlow = 0;
Integer downFlow = 0;
Integer upFlowCount = 0;
Integer downFlowCount = 0;
for (DataFlowBean value : values) {
upFlow += value.getUpFlow();
downFlow += value.getDownFlow();
upFlowCount += value.getUpFlowCount();
downFlowCount += value.getDownFlowCount();
}
dataFlowBean.setUpFlow(upFlow);
dataFlowBean.setDownFlow(downFlow);
dataFlowBean.setUpFlowCount(upFlowCount);
dataFlowBean.setDownFlowCount(downFlowCount);
context.write(key, dataFlowBean);
}
}

Main

定义一个主类 DataFlowMain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class DataFlowMain extends Configured implements Tool {
@Override
public int run(String[] strings) throws Exception {
Job job = Job.getInstance(super.getConf(), "DataFlow");
job.setJarByClass(DataFlowMain.class);

job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.setInputPaths(job, new Path("E:\\MapReduce\\流量统计\\data_flow.dat"));

job.setMapperClass(DataFlowMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DataFlowBean.class);

job.setReducerClass(DataFlowReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DataFlowBean.class);

job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path("E:\\MapReduce\\流量统计\\output"));

boolean b = job.waitForCompletion(true);
return b ? 0 : 1;
}

public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new DataFlowMain(), args);
System.exit(run);
}
}

运算结果

E:\MapReduce\流量统计\output\part-r-00000
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1348025████	3	3	180	180
1350246████ 57 102 7335 110349
1356043████ 33 24 2034 5892
1360021████ 18 138 1080 186852
1360284████ 15 12 1938 2910
1366057████ 24 9 6960 690
1371919████ 4 0 240 0
1372623████ 24 27 2481 24681
1376077████ 2 2 120 120
1382307████ 6 3 360 180
1382654████ 4 0 264 0
1392231████ 12 12 3008 3720
1392505████ 69 63 11058 48243
1392625████ 4 0 240 0
1392643████ 2 4 132 1512
1501368████ 28 27 3659 3538
1592013████ 20 20 3156 2936
1598900████ 3 3 1938 180
1821157████ 15 12 1527 2106
1832017████ 21 18 9531 2412
8413████ 20 16 4116 1432

流量分区排序

需求

  1. 以流量统计的输出数据作为输入数据。
  2. 根据手机号码的起始两位进行分区,分为“13”、“15”、“18”和其他。
  3. 根据下行总流量之和倒序排序。

解决思路

MapReduce 程序在 Shuffle 阶段根据 key 进行排序,因此 Map 阶段输出的 key 为封装了上行流量的 DataFlowBean,value 为手机号码。

自定义数据类型

定义一个 WritableComparable 类 DataFlowWritableComparable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public class DataFlowWritableComparable implements WritableComparable<DataFlowWritableComparable> {
private Integer upFlow;
private Integer downFlow;
private Integer upFlowCount;
private Integer downFlowCount;

public DataFlowWritableComparable() {
}

public DataFlowWritableComparable(Integer upFlow, Integer downFlow, Integer upFlowCount, Integer downFlowCount) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.upFlowCount = upFlowCount;
this.downFlowCount = downFlowCount;
}

//根据下行总流量之和倒序排序
@Override
public int compareTo(DataFlowWritableComparable dataFlowWritableComparable) {
return this.upFlowCount > dataFlowWritableComparable.upFlowCount ? -1 : 1;
}

@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(upFlow);
dataOutput.writeInt(downFlow);
dataOutput.writeInt(upFlowCount);
dataOutput.writeInt(downFlowCount);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readInt();
this.downFlow = dataInput.readInt();
this.upFlowCount = dataInput.readInt();
this.downFlowCount = dataInput.readInt();
}

public Integer getUpFlow() {
return upFlow;
}

public void setUpFlow(Integer upFlow) {
this.upFlow = upFlow;
}

public Integer getDownFlow() {
return downFlow;
}

public void setDownFlow(Integer downFlow) {
this.downFlow = downFlow;
}

public Integer getUpFlowCount() {
return upFlowCount;
}

public void setUpFlowCount(Integer upFlowCount) {
this.upFlowCount = upFlowCount;
}

public Integer getDownFlowCount() {
return downFlowCount;
}

public void setDownFlowCount(Integer downFlowCount) {
this.downFlowCount = downFlowCount;
}

@Override
public String toString() {
return upFlow +
"\t" + downFlow +
"\t" + upFlowCount +
"\t" + downFlowCount;
}
}

Mapper

定义一个 Mapper 类 DataFlowCompareMapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class DataFlowCompareMapper extends Mapper<LongWritable, Text, DataFlowWritableComparable, Text> {
DataFlowWritableComparable dataFlowWritableComparable = new DataFlowWritableComparable();
Text outValue = new Text();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String lineStr = value.toString();
String[] lineArr = lineStr.split("\t");
dataFlowWritableComparable.setUpFlow(Integer.parseInt(lineArr[1]));
dataFlowWritableComparable.setDownFlow(Integer.parseInt(lineArr[2]));
dataFlowWritableComparable.setUpFlowCount(Integer.parseInt(lineArr[3]));
dataFlowWritableComparable.setDownFlowCount(Integer.parseInt(lineArr[4]));
outValue.set(lineArr[0]);
context.write(dataFlowWritableComparable, outValue);
}
}

Partitioner

定义一个 Partitioner 类 CustomPartitioner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class DataFlowPartition extends Partitioner<DataFlowWritableComparable, Text> {
//根据手机号码的起始两位进行分区
@Override
public int getPartition(DataFlowWritableComparable dataFlowWritableComparable, Text text, int i) {
String lineStr = text.toString();
if (lineStr.startsWith("13")) {
return 0;
} else if (lineStr.startsWith("15")) {
return 1;
} else if (lineStr.startsWith("18")) {
return 2;
} else {
return 3;
}
}
}

Reducer

定义一个 Reducer 类DataFlowCompareReducer

1
2
3
4
5
6
7
public class DataFlowCompareReducer extends Reducer<DataFlowWritableComparable, Text, Text, DataFlowWritableComparable> {

@Override
protected void reduce(DataFlowWritableComparable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
context.write(values.iterator().next(), key);
}
}

Main

定义一个主类 DataFlowCompareMain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class DataFlowCompareMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(super.getConf(), "DataFlowCompare");
job.setJarByClass(DataFlowCompareMain.class);
job.setNumReduceTasks(4);

job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.setInputPaths(job, new Path(args[0]));

job.setMapperClass(DataFlowCompareMapper.class);
job.setMapOutputKeyClass(DataFlowWritableComparable.class);
job.setMapOutputValueClass(Text.class);

job.setPartitionerClass(DataFlowPartition.class);

job.setReducerClass(DataFlowCompareReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DataFlowWritableComparable.class);

job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[1]));

boolean b = job.waitForCompletion(true);
return b ? 0 : 1;
}

public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new DataFlowCompareMain(), args);
System.exit(run);
}
}

打包运行

该 MapReduce 程序涉及到多个分区,需要在集群环境运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# 上传流量统计结果到HDFS
# hdfs dfs -put part-r-00000 /test/dataFlow/

# hadoop jar dataflowanalysis-1.0-SNAPSHOT-jar-with-dependencies.jar hdfs://node01:8020/test/dataFlow/input/part-r-00000 hdfs://node01:8020/test/dataFlow/output
20/02/04 15:25:51 INFO client.RMProxy: Connecting to ResourceManager at node01/192.168.153.100:8032
20/02/04 15:25:54 INFO input.FileInputFormat: Total input paths to process : 1
20/02/04 15:25:54 INFO mapreduce.JobSubmitter: number of splits:1
20/02/04 15:25:54 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1580365594154_0006
20/02/04 15:25:55 INFO impl.YarnClientImpl: Submitted application application_1580365594154_0006
20/02/04 15:25:55 INFO mapreduce.Job: The url to track the job: http://node01:8088/proxy/application_1580365594154_0006/
20/02/04 15:25:55 INFO mapreduce.Job: Running job: job_1580365594154_0006
20/02/04 15:26:03 INFO mapreduce.Job: Job job_1580365594154_0006 running in uber mode : false
20/02/04 15:26:03 INFO mapreduce.Job: map 0% reduce 0%
20/02/04 15:26:08 INFO mapreduce.Job: map 100% reduce 0%
20/02/04 15:26:24 INFO mapreduce.Job: map 100% reduce 25%
20/02/04 15:26:25 INFO mapreduce.Job: map 100% reduce 50%
20/02/04 15:26:30 INFO mapreduce.Job: map 100% reduce 100%
20/02/04 15:26:30 INFO mapreduce.Job: Job job_1580365594154_0006 completed successfully
20/02/04 15:26:30 INFO mapreduce.Job: Counters: 50
File System Counters
FILE: Number of bytes read=651
FILE: Number of bytes written=718779
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=672
HDFS: Number of bytes written=556
HDFS: Number of read operations=15
HDFS: Number of large read operations=0
HDFS: Number of write operations=8
Job Counters
Killed reduce tasks=1
Launched map tasks=1
Launched reduce tasks=4
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=3106
Total time spent by all reduces in occupied slots (ms)=59993
Total time spent by all map tasks (ms)=3106
Total time spent by all reduce tasks (ms)=59993
Total vcore-milliseconds taken by all map tasks=3106
Total vcore-milliseconds taken by all reduce tasks=59993
Total megabyte-milliseconds taken by all map tasks=3180544
Total megabyte-milliseconds taken by all reduce tasks=61432832
Map-Reduce Framework
Map input records=21
Map output records=21
Map output bytes=585
Map output materialized bytes=651
Input split bytes=116
Combine input records=0
Combine output records=0
Reduce input groups=21
Reduce shuffle bytes=651
Reduce input records=21
Reduce output records=21
Spilled Records=42
Shuffled Maps =4
Failed Shuffles=0
Merged Map outputs=4
GC time elapsed (ms)=1009
CPU time spent (ms)=10430
Physical memory (bytes) snapshot=985886720
Virtual memory (bytes) snapshot=13803925504
Total committed heap usage (bytes)=708902912
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=556
File Output Format Counters
Bytes Written=556

# hdfs dfs -cat /test/dataFlow/output/part-r-00000
1360021████ 18 138 1080 186852
1350246████ 57 102 7335 110349
1392505████ 69 63 11058 48243
1372623████ 24 27 2481 24681
1356043████ 33 24 2034 5892
1392231████ 12 12 3008 3720
1360284████ 15 12 1938 2910
1392643████ 2 4 132 1512
1366057████ 24 9 6960 690
1382307████ 6 3 360 180
1348025████ 3 3 180 180
1376077████ 2 2 120 120
1371919████ 4 0 240 0
1382654████ 4 0 264 0
1392625████ 4 0 240 0

# hdfs dfs -cat /test/dataFlow/output/part-r-00001
1501368████ 28 27 3659 3538
1592013████ 20 20 3156 2936
1598900████ 3 3 1938 180

# hdfs dfs -cat /test/dataFlow/output/part-r-00002
1832017████ 21 18 9531 2412
1821157████ 15 12 1527 2106

# hdfs dfs -cat /test/dataFlow/output/part-r-00003
8413████ 20 16 4116 1432

参考资料
Hadoop 压缩格式 gzip/snappy/lzo/bzip2 比较与总结 作者:青牛