WordCount处理过程详解

Hadoop最基本的当然是得会写MapReduce,而MapReduce中最基本的当然又数WordCount,这篇对WordCount进行详细的讲解。
详细执行步骤如下:

  1. 将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成对,如图所示。这一步由MapReduce框架自动完成,其中偏移量(即key值)包括了回车所占的字符数(Windows和Linux环境会不同)。
    分割过程

  2. 将分割好的对交给用户定义的map方法进行处理,生成新的对,如图所示。
    执行map方法

  3. 得到map方法输出的对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key至相同value值累加,得到Mapper的最终输出结果。如图所示。
    Map端排序及Combine过程

  4. Reducer先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的对,并作为WordCount的输出结果,如图所示。Combine的输出还是分片数据,输入到reduce函数之前会
    Reduce端排序及输出结果

附上个人注解的源代码,乱七八糟,基本上只是给自己看的

/**
* Created by Gao on 15/7/14.
*/
public class WordCount {

public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);//初始化输出的value
private Text word = new Text();//初始化输出的key

//将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,
// 并将文件按行分割形成<key,value>对,如图4-1所示。这一步由MapReduce框架自动完成,
// 其中偏移量(即key值)包括了回车所占的字符数(Windows和Linux环境会不同)。
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
//将每一行的的输入转换为字符串,在令牌化,
while (itr.hasMoreTokens()) {
//只有public方法在对象生成时可以调用,对于private变量和方法,只能通过public方法或变量去调用
//这里的hasMoreTokens()就是个public方法,返回值是个boolean型
//循环是为了看每个单词后面是是否还有单词

//nextToken()返回值是String型
word.set(itr.nextToken());
//Context类导入了MapContext接口,MapContext接口继承了TaskInputOutputContext接口
//TaskInputOutputContext接口具有write方法
//可是write()并没有被重写,接口里只是跑出了异常,所以,他的作用到底是啥?
//回答上面context.write()方法在从Mapper继承来的map函数中被重写了
context.write(word, one);
//one就是具体的数值,比如1

//虽然输出是(word,1)的形式,但其实Mapper会对其进行汇总,
// 执行combine过程,将key值相同的value累加
}
}
}

public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
//Reducer先对从Mapper接收的数据进行排序,
// 再交由用户自定义的reduce方法进行处理,得到新的<key,value>对,并作为WordCount的输出结果
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
//values其实是个可迭代的数组列表,汇聚了对应key的所有value值,基本都是1啦
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
//设置Jar类
job.setJarByClass(WordCount.class);
//设置mapper,Combiner,Reducer类
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
//设置输出的key/value类型类
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置输入路径,可以是多路径输入
//FileInputFormat类用来生成可供map处理的<key,value>
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
//设置输出路径,单输出路径
//每一种输入格式都有一种输出格式与其对应。默认的输出格式是TextOutputFormat,
// 这种输出方式与输入类似,会将每条记录以一行的形式存入文本文件。
// 不过,它的键和值可以是任意形式的,因为程序内容会调用toString()方法将键和值转换为String类型再输出。
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
//退出Job
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

×

纯属好玩

扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

,