【正文】
其輸入為key1,list(value2)對,輸出為key1,value3對,key1不變,value3為IntWritable類型,值為key1值相同的所有value2的和。在main函數(shù)中,需要設(shè)置一系列的類,詳細內(nèi)容參考源碼(7)MultipleOutputFormat類用于向文件輸出結(jié)果(8)LineRecordWriter類被MultipleOutputFormat中的方法調(diào)用,向文件輸出一個結(jié)果key,value對 4. 程序的運行過程(1)程序從文件中讀出數(shù)據(jù)到內(nèi)存,生成matrix實例,通過組合左矩陣的行與右矩陣的列生成ac個matrixInputSplit(2)一個Mapper任務(wù)對一個matrixInputSplit中的每個key1,value1對調(diào)用一次Map方法對value1中的兩個整數(shù)相乘。 }}3. 程序中的類(1)matrix類用于存儲矩陣(2)IntPair類實現(xiàn)WritableComparable接口用于存儲整數(shù)對(3)matrixInputSplit類繼承了InputSplit接口,每個matrixInputSplit包括b個key,value對,用來生成一個積矩陣元素。 (job, new Path(output))。 ()。 ()。 ()。 Job job = new Job(conf, word count11)。 (key, result)。 for (IntWritable val : values) { sum += ()。 } } public static class IntSumReducer extends ReducerText, IntWritable, Text, IntWritable { private IntWritable result = new IntWritable()。 j ++。 IntWritable one = new IntWritable(result)。 // 向量序號 (i + )。 // 向量值 public void map(Object key, Text value, Context context) hrows IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(())。 // 矩陣行序號 private static int i = 0。 ((true) ? 0 : 1)。 (job, new Path(otherArgs[0]))。 ()。 ()。 ()。 (2)。//TOP N 定義 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs()。 } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration()。ilen。 keys[i+1] = tempt。 keys[i] = keys[i+1]。 Text tempt = keys[i]。ilen1。 i++。 for(Iterator k:()) { keys[i] = (Text)()。 int i = 0。 } } } public void cleanup() { //收尾工作 排序并輸出TOP N Text[] keys = new Text[3]。 if(() ()) { (())。 if( len) (key,result)。 for (IntWritable val : values) { sum += ()。 public void setup(Context text) throws IOException,InterruptedException { len = ().getInt(N,3)。 int len。 (key, result)。 for (IntWritable val : values) { sum += ()。 } } } public static class IntSumCombiner extends ReducerText,IntWritable,Text,IntWritable { private IntWritable result = new IntWritable()。 while (()) { (())。 private Text word = new Text()。import 。import 。import 。im