public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
//规定map中用到的数据类型,这里的Text相当于jdk中的String IntWritable相当于jdk的int类型,
//这样做的原因主要是为了hadoop的数据序化而做的。
private final static IntWritable one = new IntWritable(1);//声时一个IntWritable变量,作计数用,每出现一个key,给其一个value=1的值
private Text word = new Text();//用来暂存map输出中的key值,Text类型的
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
//这就是map函数,它是和Mapper抽象类中的相对应的,此处的Object key,Text value的类型和上边的Object,Text是相对应的,
//而且最好一样,不然的话,多数情况运行时会报错。
StringTokenizer itr = new StringTokenizer(value.toString());
//Hadoop读入的value是以行为单位的,其key为该行所对应的行号,因为我们要计算每个单词的数目,默认以空格作为间隔,
//故用StringTokenizer辅助做字符串的拆分,也可以用string.split("")来作。
while (itr.hasMoreTokens()) { //遍历一下每行字符串中的单词
word.set(itr.nextToken()); //出现一个单词就给它设成一个key并将其值设为1
context.write(word, one); //输出设成的key/value值
//上面就是map打散的过程
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> { //reduce的静态类,这里和Map中的作用是一样的,设定输入/输出的值的类型
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) { //由于map的打散,这里会得到如,{key,values}={"hello",{1,1,....}},这样的集合
sum += val.get(); //这里需要逐一将它们的value取出来予以相加,取得总的出现次数,即为汇和
}
result.set(sum); //将values的和取得,并设成result对应的值
context.write(key, result);
//此时的key即为map打散之后输出的key,没有变化,变化的时result,以前得到的是一个数字的集合,此时已经给算出和了,并做为key/value输出。
}
}
public static void main(String[] args) throws Exception {
    // Driver: configures and submits the word-count job to the cluster.
    // GenericOptionsParser strips generic Hadoop flags (-D, -fs, -jt, ...)
    // and leaves only the positional arguments.
    Configuration conf = new Configuration();
    String[] remaining = new GenericOptionsParser(conf, args).getRemainingArgs();

    // Exactly two positional arguments are required: input path, output path.
    if (remaining.length != 2) {
        System.err.println("Usage: wordcount <in> <out>");
        System.exit(2);
    }

    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);          // jar containing this driver class
    job.setMapperClass(TokenizerMapper.class);   // map phase: tokenize lines
    job.setCombinerClass(IntSumReducer.class);   // map-side pre-aggregation
    job.setReducerClass(IntSumReducer.class);    // final per-word summation
    job.setOutputKeyClass(Text.class);           // output key type: the word
    job.setOutputValueClass(IntWritable.class);  // output value type: its count

    FileInputFormat.addInputPath(job, new Path(remaining[0]));    // where to read
    FileOutputFormat.setOutputPath(job, new Path(remaining[1]));  // where to write results

    // Block until the job completes; exit 0 on success, 1 on failure.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
