MapReduce is a distributed computing framework for parallel processing of large-scale datasets. Its core ideas are "Map" and "Reduce": in short, divide and conquer.
Overview
MapReduce is a programming framework for distributed computing. It makes it easy for developers to run their programs on a distributed system without having to know distributed parallel programming.
MapReduce Design Ideas
The design of MapReduce is reflected in the following three aspects:
How to process big data: divide and conquer. For large datasets whose parts have no computational dependencies on each other, the computation tasks or the data can be partitioned and processed in parallel. Tasks that cannot be split, or data with interdependencies, cannot be parallelized.
Build an abstract model with Map and Reduce:
Map: apply the same kind of processing repeatedly to a set of data elements.
Reduce: further aggregate the intermediate results produced by Map.
Provide a unified framework that hides system-level details. With MapReduce encapsulating the low-level details, programmers no longer need to worry about data storage, partitioning, distribution, result collection, error recovery, and so on. They only need to focus on the application-level computation and write a small amount of code for the problem itself.
For example, suppose we need to count the books in a library. Several people each count the books on different shelves; that is "Map". Adding up everyone's counts to get the final total is "Reduce".
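As a rough, non-MapReduce illustration of the same idea, the following small Java sketch (the shelf contents are made up) "maps" each shelf to its book count and "reduces" the counts by summing them:

```java
import java.util.Arrays;
import java.util.List;

public class BookCountSketch {
    public static void main(String[] args) {
        // Each inner list stands for the books on one shelf.
        List<List<String>> shelves = Arrays.asList(
                Arrays.asList("A", "B", "C"),
                Arrays.asList("D", "E"),
                Arrays.asList("F"));

        int total = shelves.stream()
                .map(List::size)          // "Map": each person counts one shelf
                .reduce(0, Integer::sum); // "Reduce": add up the per-shelf counts

        System.out.println(total); // 6
    }
}
```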
MapReduce Programming Conventions
Developing a MapReduce program involves three phases and eight steps (a driver skeleton mapping these steps onto the Job API is sketched after the list):
Map phase
Read the input: set the InputFormat and split the data into key-value pairs (k1, v1).
Implement the custom map logic: receive (k1, v1) and transform it into (k2, v2).
Shuffle phase
Partitioning: the number of partitions matches the number of reducers; each incoming (k2, v2) is assigned to its partition.
Sorting: sort the (k2, v2) pairs.
Combining: perform a preliminary aggregation on the map side to reduce the volume of (k2, v2) output.
Grouping: collect the values that share the same key into one collection; the downstream reducers actively fetch the data belonging to their own partitions.
Reduce phase
Implement the custom reduce logic: receive (k2, v2) and transform it into (k3, v3).
Write the output: set the OutputFormat and write out the data produced by reduce.
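As a rough sketch of how the eight steps line up with the Job API (the input and output paths come from the command line, and Hadoop's built-in identity Mapper and Reducer stand in for your own logic), a driver might look like this:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

// Skeleton driver: each commented call corresponds to one of the eight steps above.
public class EightStepSkeleton {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "skeleton");
        job.setJarByClass(EightStepSkeleton.class);

        job.setInputFormatClass(TextInputFormat.class);      // 1. read input as (k1, v1)
        TextInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(Mapper.class);                    // 2. map: (k1, v1) -> (k2, v2)
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setPartitionerClass(HashPartitioner.class);      // 3. partition
        // 4. sort and 6. group use the key's own comparator unless overridden
        //    (job.setSortComparatorClass / job.setGroupingComparatorClass);
        // 5. combine is optional (job.setCombinerClass).

        job.setReducerClass(Reducer.class);                  // 7. reduce: (k2, v2) -> (k3, v3)
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);    // 8. write output (k3, v3)
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```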
MapReduce Execution Modes
Local mode
The MapReduce program is submitted to the LocalJobRunner
and runs locally as a single process.
The input data and output results can live either on the local file system or on HDFS.
Local mode is very convenient for debugging business logic; you can set breakpoints in IDEA.
To run locally, simply do not include the cluster configuration files. What actually matters is whether the program's conf contains the following settings:
mapreduce.framework.name=local
yarn.resourcemanager.hostname=local
Cluster mode
The MapReduce program is submitted to the YARN cluster and distributed across multiple nodes to run in parallel.
The input data and output results should be on HDFS.
To run on the cluster, package the program into a jar and start it on any node of the cluster with the following command:
yarn jar wordcount-1.0-SNAPSHOT.jar moe.sannaha.wordcount.WordCountJobMain
Serialization and Deserialization
Serialization: converting a structured object into a byte stream. It is needed when objects are passed between processes or persisted to disk. Deserialization: the reverse process, turning a byte stream back into a structured object, needed when bytes received over the network or read from disk must become objects again.
In MapReduce programs, we use Hadoop's serialization mechanism (Writable) instead of Java's (Serializable).
Java serialization is a heavyweight framework: it does little compression and carries a lot of extra information (checksums, headers, the class hierarchy, and so on), so the serialized form is large and inefficient to transfer over the network; in addition, deserialization creates a new object every time, which costs memory.
Hadoop serialization (Writable) does not need to transfer the whole parent-child class hierarchy the way Java object serialization does; it transfers only the field values that are needed, which greatly reduces network overhead. Deserialization can also reuse objects: the bytes are read back into an existing instance.
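A minimal sketch of what implementing Writable looks like (the bean and its fields are hypothetical and not part of the examples below): only the field values are written, and readFields() repopulates an existing instance instead of creating a new one:

```java
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical bean used only to illustrate the Writable mechanism.
public class PairWritable implements Writable {
    private String word;
    private long count;

    public PairWritable() { }                // Hadoop requires a no-arg constructor

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);                  // only the fields themselves are serialized
        out.writeLong(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();                 // fields are read back in the same order,
        count = in.readLong();               // into this already-existing object
    }
}
```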
MapReduce Execution Mechanism
A MapReduce job involves the following five independent entities:
Client: submits the MapReduce job.
ResourceManager: coordinates the allocation of resources on the cluster.
NodeManager: launches and monitors the compute containers on the cluster's machines.
The MapReduce application master: coordinates the tasks of a MapReduce job. It and the MapReduce tasks run in containers that are allocated by the ResourceManager and managed by the NodeManagers.
The distributed file system: used to share job files between the other entities.
MapReduce Workflow
When a complete MapReduce program runs in distributed mode, there are three kinds of processes:
MRAppMaster: responsible for scheduling and coordinating the state of the whole job.
MapTask: responsible for the entire data processing flow of the Map phase.
ReduceTask: responsible for the entire data processing flow of the Reduce phase.
An overview of the MapReduce workflow:
When a MapReduce job starts, the MRAppMaster is launched first. Based on the job description, it calculates how many MapTask instances are needed and asks the cluster to start that many MapTask processes.
After a MapTask process starts, it processes its assigned input split as follows:
It uses the client-specified InputFormat to obtain a RecordReader, which reads the data and produces input <key,value> pairs.
It calls the user-defined map() method on each input <key,value> pair and collects the <key,value> pairs that map() outputs into a buffer.
The buffered <key,value> pairs are partitioned and sorted by key, then spilled to disk files.
Once the MRAppMaster sees that all MapTask processes have finished, it starts the number of ReduceTask processes specified by the user and tells each ReduceTask which range of data (which partition) to process.
After a ReduceTask process starts, it fetches the relevant MapTask output files from the nodes where the MapTasks ran (as reported by the MRAppMaster), merge-sorts them locally, groups the key-value pairs that share the same key, calls the user-defined reduce() method on each group, collects the output <key,value> pairs, and finally uses the client-specified OutputFormat to write the results to external storage.
The MapReduce workflow is shown in the figure above; the MapTask and ReduceTask flows are described in detail below.
MapTask Workflow
The MapTask flow:
Read stage. The input component InputFormat (TextInputFormat by default) uses its getSplits() method to logically slice the files in the input directory into splits, and one MapTask is started per split; by default a split also corresponds one-to-one with a block. A RecordReader (LineRecordReader by default) then reads the split, using \n as the delimiter to read one line at a time and return a <key,value> pair, where key is the byte offset of the first character of the line and value is the text of the line.
Map stage. Execution enters the user-written Mapper class and runs the overridden map() method; map() is called once for every line the RecordReader reads.
Collect stage. After the mapper has processed a record, it is handed to the OutputCollector, which partitions it (using HashPartitioner by default). The map output is written to an in-memory ring buffer (100 MB by default); each MapTask has its own buffer, which reduces the impact of disk I/O.
Spill stage. When the buffer reaches its threshold (100 MB x 0.8), the spill thread starts, locks those 80 MB, sorts the data by key within each partition (and, if a Combiner is configured, combines records with the same partition number and key), then spills it to disk. Because the buffer still has free space (100 MB x 0.2), spilling does not block the map output from continuing to be written into memory.
Merge stage. After all data has been processed, the temporary spill files on disk are merged into one final file, together with an index file that records the offset of each reducer's data. The Map phase is now finished and waits for ReduceTasks to pull the data.
ReduceTask Workflow
The ReduceTask workflow can be roughly divided into the following stages:
Copy stage. The Reduce process starts Fetcher threads that copy output files over HTTP from the map side of MapTasks that have completed. The copied data is first placed in an in-memory buffer and then merged into files.
Merge stage. There are three forms of merge: memory-to-memory (disabled by default), memory-to-disk, and disk-to-disk. Two merge threads are started during this process: inMemoryMerger (memory to disk) and onDiskMerger (disk to disk). When the amount of data in memory reaches a threshold, inMemoryMerger spills the in-memory buffer to disk (if a Combiner is configured, it is applied here as well), producing multiple temporary files. The inMemoryMerger thread keeps running until no more data is coming from the map side; then onDiskMerger merges the temporary files on disk into one final large file.
Sort stage. While the scattered files are merged into one large file, the data is also merge-sorted.
Reduce stage. Execution enters the user-written Reducer class and runs the overridden reduce() method; each call produces zero or more <key, value> pairs, and these output pairs are finally written to a file on HDFS.
Shuffle Optimization
How the data processed in the Map phase is passed to the Reduce phase is the most critical part of the MapReduce framework. This process is called Shuffle; it spans from after the map() method to before the reduce() method, covering the Collect, Spill, and Merge stages of the MapTask and the Copy, Merge, and Sort stages of the ReduceTask.
Tuning the spill process: the size of the Shuffle buffer affects the execution efficiency of a MapReduce program. The larger the buffer, the fewer disk I/O operations, and the faster the job runs. The buffer can be tuned with the following parameters; an example of setting them follows.
mapreduce.task.io.sort.mb
mapreduce.task.io.sort.spill.percent
mapreduce.task.io.sort.factor
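For example, these settings could be applied to the job's Configuration in the driver (the values shown are only illustrative, not tuning recommendations):

```java
// In the driver, before the Job is created:
Configuration conf = new Configuration();
conf.set("mapreduce.task.io.sort.mb", "200");             // size of the in-memory sort buffer in MB (default 100)
conf.set("mapreduce.task.io.sort.spill.percent", "0.80"); // buffer usage ratio that triggers a spill (default 0.80)
conf.set("mapreduce.task.io.sort.factor", "10");          // max number of spill files merged at once (default 10)
```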
Using a Combiner: a Combiner can be applied to the spill files to pre-aggregate the map-side output and reduce the amount of data transferred between Map and Reduce. The prerequisite is that it must not change the business logic: summation can use a Combiner, but computing an average cannot.
Data compression: during the Shuffle phase, the data output by the Map side has to be sent over the network to the Reduce side, which involves a lot of network I/O. If the data can be compressed, far less data needs to be sent.
Hadoop recognizes compression formats transparently and automatically decompresses compressed files for us. The compression formats commonly used with Hadoop are lzo, gzip, snappy, and bzip2; a brief comparison follows (a small decompression sketch appears after the tables).
Common compression formats in Hadoop:

| Format | Algorithm | Extension | Multiple files | Splittable | Native | Bundled with Hadoop |
| --- | --- | --- | --- | --- | --- | --- |
| gzip | deflate | .gz | no | no | yes | yes |
| bzip2 | bzip2 | .bz2 | yes | yes | no | yes |
| lzo | lzo | .lzo | no | yes | yes | no |
| snappy | snappy | .snappy | no | no | yes | no |
Corresponding Java codec classes:

| Format | Codec class |
| --- | --- |
| gzip | org.apache.hadoop.io.compress.GzipCodec |
| bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
| lzo | com.hadoop.compression.lzo.LzopCodec |
| Snappy | org.apache.hadoop.io.compress.SnappyCodec |
Compression ratio and speed comparison:

| Format | Compression ratio | Compression speed | Decompression speed |
| --- | --- | --- | --- |
| gzip | 13.4% | 21 MB/s | 118 MB/s |
| bzip2 | 13.2% | 2.4 MB/s | 9.5 MB/s |
| lzo | 20.5% | 135 MB/s | 410 MB/s |
| snappy | 22.2% | 172 MB/s | 409 MB/s |
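To illustrate the "transparent" decompression mentioned above, here is a minimal sketch that lets CompressionCodecFactory pick a codec from a file's extension (the HDFS path is hypothetical):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

import java.io.InputStream;

public class DecompressSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path input = new Path("hdfs://cdh1:8020/data/sample.gz"); // hypothetical path
        FileSystem fs = input.getFileSystem(conf);

        // Choose a codec based on the file extension (.gz, .bz2, ...); null means "not compressed".
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodec(input);

        try (InputStream in = (codec == null)
                ? fs.open(input)
                : codec.createInputStream(fs.open(input))) {
            IOUtils.copyBytes(in, System.out, 4096, false); // print the decompressed content
        }
    }
}
```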
Enabling compression in code: compression can be enabled by modifying the code, which is flexible.
Enable compression for the Map stage:
Configuration configuration = new Configuration();
configuration.set("mapreduce.map.output.compress", "true");
configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
Enable compression for the Reduce stage:
configuration.set("mapreduce.output.fileoutputformat.compress", "true");
configuration.set("mapreduce.output.fileoutputformat.compress.type", "RECORD");
configuration.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
Enabling compression in the configuration: modify the mapred-site.xml configuration file and restart the cluster to enable compression for all MapReduce jobs.
Configure compression for the Map stage:
mapred-site.xml:
<property>
  <name>mapreduce.map.output.compress</name>
  <value>true</value>
</property>
<property>
  <name>mapreduce.map.output.compress.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
Configure compression for the Reduce stage:
mapred-site.xml:
<property>
  <name>mapreduce.output.fileoutputformat.compress</name>
  <value>true</value>
</property>
<property>
  <name>mapreduce.output.fileoutputformat.compress.type</name>
  <value>RECORD</value>
</property>
<property>
  <name>mapreduce.output.fileoutputformat.compress.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
Programming Example: WordCount
Requirement: count the total number of occurrences of each word in a given text file.
Test data: wordcount.txt, stored on HDFS:
wordcount.txt:
Hello,MapReduce,Spark
Hi,MapReduce,Flink
Hello,World
项目依赖 pom.xml 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 <?xml version="1.0" encoding="UTF-8"?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <groupId > moe.sannaha</groupId > <artifactId > MapReduceDemo</artifactId > <version > 1.0-SNAPSHOT</version > <name > MapReduceDemo</name > <properties > <project.build.sourceEncoding > UTF-8</project.build.sourceEncoding > <maven.compiler.source > 1.8</maven.compiler.source > <maven.compiler.target > 1.8</maven.compiler.target > </properties > <repositories > <repository > <id > cloudera</id > <url > https://repository.cloudera.com/artifactory/cloudera-repos/</url > </repository > </repositories > <dependencies > <dependency > <groupId > org.apache.hadoop</groupId > <artifactId > hadoop-client</artifactId > <version > 3.0.0-cdh6.1.1</version > </dependency > <dependency > <groupId > junit</groupId > <artifactId > junit</artifactId > <version > 4.11</version > <scope > test</scope > </dependency > <dependency > <groupId > org.slf4j</groupId > <artifactId > slf4j-log4j12</artifactId > <version > 1.7.25</version > </dependency > </dependencies > <build > <plugins > <plugin > <artifactId > maven-compiler-plugin</artifactId > <version > 3.8.0</version > <configuration > <source > 1.8</source > <target > 1.8</target > <encoding > UTF-8</encoding > </configuration > </plugin > <plugin > <artifactId > maven-assembly-plugin </artifactId > <configuration > <descriptorRefs > <descriptorRef > jar-with-dependencies</descriptorRef > </descriptorRefs > <archive > <manifest > <mainClass > moe.sannaha.mapreduce.wordcount.WordCountJobMain</mainClass > </manifest > </archive > </configuration > <executions > <execution > <id > make-assembly</id > <phase > package</phase > <goals > <goal > single</goal > </goals > </execution > </executions > </plugin > </plugins > </build > </project >
Mapper: define a Mapper class WordCountMapper:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package moe.sannaha.mapreduce.wordcount;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class WordCountMapper extends Mapper <LongWritable , Text , Text , LongWritable > { private Text wordText = new Text(); private LongWritable count = new LongWritable(1 ); @Override protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { for (String word : value.toString().split("," )) { wordText.set(word); context.write(wordText, count); } } }
Reducer: define a Reducer class WordCountReducer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package moe.sannaha.mapreduce.wordcount;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class WordCountReducer extends Reducer <Text , LongWritable , Text , LongWritable > { private LongWritable wordCount = new LongWritable(); @Override protected void reduce (Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long sum = 0 ; for (LongWritable value : values) { sum += value.get(); } wordCount.set(sum); context.write(key, wordCount); } }
Main: define a driver class WordCountJobMain:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 package moe.sannaha.mapreduce.wordcount;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class WordCountJobMain extends Configured implements Tool { @Override public int run (String[] strings) throws Exception { Job job = Job.getInstance(super .getConf(), "WordCount" ); job.setJarByClass(WordCountJobMain.class); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); TextInputFormat.addInputPath(job, new Path("hdfs://cdh1:8020/data/wordcount/" )); TextOutputFormat.setOutputPath(job, new Path("hdfs://cdh1:8020/data/output" )); job.setNumReduceTasks(1 ); boolean b = job.waitForCompletion(true ); return b ? 0 : 1 ; } public static void main (String[] args) throws Exception { Configuration configuration = new Configuration(); int run = ToolRunner.run(configuration, new WordCountJobMain(), args); System.exit(run); } }
Partitioning
In MapReduce, all data belonging to the same partition is sent to the same reducer for processing, so the number of partitions equals the number of reducers (and the number of output files).
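As a small illustration of the default behaviour, HashPartitioner assigns (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks, so equal keys always land in the same partition and therefore reach the same reducer. A quick sketch (the sample words are made up):

```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class HashPartitionDemo {
    public static void main(String[] args) {
        HashPartitioner<Text, LongWritable> partitioner = new HashPartitioner<>();
        int numReduceTasks = 2;
        for (String word : new String[]{"Hello", "MapReduce", "Spark"}) {
            // Same word -> same hash -> same partition on every MapTask.
            int partition = partitioner.getPartition(new Text(word), new LongWritable(1), numReduceTasks);
            System.out.println(word + " -> partition " + partition);
        }
    }
}
```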
Requirement: partition lottery results into "small" and "large" based on the GAME_RESULT field: values below 15 are "small", values of 15 and above are "large".
Test data (columns: ID, USER_ID, GAME_TYPE, OPEN_TIME, GAME_NUM, GAME_RESULT, GAME_RESULT_DESC, RESULT_TYPE):
partition.csv:
1 0 1 2017-07-31 23:10:12 837255 6 4+1+1=6 小,双
2 0 1 2017-07-31 23:15:03 837256 14 4+7+3=14 大,双
3 0 1 2017-07-31 23:20:12 837257 17 6+9+2=17 大,单
4 0 1 2017-07-31 23:25:12 837258 22 5+8+9=22 大,双
5 0 1 2017-07-31 23:30:18 837259 1 1+0+0=1 小,单
6 0 2 2017-07-31 23:17:22 2170779 4 2+0+2=4 小,双
7 0 2 2017-07-31 23:20:49 2170780 12 1+2+9=12 小,双
8 0 2 2017-07-31 23:24:18 2170781 11 6+4+1=11 小,单
9 0 2 2017-07-31 23:27:43 2170782 20 5+7+8=20 大,双
10 0 2 2017-07-31 23:31:23 2170783 12 3+1+8=12 小,双
Mapper: define a Mapper class CustomPartitionMapper:
public class CustomPartitionMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(value, NullWritable.get());
    }
}
Partitioner: define a Partitioner class CustomPartitioner:
public class CustomPartitioner extends Partitioner<Text, NullWritable> {
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int i) {
        String textStr = text.toString();
        String[] fieldArr = textStr.split("\t");
        String gameResult = fieldArr[5];
        if (Integer.parseInt(gameResult) < 15) {
            return 0;
        } else {
            return 1;
        }
    }
}
Reducer: define a Reducer class CustomPartitionReducer:
public class CustomPartitionReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
Main: define a driver class CustomPartitionJobMain. Note the call to job.setNumReduceTasks(2) to set the number of ReduceTasks:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 public class CustomPartitionJobMain extends Configured implements Tool { @Override public int run (String[] args) throws Exception { Job job = Job.getInstance(super .getConf(), "CustomPartition" ); job.setJarByClass(CustomPartitionJobMain.class); job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job, new Path(args[0 ])); job.setMapperClass(CustomPartitionMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setPartitionerClass(CustomPartitioner.class); job.setReducerClass(CustomPartitionReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setNumReduceTasks(2 ); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, new Path(args[1 ])); boolean b = job.waitForCompletion(true ); return b ? 0 : 1 ; } public static void main (String[] args) throws Exception { int run = ToolRunner.run(new Configuration(), new CustomPartitionJobMain(), args); System.exit(run); } }
Packaging and running: this MapReduce program uses multiple partitions, so it needs to run on the cluster.
Configure pom.xml:
pom.xml 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 <build > <plugins > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-compiler-plugin</artifactId > <version > 3.0</version > <configuration > <source > 1.8</source > <target > 1.8</target > <encoding > UTF-8</encoding > </configuration > </plugin > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-shade-plugin</artifactId > <version > 2.4.3</version > <executions > <execution > <phase > package</phase > <goals > <goal > shade</goal > </goals > <configuration > <minimizeJar > true</minimizeJar > </configuration > </execution > </executions > </plugin > <plugin > <artifactId > maven-assembly-plugin</artifactId > <configuration > <descriptorRefs > <descriptorRef > jar-with-dependencies</descriptorRef > </descriptorRefs > <archive > <manifest > <mainClass > moe.sannaha.custompartition.CustomPartitionJobMain</mainClass > </manifest > </archive > </configuration > <executions > <execution > <id > make-assembly</id > <phase > package</phase > <goals > <goal > single</goal > </goals > </execution > </executions > </plugin > </plugins > </build >
Package with Maven to get custompartition-1.0-SNAPSHOT-jar-with-dependencies.jar, upload it to a cluster node, and run the jar:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 20/02/03 11:52:11 INFO client.RMProxy: Connecting to ResourceManager at node01/192.168.153.100:8032 20/02/03 11:52:15 INFO input.FileInputFormat: Total input paths to process : 1 20/02/03 11:52:15 INFO mapreduce.JobSubmitter: number of splits:1 20/02/03 11:52:15 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1580365594154_0002 20/02/03 11:52:15 INFO impl.YarnClientImpl: Submitted application application_1580365594154_0002 20/02/03 11:52:15 INFO mapreduce.Job: The url to track the job: http://node01:8088/proxy/application_1580365594154_0002/ 20/02/03 11:52:15 INFO mapreduce.Job: Running job: job_1580365594154_0002 20/02/03 11:52:36 INFO mapreduce.Job: Job job_1580365594154_0002 running in uber mode : false 20/02/03 11:52:36 INFO mapreduce.Job: map 0% reduce 0% 20/02/03 11:52:56 INFO mapreduce.Job: map 100% reduce 0% 20/02/03 11:53:09 INFO mapreduce.Job: map 100% reduce 50% 20/02/03 11:53:19 INFO mapreduce.Job: map 100% reduce 100% 20/02/03 11:53:20 INFO mapreduce.Job: Job job_1580365594154_0002 completed successfully 20/02/03 11:53:20 INFO mapreduce.Job: Counters: 50 File System Counters FILE: Number of bytes read =1240988 FILE: Number of bytes written=2912243 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read =1210657 HDFS: Number of bytes written=1210550 HDFS: Number of read operations=9 HDFS: Number of large read operations=0 HDFS: Number of write operations=4 Job Counters Killed reduce tasks=1 Launched map tasks=1 Launched reduce tasks=2 Data-local map tasks=1 Total time spent by all maps in occupied slots (ms)=17899 Total time spent by all reduces in occupied slots (ms)=27011 Total time spent by all map tasks (ms)=17899 Total time spent by all reduce tasks (ms)=27011 Total vcore-milliseconds taken by all map tasks=17899 Total vcore-milliseconds taken by all reduce tasks=27011 Total megabyte-milliseconds taken by all map tasks=18328576 Total megabyte-milliseconds taken by all reduce tasks=27659264 Map-Reduce Framework Map input records=15213 Map output records=15213 Map output bytes=1210550 Map output materialized bytes=1240988 Input split bytes=107 Combine input records=0 Combine output records=0 Reduce input groups=15213 Reduce shuffle bytes=1240988 Reduce input records=15213 Reduce output records=15213 Spilled Records=30426 Shuffled Maps =2 Failed Shuffles=0 Merged Map outputs=2 GC time elapsed (ms)=9580 CPU time spent (ms)=21870 Physical memory (bytes) snapshot=767721472 Virtual memory (bytes) snapshot=8261402624 Total committed heap usage (bytes)=588316672 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=1210550 File Output Format Counters Bytes Written=1210550
Check the output directory; there are two result files, part-r-00000 and part-r-00001:
Found 3 items
-rw-r--r--   3 root supergroup          0 2020-02-03 11:53 /test/data/output/_SUCCESS
-rw-r--r--   3 root supergroup     700807 2020-02-03 11:53 /test/data/output/part-r-00000
-rw-r--r--   3 root supergroup     509743 2020-02-03 11:53 /test/data/output/part-r-00001
Consistent with the logic in CustomPartitioner, records whose result is "small" are assigned partition 0 and stored in part-r-00000, while records whose result is "large" are assigned partition 1 and stored in part-r-00001.
Sorting
Hadoop's serialization mechanism is Writable: Hadoop defines a Writable interface, and a class only needs to implement it to become serializable. Writable has a sub-interface, WritableComparable, which supports both serialization and key comparison via the compareTo() method.
o1.compareTo(o2); // negative if o1 sorts before o2, zero if they are considered equal, positive if o1 sorts after o2
Requirement: given two columns of data, sort by the first column in lexicographic order; when the first column is equal, sort by the second column in ascending order.
Test data:
a 1
a 9
b 3
a 7
b 8
b 10
a 5
a 9
Approach: the mapper outputs <key, value>; combine the key and value into a new composite key (call it newKey) while keeping the value unchanged, producing <(key,value), value>. Sorting is then done on newKey: compare the original key first, and when the keys are equal, compare the value.
Custom data type and comparator: define a WritableComparable class SortBean:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 public class SortBean implements WritableComparable <SortBean > { private String firstField; private Integer secondField; public SortBean () { } public SortBean (String firstField, int secondField) { this .firstField = firstField; this .secondField = secondField; } @Override public String toString () { return firstField + "\t" + secondField; } public String getFirstField () { return firstField; } public void setFirstField (String firstField) { this .firstField = firstField; } public int getSecondField () { return secondField; } public void setSecondField (int secondField) { this .secondField = secondField; } @Override public int compareTo (SortBean o) { int compare = this .firstField.compareTo(o.getFirstField()); if (compare == 0 ) { compare = this .secondField.compareTo(o.getSecondField()); } return compare; } @Override public void write (DataOutput dataOutput) throws IOException { dataOutput.writeUTF(firstField); dataOutput.writeInt(secondField); } @Override public void readFields (DataInput dataInput) throws IOException { firstField = dataInput.readUTF(); secondField = dataInput.readInt(); } }
Mapper: define a Mapper class CustomSortMapper:
public class CustomSortMapper extends Mapper<LongWritable, Text, SortBean, IntWritable> {
    private SortBean mapOutKey = new SortBean();
    private IntWritable mapOutValue = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String lineStr = value.toString();
        String[] wordArr = lineStr.split("\t");
        // Build the composite key from both fields; the second field is also used as the value.
        mapOutKey.setFirstField(wordArr[0]);
        mapOutKey.setSecondField(Integer.parseInt(wordArr[1]));
        mapOutValue.set(Integer.parseInt(wordArr[1]));
        context.write(mapOutKey, mapOutValue);
    }
}
Reducer: define a Reducer class CustomSortReducer:
public class CustomSortReducer extends Reducer<SortBean, IntWritable, Text, IntWritable> {
    private Text outKey = new Text();

    @Override
    protected void reduce(SortBean key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        outKey.set(key.getFirstField());
        // Write one record per value so that duplicate input lines are preserved in the output.
        for (IntWritable value : values) {
            context.write(outKey, value);
        }
    }
}
Main: define a driver class CustomSortJobMain:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public class CustomSortJobMain extends Configured implements Tool { @Override public int run (String[] strings) throws Exception { Job job = Job.getInstance(super .getConf(), "CustomSort" ); job.setJarByClass(CustomSortJobMain.class); job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job, new Path("E:\\sortdata\\input" )); job.setMapperClass(CustomSortMapper.class); job.setMapOutputKeyClass(SortBean.class); job.setMapOutputValueClass(IntWritable.class); job.setReducerClass(CustomSortReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, new Path("E:\\sortdata\\output" )); boolean b = job.waitForCompletion(true ); return b ? 0 : 1 ; } public static void main (String[] args) throws Exception { int run = ToolRunner.run(new Configuration(), new CustomSortJobMain(), args); System.exit(run); } }
Output (E:\sortdata\output\part-r-00000):
a 1
a 5
a 7
a 9
a 9
b 3
b 8
b 10
Counters
Counters are one of the most effective ways to gather job statistics, useful for quality control and application-level metrics. They can also help diagnose system problems. If you are tempted to ship log messages out of map or reduce tasks, it is usually better to see whether a counter can record the event instead. For large distributed jobs, counters are more convenient: besides being easier to retrieve than log output, counting the occurrences of a particular event with a counter is far easier than sifting through piles of log files.
Hadoop built-in counters:

| Counter group | Class |
| --- | --- |
| MapReduce task counters | org.apache.hadoop.mapreduce.TaskCounter |
| File system counters | org.apache.hadoop.mapreduce.FileSystemCounter |
| FileInputFormat counters | org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter |
| FileOutputFormat counters | org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter |
| Job counters | org.apache.hadoop.mapreduce.JobCounter |
Requirement: using the sorting program as an example, count how many records map receives.
Approach 1: the first way to define a counter is to obtain it from the context object and do the counting on the Map side.
public class CustomSortMapper extends Mapper<LongWritable, Text, SortBean, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+
+       Counter counter = context.getCounter("MR_COUNT", "MapRecordCounter");
+       counter.increment(1L);
        String lineStr = value.toString();
        String[] wordArr = lineStr.split("\t");
        String firstField = wordArr[0];
        Integer secondField = Integer.parseInt(wordArr[1]);
        SortBean sortBean = new SortBean(firstField, secondField);
        context.write(sortBean, value);
    }
}
The console output is as follows:
Approach 2: the second way is to define counters with an enum, here counting how many keys and how many values the Reduce side receives.
public class CustomSortReducer extends Reducer<SortBean, Text, SortBean, NullWritable> {
    public static enum Counter {
        REDUCE_INPUT_RECORDS, REDUCE_INPUT_VAL_NUMS,
    }

    @Override
    protected void reduce(SortBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        context.getCounter(Counter.REDUCE_INPUT_RECORDS).increment(1L);
        for (Text value : values) {
            context.getCounter(Counter.REDUCE_INPUT_VAL_NUMS).increment(1L);
            context.write(key, NullWritable.get());
        }
    }
}
The console output is as follows:
Combiner
Each mapper may produce a large amount of local output. The Combiner's job is to pre-aggregate the map-side output so that less data is transferred between Map and Reduce; it is one of the main MapReduce optimization techniques.
The Combiner is a MapReduce component distinct from the Mapper and the Reducer.
The parent class of a Combiner is Reducer.
The difference between a Combiner and a Reducer is where they run:
a Combiner runs on the node where each MapTask runs;
a Reducer receives the output of all Mappers globally.
The point of a Combiner is to locally aggregate the output of each MapTask in order to reduce network traffic.
Implementation steps:
Define a custom Combiner class that extends Reducer and overrides the reduce() method.
Register it in the job with job.setCombinerClass(CustomCombiner.class).
A Combiner can only be used when it does not affect the final business logic, and the Combiner's output (key, value) types must match the Reducer's input (key, value) types; a minimal sketch for the WordCount job above follows.
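A minimal sketch of such a Combiner, assuming the WordCount Mapper and Reducer defined earlier in this post (the class name WordCountCombiner is made up here):

```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Same summing logic as the WordCount reducer, applied to each MapTask's
// local output before it is sent over the network.
public class WordCountCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
    private LongWritable partialSum = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        partialSum.set(sum);
        context.write(key, partialSum);
    }
}
```

It would be registered in the driver with job.setCombinerClass(WordCountCombiner.class); since WordCountReducer's input and output types already match, the reducer class itself could also serve as the combiner.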
Data Traffic Analysis
Given the traffic data shown in the figure below, perform the following analyses.
Traffic totals
Requirement: for each phone number, compute the sums of the upstream flow, downstream flow, total upstream flow, and total downstream flow.
Approach: in the Map phase, use the phone number as the key and the upstream flow, downstream flow, total upstream flow, and total downstream flow as the value, output them to the Reducer, and sum them in the Reduce phase.
Custom data type: define a class DataFlowBean that implements the Writable interface:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 public class DataFlowBean implements Writable { private Integer upFlow; private Integer downFlow; private Integer upFlowCount; private Integer downFlowCount; public DataFlowBean () { } public DataFlowBean (Integer upFlow, Integer downFlow, Integer upFlowCount, Integer downFlowCount) { this .upFlow = upFlow; this .downFlow = downFlow; this .upFlowCount = upFlowCount; this .downFlowCount = downFlowCount; } @Override public void write (DataOutput dataOutput) throws IOException { dataOutput.writeInt(upFlow); dataOutput.writeInt(downFlow); dataOutput.writeInt(upFlowCount); dataOutput.writeInt(downFlowCount); } @Override public void readFields (DataInput dataInput) throws IOException { this .upFlow = dataInput.readInt(); this .downFlow = dataInput.readInt(); this .upFlowCount = dataInput.readInt(); this .downFlowCount = dataInput.readInt(); } public Integer getUpFlow () { return upFlow; } public void setUpFlow (Integer upFlow) { this .upFlow = upFlow; } public Integer getDownFlow () { return downFlow; } public void setDownFlow (Integer downFlow) { this .downFlow = downFlow; } public Integer getUpFlowCount () { return upFlowCount; } public void setUpFlowCount (Integer upFlowCount) { this .upFlowCount = upFlowCount; } public Integer getDownFlowCount () { return downFlowCount; } public void setDownFlowCount (Integer downFlowCount) { this .downFlowCount = downFlowCount; } @Override public String toString () { return upFlow + "\t" + downFlow + "\t" + upFlowCount + "\t" + downFlowCount; } }
Mapper: define a Mapper class DataFlowMapper:
public class DataFlowMapper extends Mapper<LongWritable, Text, Text, DataFlowBean> {
    DataFlowBean dataFlowBean = new DataFlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String lineStr = value.toString();
        String[] lineArr = lineStr.split("\t");
        dataFlowBean.setUpFlow(Integer.parseInt(lineArr[6]));
        dataFlowBean.setDownFlow(Integer.parseInt(lineArr[7]));
        dataFlowBean.setUpFlowCount(Integer.parseInt(lineArr[8]));
        dataFlowBean.setDownFlowCount(Integer.parseInt(lineArr[9]));
        context.write(new Text(lineArr[1]), dataFlowBean);
    }
}
Reducer: define a Reducer class DataFlowReducer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package moe.sannaha.dataflowstatistics;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class DataFlowReducer extends Reducer <Text , DataFlowBean , Text , DataFlowBean > { DataFlowBean dataFlowBean = new DataFlowBean(); @Override protected void reduce (Text key, Iterable<DataFlowBean> values, Context context) throws IOException, InterruptedException { Integer upFlow = 0 ; Integer downFlow = 0 ; Integer upFlowCount = 0 ; Integer downFlowCount = 0 ; for (DataFlowBean value : values) { upFlow += value.getUpFlow(); downFlow += value.getDownFlow(); upFlowCount += value.getUpFlowCount(); downFlowCount += value.getDownFlowCount(); } dataFlowBean.setUpFlow(upFlow); dataFlowBean.setDownFlow(downFlow); dataFlowBean.setUpFlowCount(upFlowCount); dataFlowBean.setDownFlowCount(downFlowCount); context.write(key, dataFlowBean); } }
Main: define a driver class DataFlowMain:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class DataFlowMain extends Configured implements Tool { @Override public int run (String[] strings) throws Exception { Job job = Job.getInstance(super .getConf(), "DataFlow" ); job.setJarByClass(DataFlowMain.class); job.setInputFormatClass(TextInputFormat.class); TextInputFormat.setInputPaths(job, new Path("E:\\MapReduce\\流量统计\\data_flow.dat" )); job.setMapperClass(DataFlowMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(DataFlowBean.class); job.setReducerClass(DataFlowReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DataFlowBean.class); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, new Path("E:\\MapReduce\\流量统计\\output" )); boolean b = job.waitForCompletion(true ); return b ? 0 : 1 ; } public static void main (String[] args) throws Exception { int run = ToolRunner.run(new Configuration(), new DataFlowMain(), args); System.exit(run); } }
Output (E:\MapReduce\流量统计\output\part-r-00000):
1348025████ 3 3 180 180
1350246████ 57 102 7335 110349
1356043████ 33 24 2034 5892
1360021████ 18 138 1080 186852
1360284████ 15 12 1938 2910
1366057████ 24 9 6960 690
1371919████ 4 0 240 0
1372623████ 24 27 2481 24681
1376077████ 2 2 120 120
1382307████ 6 3 360 180
1382654████ 4 0 264 0
1392231████ 12 12 3008 3720
1392505████ 69 63 11058 48243
1392625████ 4 0 240 0
1392643████ 2 4 132 1512
1501368████ 28 27 3659 3538
1592013████ 20 20 3156 2936
1598900████ 3 3 1938 180
1821157████ 15 12 1527 2106
1832017████ 21 18 9531 2412
8413████ 20 16 4116 1432
Traffic partitioning and sorting
Requirements:
Use the output of the traffic-totals job as the input data.
Partition by the first two digits of the phone number: "13", "15", "18", and all others.
Sort in descending order of the total downstream flow.
Approach: a MapReduce program sorts by key during the Shuffle phase, so the Map phase outputs a bean wrapping the traffic fields (DataFlowWritableComparable) as the key and the phone number as the value.
Custom data type: define a WritableComparable class DataFlowWritableComparable:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 public class DataFlowWritableComparable implements WritableComparable <DataFlowWritableComparable > { private Integer upFlow; private Integer downFlow; private Integer upFlowCount; private Integer downFlowCount; public DataFlowWritableComparable () { } public DataFlowWritableComparable (Integer upFlow, Integer downFlow, Integer upFlowCount, Integer downFlowCount) { this .upFlow = upFlow; this .downFlow = downFlow; this .upFlowCount = upFlowCount; this .downFlowCount = downFlowCount; } @Override public int compareTo (DataFlowWritableComparable dataFlowWritableComparable) { return this .upFlowCount > dataFlowWritableComparable.upFlowCount ? -1 : 1 ; } @Override public void write (DataOutput dataOutput) throws IOException { dataOutput.writeInt(upFlow); dataOutput.writeInt(downFlow); dataOutput.writeInt(upFlowCount); dataOutput.writeInt(downFlowCount); } @Override public void readFields (DataInput dataInput) throws IOException { this .upFlow = dataInput.readInt(); this .downFlow = dataInput.readInt(); this .upFlowCount = dataInput.readInt(); this .downFlowCount = dataInput.readInt(); } public Integer getUpFlow () { return upFlow; } public void setUpFlow (Integer upFlow) { this .upFlow = upFlow; } public Integer getDownFlow () { return downFlow; } public void setDownFlow (Integer downFlow) { this .downFlow = downFlow; } public Integer getUpFlowCount () { return upFlowCount; } public void setUpFlowCount (Integer upFlowCount) { this .upFlowCount = upFlowCount; } public Integer getDownFlowCount () { return downFlowCount; } public void setDownFlowCount (Integer downFlowCount) { this .downFlowCount = downFlowCount; } @Override public String toString () { return upFlow + "\t" + downFlow + "\t" + upFlowCount + "\t" + downFlowCount; } }
Mapper: define a Mapper class DataFlowCompareMapper:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class DataFlowCompareMapper extends Mapper <LongWritable , Text , DataFlowWritableComparable , Text > { DataFlowWritableComparable dataFlowWritableComparable = new DataFlowWritableComparable(); Text outValue = new Text(); @Override protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { String lineStr = value.toString(); String[] lineArr = lineStr.split("\t" ); dataFlowWritableComparable.setUpFlow(Integer.parseInt(lineArr[1 ])); dataFlowWritableComparable.setDownFlow(Integer.parseInt(lineArr[2 ])); dataFlowWritableComparable.setUpFlowCount(Integer.parseInt(lineArr[3 ])); dataFlowWritableComparable.setDownFlowCount(Integer.parseInt(lineArr[4 ])); outValue.set(lineArr[0 ]); context.write(dataFlowWritableComparable, outValue); } }
Partitioner: define a Partitioner class DataFlowPartition:
public class DataFlowPartition extends Partitioner<DataFlowWritableComparable, Text> {
    @Override
    public int getPartition(DataFlowWritableComparable dataFlowWritableComparable, Text text, int i) {
        String lineStr = text.toString();
        if (lineStr.startsWith("13")) {
            return 0;
        } else if (lineStr.startsWith("15")) {
            return 1;
        } else if (lineStr.startsWith("18")) {
            return 2;
        } else {
            return 3;
        }
    }
}
Reducer: define a Reducer class DataFlowCompareReducer:
public class DataFlowCompareReducer extends Reducer<DataFlowWritableComparable, Text, Text, DataFlowWritableComparable> {
    @Override
    protected void reduce(DataFlowWritableComparable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        context.write(values.iterator().next(), key);
    }
}
Main: define a driver class DataFlowCompareMain:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class DataFlowCompareMain extends Configured implements Tool { @Override public int run (String[] args) throws Exception { Job job = Job.getInstance(super .getConf(), "DataFlowCompare" ); job.setJarByClass(DataFlowCompareMain.class); job.setNumReduceTasks(4 ); job.setInputFormatClass(TextInputFormat.class); TextInputFormat.setInputPaths(job, new Path(args[0 ])); job.setMapperClass(DataFlowCompareMapper.class); job.setMapOutputKeyClass(DataFlowWritableComparable.class); job.setMapOutputValueClass(Text.class); job.setPartitionerClass(DataFlowPartition.class); job.setReducerClass(DataFlowCompareReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DataFlowWritableComparable.class); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, new Path(args[1 ])); boolean b = job.waitForCompletion(true ); return b ? 0 : 1 ; } public static void main (String[] args) throws Exception { int run = ToolRunner.run(new Configuration(), new DataFlowCompareMain(), args); System.exit(run); } }
Packaging and running: this MapReduce program uses multiple partitions, so it needs to run on the cluster.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 20/02/04 15:25:51 INFO client.RMProxy: Connecting to ResourceManager at node01/192.168.153.100:8032 20/02/04 15:25:54 INFO input.FileInputFormat: Total input paths to process : 1 20/02/04 15:25:54 INFO mapreduce.JobSubmitter: number of splits:1 20/02/04 15:25:54 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1580365594154_0006 20/02/04 15:25:55 INFO impl.YarnClientImpl: Submitted application application_1580365594154_0006 20/02/04 15:25:55 INFO mapreduce.Job: The url to track the job: http://node01:8088/proxy/application_1580365594154_0006/ 20/02/04 15:25:55 INFO mapreduce.Job: Running job: job_1580365594154_0006 20/02/04 15:26:03 INFO mapreduce.Job: Job job_1580365594154_0006 running in uber mode : false 20/02/04 15:26:03 INFO mapreduce.Job: map 0% reduce 0% 20/02/04 15:26:08 INFO mapreduce.Job: map 100% reduce 0% 20/02/04 15:26:24 INFO mapreduce.Job: map 100% reduce 25% 20/02/04 15:26:25 INFO mapreduce.Job: map 100% reduce 50% 20/02/04 15:26:30 INFO mapreduce.Job: map 100% reduce 100% 20/02/04 15:26:30 INFO mapreduce.Job: Job job_1580365594154_0006 completed successfully 20/02/04 15:26:30 INFO mapreduce.Job: Counters: 50 File System Counters FILE: Number of bytes read =651 FILE: Number of bytes written=718779 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read =672 HDFS: Number of bytes written=556 HDFS: Number of read operations=15 HDFS: Number of large read operations=0 HDFS: Number of write operations=8 Job Counters Killed reduce tasks=1 Launched map tasks=1 Launched reduce tasks=4 Data-local map tasks=1 Total time spent by all maps in occupied slots (ms)=3106 Total time spent by all reduces in occupied slots (ms)=59993 Total time spent by all map tasks (ms)=3106 Total time spent by all reduce tasks (ms)=59993 Total vcore-milliseconds taken by all map tasks=3106 Total vcore-milliseconds taken by all reduce tasks=59993 Total megabyte-milliseconds taken by all map tasks=3180544 Total megabyte-milliseconds taken by all reduce tasks=61432832 Map-Reduce Framework Map input records=21 Map output records=21 Map output bytes=585 Map output materialized bytes=651 Input split bytes=116 Combine input records=0 Combine output records=0 Reduce input groups=21 Reduce shuffle bytes=651 Reduce input records=21 Reduce output records=21 Spilled Records=42 Shuffled Maps =4 Failed Shuffles=0 Merged Map outputs=4 GC time elapsed (ms)=1009 CPU time spent (ms)=10430 Physical memory (bytes) snapshot=985886720 Virtual memory (bytes) snapshot=13803925504 Total committed heap usage (bytes)=708902912 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=556 File Output Format Counters Bytes Written=556 1360021████ 18 138 1080 186852 1350246████ 57 102 7335 110349 1392505████ 69 63 11058 48243 1372623████ 24 27 2481 24681 1356043████ 33 24 2034 5892 1392231████ 12 12 3008 3720 1360284████ 15 12 1938 2910 1392643████ 2 4 132 1512 1366057████ 24 9 6960 690 1382307████ 6 3 360 180 1348025████ 3 3 180 180 1376077████ 2 2 120 120 1371919████ 4 0 240 0 1382654████ 4 0 264 0 1392625████ 4 0 240 0 1501368████ 28 27 3659 3538 1592013████ 20 20 3156 
2936 1598900████ 3 3 1938 180 1832017████ 21 18 9531 2412 1821157████ 15 12 1527 2106 8413████ 20 16 4116 1432
References
Comparison and summary of Hadoop compression formats gzip/snappy/lzo/bzip2, by 青牛.