2、數(shù)據(jù)排序
“數(shù)據(jù)排序”是許多實際任務執(zhí)行時要完成的第一項工作,比如學生成績評比、數(shù)據(jù)建立索引等。這個實例和數(shù)據(jù)去重類似,都是先對原始數(shù)據(jù)進行初步處理,為進一步的數(shù)據(jù)操作打好基礎。下面進入這個示例。
2.1 實例描述
對輸入文件中數(shù)據(jù)進行排序。輸入文件中的每行內容均為一個數(shù)字,即一個數(shù)據(jù)。要求在輸出中每行有兩個間隔的數(shù)字,其中,第一個代表原始數(shù)據(jù)在原始數(shù)據(jù)集中的位次,第二個代表原始數(shù)據(jù)。
樣例輸入:
1)file1:
2
32
654
32
15
756
65223
2)file2:
5956
22
650
92
3)file3:
26
54
6
樣例輸出:
1 2
2 6
3 15
4 22
5 26
6 32
7 32
8 54
9 92
10 650
11 654
12 756
13 5956
14 65223
2.2 設計思路
這個實例僅僅要求對輸入數(shù)據(jù)進行排序,熟悉MapReduce過程的讀者會很快想到在MapReduce過程中就有排序,是否可以利用這個默認的排序,而不需要自己再實現(xiàn)具體的排序呢?答案是肯定的。
但是在使用之前首先需要了解它的默認排序規(guī)則。它是按照key值進行排序的,如果key為封裝int的IntWritable類型,那么MapReduce按照數(shù)字大小對key排序,如果key為封裝為String的Text類型,那么MapReduce按照字典順序對字符串排序。
了解了這個細節(jié),我們就知道應該使用封裝int的IntWritable型數(shù)據(jù)結構了。也就是在map中將讀入的數(shù)據(jù)轉化成 IntWritable型,然后作為key值輸出(value任意)。reduce拿到《key,value-list》之后,將輸入的 key作為value輸出,并根據(jù)value-list中元素的個數(shù)決定輸出的次數(shù)。輸出的key(即代碼中的linenum)是一個全局變量,它統(tǒng)計當前key的位次。需要注意的是這個程序中沒有配置Combiner,也就是在MapReduce過程中不使用Combiner。這主要是因為使用map和reduce就已經能夠完成任務了。
2.3 程序代碼
程序代碼如下所示:
package com.hebut.mr;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Sort {
//map將輸入中的value化成IntWritable類型,作為輸出的key
public static class Map extends
Mapper《Object,Text,IntWritable,IntWritable》{
private static IntWritable data=new IntWritable();
//實現(xiàn)map函數(shù)
public void map(Object key,Text value,Context context)
throws IOException,InterruptedException{
String line=value.toString();
data.set(Integer.parseInt(line));
context.write(data, new IntWritable(1));
}
}
//reduce將輸入中的key復制到輸出數(shù)據(jù)的key上,
//然后根據(jù)輸入的value-list中元素的個數(shù)決定key的輸出次數(shù)
//用全局linenum來代表key的位次
public static class Reduce extends
Reducer《IntWritable,IntWritable,IntWritable,IntWritable》{
private static IntWritable linenum = new IntWritable(1);
//實現(xiàn)reduce函數(shù)
public void reduce(IntWritable key,Iterable《IntWritable》 values,Context context)
throws IOException,InterruptedException{
for(IntWritable val:values){
context.write(linenum, key);
linenum = new IntWritable(linenum.get()+1);
}
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
//這句話很關鍵
conf.set(“mapred.job.tracker”, “192.168.1.2:9001”);
String[] ioArgs=new String[]{“sort_in”,“sort_out”};
String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println(“Usage: Data Sort 《in》 《out》”);
System.exit(2);
}
Job job = new Job(conf, “Data Sort”);
job.setJarByClass(Sort.class);
//設置Map和Reduce處理類
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
//設置輸出類型
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
//設置輸入和輸出目錄
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
2.4 代碼結果
1)準備測試數(shù)據(jù)
通過Eclipse下面的“DFS Locations”在“/user/hadoop”目錄下創(chuàng)建輸入文件“sort_in”文件夾(備注:“sort_out”不需要創(chuàng)建。)如圖2.4-1所示,已經成功創(chuàng)建。
?
然后在本地建立三個txt文件,通過Eclipse上傳到“/user/hadoop/sort_in”文件夾中,三個txt文件的內容如“實例描述”那三個文件一樣。如圖2.4-2所示,成功上傳之后。
從SecureCRT遠處查看“Master.Hadoop”的也能證實我們上傳的三個文件。
?
查看兩個文件的內容如圖2.4-3所示:
?
2)查看運行結果
這時我們右擊Eclipse 的“DFS Locations”中“/user/hadoop”文件夾進行刷新,這時會發(fā)現(xiàn)多出一個“sort_out”文件夾,且里面有3個文件,然后打開雙 其“part-r-00000”文件,會在Eclipse中間把內容顯示出來。如圖2.4-4所示。
?
評論