hadoopor@master:~$ hadoop jar '/home/hadoopor/workspace/ddd/ddd_fat.jar' input output
15/01/20 09:55:20 INFO input.FileInputFormat: Total input paths to process : 1
15/01/20 09:55:20 INFO mapred.JobClient: Running job: job_201501200913_0002
15/01/20 09:55:21 INFO mapred.JobClient: map 0% reduce 0%
15/01/20 09:55:45 INFO mapred.JobClient: map 15% reduce 0%
15/01/20 09:55:48 INFO mapred.JobClient: map 28% reduce 0%
15/01/20 09:55:51 INFO mapred.JobClient: map 33% reduce 0%
15/01/20 09:56:06 INFO mapred.JobClient: map 54% reduce 0%
15/01/20 09:56:08 INFO mapred.JobClient: map 61% reduce 0%
15/01/20 09:56:11 INFO mapred.JobClient: map 63% reduce 0%
15/01/20 09:56:14 INFO mapred.JobClient: map 68% reduce 0%
15/01/20 09:56:17 INFO mapred.JobClient: map 79% reduce 0%
15/01/20 09:56:20 INFO mapred.JobClient: map 93% reduce 0%
15/01/20 09:56:23 INFO mapred.JobClient: map 100% reduce 0%
15/01/20 09:56:26 INFO mapred.JobClient: map 100% reduce 22%
15/01/20 10:05:21 INFO mapred.JobClient: Task Id : attempt_201501200913_0002_m_000002_0, Status : FAILED
Too many fetch-failures
15/01/20 10:05:21 WARN mapred.JobClient: Error reading task outputslaver1
15/01/20 10:05:21 WARN mapred.JobClient: Error reading task outputslaver1
15/01/20 10:05:25 INFO mapred.JobClient: map 83% reduce 22%
15/01/20 10:05:34 INFO mapred.JobClient: map 97% reduce 22%
15/01/20 10:05:37 INFO mapred.JobClient: map 100% reduce 22%
15/01/20 10:05:43 INFO mapred.JobClient: map 100% reduce 27%
15/01/20 10:13:36 INFO mapred.JobClient: Task Id : attempt_201501200913_0002_m_000003_0, Status : FAILED
Too many fetch-failures
15/01/20 10:13:36 WARN mapred.JobClient: Error reading task outputslaver1
15/01/20 10:13:36 WARN mapred.JobClient: Error reading task outputslaver1
15/01/20 10:13:40 INFO mapred.JobClient: map 83% reduce 27%
15/01/20 10:13:46 INFO mapred.JobClient: map 97% reduce 27%
15/01/20 10:13:49 INFO mapred.JobClient: map 100% reduce 27%
15/01/20 10:14:04 INFO mapred.JobClient: map 100% reduce 100%
15/01/20 10:14:09 INFO mapred.JobClient: Job complete: job_201501200913_0002
15/01/20 10:14:09 INFO mapred.JobClient: Counters: 25
15/01/20 10:14:09 INFO mapred.JobClient: Map-Reduce Framework
15/01/20 10:14:09 INFO mapred.JobClient: Combine output records=845
15/01/20 10:14:09 INFO mapred.JobClient: Spilled Records=923
15/01/20 10:14:09 INFO mapred.JobClient: Map output materialized bytes=2682
15/01/20 10:14:09 INFO mapred.JobClient: Reduce input records=78
15/01/20 10:14:09 INFO mapred.JobClient: Reduce output records=13
15/01/20 10:14:09 INFO mapred.JobClient: Map input records=14181040
15/01/20 10:14:09 INFO mapred.JobClient: SPLIT_RAW_BYTES=678
15/01/20 10:14:09 INFO mapred.JobClient: Map output records=14181040
15/01/20 10:14:09 INFO mapred.JobClient: Map output bytes=457599200
15/01/20 10:14:09 INFO mapred.JobClient: Reduce shuffle bytes=2235
15/01/20 10:14:09 INFO mapred.JobClient: Combine input records=14181807
15/01/20 10:14:09 INFO mapred.JobClient: Reduce input groups=13
15/01/20 10:14:09 INFO mapred.JobClient: File Input Format Counters
15/01/20 10:14:09 INFO mapred.JobClient: Bytes Read=400895525
15/01/20 10:14:09 INFO mapred.JobClient: FileSystemCounters
15/01/20 10:14:09 INFO mapred.JobClient: HDFS_BYTES_READ=400896203
15/01/20 10:14:09 INFO mapred.JobClient: FILE_BYTES_WRITTEN=179215
15/01/20 10:14:09 INFO mapred.JobClient: FILE_BYTES_READ=29025
15/01/20 10:14:09 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=461
15/01/20 10:14:09 INFO mapred.JobClient: Job Counters
15/01/20 10:14:09 INFO mapred.JobClient: Launched map tasks=8
15/01/20 10:14:09 INFO mapred.JobClient: Launched reduce tasks=1
15/01/20 10:14:09 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=1091487
15/01/20 10:14:09 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
15/01/20 10:14:09 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=207494
15/01/20 10:14:09 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
15/01/20 10:14:09 INFO mapred.JobClient: Data-local map tasks=8
15/01/20 10:14:09 INFO mapred.JobClient: File Output Format Counters
15/01/20 10:14:09 INFO mapred.JobClient: Bytes Written=461