Hadoop最基本的当然是得会写MapReduce,而MapReduce中最基本的当然又数WordCount,这篇对WordCount进行详细的讲解。 详细执行步骤如下:

  1. 将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成<key,value>对,如图所示。这一步由MapReduce框架自动完成,其中偏移量(即key值)包括了回车所占的字符数(Windows和Linux环境会不同)。 分割过程

  2. 将分割好的<key,value>对交给用户定义的map方法进行处理,生成新的<key,value>对,如图所示。 执行map方法

  3. 得到map方法输出的<key,value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key至相同value值累加,得到Mapper的最终输出结果。如图所示。 Map端排序及Combine过程

  4. Reducer先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key,value>对,并作为WordCount的输出结果,如图所示。Combine的输出还是分片数据,输入到reduce函数之前会 Reduce端排序及输出结果

附上个人注解的源代码,乱七八糟,基本上只是给自己看的

/**
 * Created by Gao on 15/7/14.
 */
public class WordCount {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable>{

        private final static IntWritable one = new IntWritable(1);//初始化输出的value
        private Text word = new Text();//初始化输出的key

        //将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,
        // 并将文件按行分割形成<key,value>对,如图4-1所示。这一步由MapReduce框架自动完成,
        // 其中偏移量(即key值)包括了回车所占的字符数(Windows和Linux环境会不同)。
        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            //将每一行的的输入转换为字符串,在令牌化,
            while (itr.hasMoreTokens()) {
                //只有public方法在对象生成时可以调用,对于private变量和方法,只能通过public方法或变量去调用
                //这里的hasMoreTokens()就是个public方法,返回值是个boolean型
                //循环是为了看每个单词后面是是否还有单词

                //nextToken()返回值是String型
                word.set(itr.nextToken());
                //Context类导入了MapContext接口,MapContext接口继承了TaskInputOutputContext接口
                //TaskInputOutputContext接口具有write方法
                //可是write()并没有被重写,接口里只是跑出了异常,所以,他的作用到底是啥?
                //回答上面context.write()方法在从Mapper继承来的map函数中被重写了
                context.write(word, one);
                //one就是具体的数值,比如1

                //虽然输出是(word,1)的形式,但其实Mapper会对其进行汇总,
                // 执行combine过程,将key值相同的value累加
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();
        //Reducer先对从Mapper接收的数据进行排序,
        // 再交由用户自定义的reduce方法进行处理,得到新的<key,value>对,并作为WordCount的输出结果
        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
        ) throws IOException, InterruptedException {
            int sum = 0;
            //values其实是个可迭代的数组列表,汇聚了对应key的所有value值,基本都是1啦
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        //设置Jar类
        job.setJarByClass(WordCount.class);
        //设置mapper,Combiner,Reducer类
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        //设置输出的key/value类型类
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //设置输入路径,可以是多路径输入
        //FileInputFormat类用来生成可供map处理的<key,value>
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        //设置输出路径,单输出路径
        //每一种输入格式都有一种输出格式与其对应。默认的输出格式是TextOutputFormat,
        // 这种输出方式与输入类似,会将每条记录以一行的形式存入文本文件。
        // 不过,它的键和值可以是任意形式的,因为程序内容会调用toString()方法将键和值转换为String类型再输出。
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        //退出Job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}