project1 Lab Report
151220032 龚茹沁
1. Function: The program consists of a single Java class, WordCount.java. It performs Chinese word segmentation and word-frequency counting and, given a user-supplied value K, outputs every word whose frequency is at least K, together with its frequency, in descending order of frequency.
2. Usage: `wordcount <in> <out> [-skip skipPatternFile] k`
Here `<in>` is the input path, `<out>` is the output path, k is the frequency threshold, and skipPatternFile is a user-defined stop-word file.
Output: frequency + word
3. Algorithm design: The program builds on the earlier WordCount 2.0 example source with a few improvements. The skip patterns (a hand-written stopwords.txt) serve as stop words, so that punctuation, English text, and meaningless adverbs and prepositions are ignored.
Chinese word segmentation is done with the open-source IK Analyzer package (a minimal usage sketch follows below).
For sorting I wrote a custom comparator class: the (key, value) pairs produced by the first MapReduce pass are stored in a temporary directory, the keys and values of the map output are then swapped so that the frequency becomes the key, the records are sorted in descending order of that key, and a second reduce pass writes the result.
A global variable k records the user's input; the reducer sums the counts and writes a word only if its frequency is at least k.
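As a focused illustration of the segmentation step, here is a minimal standalone sketch that drives the IK segmenter the same way the mapper in the full code below does. It uses only the IKSegmenter and Lexeme calls that appear in that code; the class name IkDemo and the sample sentence are made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;

import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

public class IkDemo {
  public static void main(String[] args) throws IOException {
    String content = "南京大学计算机科学与技术系";  // sample sentence, made up for illustration
    StringReader sr = new StringReader(content);
    // The second argument selects smart mode, the same setting used in the mapper.
    IKSegmenter iks = new IKSegmenter(sr, true);
    Lexeme lexeme;
    while ((lexeme = iks.next()) != null) {
      System.out.println(lexeme.getLexemeText());  // one segmented word per line
    }
    sr.close();
  }
}
```

Running it prints one segmented word per line, which is exactly what the mapper later emits as keys with a count of 1.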
4. Code description:
There are three main classes:
public static class ToMapper extends Mapper<Object, Text, Text, IntWritable> implements the map step.
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> implements the reduce step.
public static class DescComparator extends WritableComparator implements the descending sort.
Within the first class, public void map(Object key, Text value, Context context) throws IOException, InterruptedException reads the input data, strips the stop patterns loaded by parseSkipFile, and then segments the remaining text with IKAnalyzer_2012_u5.
The main method wires these classes together and controls the whole two-job MapReduce process.
5. Full code:
```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Random;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import java.io.StringReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

public class WordCount {

  public static int k = 0;

  public static class ToMapper
      extends Mapper<Object, Text, Text, IntWritable> {

    static enum CountersEnum { INPUT_WORDS }

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    private boolean caseSensitive;
    private Set<String> patternsToSkip = new HashSet<String>();
    private Configuration conf;
    private BufferedReader fis;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
      conf = context.getConfiguration();
      caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
      k = Integer.parseInt(conf.get("k"));
      // Load the skip patterns only when -skip was given on the command line.
      if (conf.getBoolean("wordcount.skip.patterns", false)) {
        URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
        for (URI patternsURI : patternsURIs) {
          Path patternsPath = new Path(patternsURI.getPath());
          String patternsFileName = patternsPath.getName().toString();
          parseSkipFile(patternsFileName);
        }
      }
    }

    private void parseSkipFile(String fileName) {
      try {
        fis = new BufferedReader(new FileReader(fileName));
        String pattern = null;
        while ((pattern = fis.readLine()) != null) {
          patternsToSkip.add(pattern);
        }
      } catch (IOException ioe) {
        System.err.println("Caught exception while parsing the cached file '"
            + StringUtils.stringifyException(ioe));
      }
    }

    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      // The input line is space-delimited; fields 6 .. length-2 hold the news title text
      // and the last field is the record id.
      String[] line = value.toString().split(" ");
      if (line.length >= 6) {
        for (int j = 6; j < line.length - 1; j++) {
          // Strip every stop pattern from the field before segmentation.
          for (String pattern : patternsToSkip) {
            line[j] = line[j].replaceAll(pattern, "");
          }
        }
        String id = line[line.length - 1].trim();
        for (int i = 6; i < line.length - 1; i++) {
          String content = line[i].trim();
          // Chinese word segmentation with the IK Analyzer in smart mode.
          StringReader sr = new StringReader(content);
          IKSegmenter iks = new IKSegmenter(sr, true);
          Lexeme lexeme = null;
          while ((lexeme = iks.next()) != null) {
            String word = lexeme.getLexemeText();
            context.write(new Text(word), new IntWritable(1));
          }
          sr.close();
        }
      } else {
        System.err.println("error: " + value.toString() + "------------------------");
      }
    }
  }

  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();
      k = Integer.parseInt(conf.get("k"));
    }

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      // Only write words whose total frequency reaches the user-supplied threshold k.
      if (sum >= k) {
        result.set(sum);
        context.write(key, result);
      }
    }
  }

  // Reverses the default ascending IntWritable order so that the sort job
  // outputs frequencies in descending order.
  public static class DescComparator extends WritableComparator {

    protected DescComparator() {
      super(IntWritable.class, true);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      return -super.compare(b1, s1, l1, b2, s2, l2);
    }

    @Override
    public int compare(Object a, Object b) {
      return -super.compare(a, b);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
    String[] remainingArgs = optionParser.getRemainingArgs();
    if (remainingArgs.length != 3 && remainingArgs.length != 5) {
      System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile] k");
      System.exit(2);
    }
    // k is always the last argument, with or without the -skip option.
    k = Integer.parseInt(remainingArgs[remainingArgs.length - 1]);
    conf.setInt("k", k);
    // Temporary directory: the word-count job writes here and the sort job reads from it.
    Path tempDir = new Path("wordcount-temp-" + Integer.toString(
        new Random().nextInt(Integer.MAX_VALUE)));
    Job job = Job.getInstance(conf, "word count");
    try {
      job.setJarByClass(WordCount.class);
      job.setMapperClass(ToMapper.class);
      // No combiner: IntSumReducer filters by k, so running it as a combiner
      // could drop partial counts before they are summed globally.
      job.setReducerClass(IntSumReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      List<String> otherArgs = new ArrayList<String>();
      for (int i = 0; i < remainingArgs.length; ++i) {
        if ("-skip".equals(remainingArgs[i])) {
          job.addCacheFile(new Path(remainingArgs[++i]).toUri());
          job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
        } else {
          otherArgs.add(remainingArgs[i]);
        }
      }
      FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
      // The word-count job writes its output to the temporary directory;
      // the sort job then uses that directory as its input.
      FileOutputFormat.setOutputPath(job, tempDir);
      job.setOutputFormatClass(SequenceFileOutputFormat.class);
      if (job.waitForCompletion(true)) {
        Job sortJob = Job.getInstance(conf, "sort");
        sortJob.setJarByClass(WordCount.class);
        FileInputFormat.addInputPath(sortJob, tempDir);
        sortJob.setInputFormatClass(SequenceFileInputFormat.class);
        // InverseMapper (provided by Hadoop) swaps key and value,
        // so the frequency becomes the key of the sort job.
        sortJob.setMapperClass(InverseMapper.class);
        // A single reducer gives a single output file.
        sortJob.setNumReduceTasks(1);
        FileOutputFormat.setOutputPath(sortJob, new Path(otherArgs.get(1)));
        sortJob.setOutputKeyClass(IntWritable.class);
        sortJob.setOutputValueClass(Text.class);
        // Hadoop sorts IntWritable keys in ascending order by default;
        // DescComparator reverses the comparison so the output is sorted
        // by descending frequency.
        sortJob.setSortComparatorClass(DescComparator.class);
        System.exit(sortJob.waitForCompletion(true) ? 0 : 1);
      }
    } finally {
      FileSystem.get(conf).deleteOnExit(tempDir);
    }
  }
}
```
6. Executable:
project1.1.jar: project1.1
1. Function: The program consists mainly of InvertedIndex.java. It performs Chinese word segmentation and builds an inverted index keyed by url: a mapping that records, for full-text search, where a given word occurs in a document or a set of documents.
2. Usage: `invertedindex <in> <out> [-skip skipPatternFile]`
Here `<in>` is the input path, `<out>` is the output path, and skipPatternFile is a user-defined stop-word file.
Output: word, followed by each url and the number of times the word occurs in that news item
3. Algorithm design:
The map step is largely the same as in the WordCount program above, except that the key emitted at the end of the map is word + url.
The reduce side adds up the values of identical keys to obtain each word's frequency within each news title, and then uses url + frequency as the new value, keyed by the word alone (see the sketch in section 5 below).
4. Code description:
There are three main classes:
public static class InvertedIndexMapper extends Mapper<Object, Text, Text, Text>
performs stop-word removal and Chinese segmentation in the map step
public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text>
the combine step accumulates the values of identical keys
public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text>
builds the new value, emitting all URLs associated with each word together with the word's frequency in each
5. Full code: InvertedIndex.java
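Since the full source is only linked above, here is a minimal sketch of the three classes described in section 4, so the data flow is visible in one place. Several details are my assumptions rather than facts about InvertedIndex.java: the class name InvertedIndexSketch, the field position of the url in the input line, the tab separator in the intermediate "word + url" key, and the "url:count" format of the final value.

```java
import java.io.IOException;
import java.io.StringReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

public class InvertedIndexSketch {

  public static class InvertedIndexMapper extends Mapper<Object, Text, Text, Text> {
    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] fields = value.toString().split(" ");
      if (fields.length < 2) {
        return;  // skip malformed lines
      }
      String url = fields[0];  // assumed field position of the url
      for (int i = 1; i < fields.length; i++) {
        StringReader sr = new StringReader(fields[i]);
        IKSegmenter iks = new IKSegmenter(sr, true);  // smart-mode segmentation
        Lexeme lexeme;
        while ((lexeme = iks.next()) != null) {
          // Key = word + url, so the combiner can count per (word, url) pair.
          context.write(new Text(lexeme.getLexemeText() + "\t" + url), new Text("1"));
        }
        sr.close();
      }
    }
  }

  public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      // Sum the counts for one (word, url) pair, then re-key on the word alone
      // and move "url:count" into the value. This classic re-keying trick assumes
      // the combiner actually runs on all map output.
      int sum = 0;
      for (Text v : values) {
        sum += Integer.parseInt(v.toString());
      }
      String[] wordAndUrl = key.toString().split("\t");
      context.write(new Text(wordAndUrl[0]), new Text(wordAndUrl[1] + ":" + sum));
    }
  }

  public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      // Concatenate every "url:count" entry for this word into one posting list.
      StringBuilder postings = new StringBuilder();
      for (Text v : values) {
        if (postings.length() > 0) {
          postings.append("; ");
        }
        postings.append(v.toString());
      }
      context.write(key, new Text(postings.toString()));
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "inverted index sketch");
    job.setJarByClass(InvertedIndexSketch.class);
    job.setMapperClass(InvertedIndexMapper.class);
    job.setCombinerClass(InvertedIndexCombiner.class);
    job.setReducerClass(InvertedIndexReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    // One reducer keeps each word's whole posting list in a single output file,
    // even though the intermediate key still contains the url.
    job.setNumReduceTasks(1);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```

For a title crawled from url U that contains the word 南京 twice, the mapper emits (南京\tU, 1) twice, the combiner turns that into (南京, U:2), and the reducer joins all such values into a single line such as 南京 U:2; V:1.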
6. Executable:
project1.2.jar: project1.2
The given download_data folder was used. In practice only a limited number of files could be put on HDFS, so the data set is not that large: 115 txt files in total. The run takes roughly 20 s.
Both programs rely on a manually supplied patterns file for stop words, which is not very user-friendly. Since stop-word removal is a necessary component, it would be better if the program handled it automatically, for example in a dedicated class (a rough sketch of that idea follows).
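One possible shape for that dedicated stop-word class is sketched below. It assumes a default stop-word list is bundled inside the jar as a classpath resource named stopwords.txt; the class name StopWordFilter and that resource name are made up, not part of the original project.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: loads a bundled stop-word list so the user no longer
// has to pass -skip on the command line.
public class StopWordFilter {

  private final Set<String> stopWords = new HashSet<String>();

  public StopWordFilter() throws IOException {
    InputStream in = StopWordFilter.class.getResourceAsStream("/stopwords.txt");
    if (in == null) {
      return;  // no bundled list found: filter nothing
    }
    BufferedReader reader = new BufferedReader(
        new InputStreamReader(in, StandardCharsets.UTF_8));
    String line;
    while ((line = reader.readLine()) != null) {
      stopWords.add(line.trim());
    }
    reader.close();
  }

  /** Returns true if a segmented word should be dropped. */
  public boolean isStopWord(String word) {
    return word.isEmpty() || stopWords.contains(word);
  }
}
```

The mapper could then call isStopWord on each Lexeme instead of applying replaceAll with the cached patterns, and the -skip option would become optional.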
I read the input by splitting each line into an array rather than using a tokenizer. Array indexing makes it quick to pull out fields, but the program is not very adaptable: it handles one specific input file format, and if crawled data arrived in a different format the code would have to be changed.
The output of the second program is not very pretty; since most words occur only once within a given url, it feels like a waste of time and space.
In addition, what I ran on HDFS were the files from the given download_data folder; it seems only a few dozen could be uploaded, so the data volume is small. Running fulldata would take a very long time, and the file format of fulldata also differs considerably from that of the folder.