Run the Chain MapReduce Job
Sometimes we need to run the depended multiple Map Reduce job(Map---->Reduce---->Map).
Here's the one example of ChainMapReduce Job.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;
import data.mining.Dictionary;
import data.mining.SgmParser;
public class ChainWordCountDriver extends Configured implements Tool {
// TokenizerMapper - Parse the input file record for every token
public static class TokenizerMapper extends MapReduceBase implements Mapper<LongWritable, Text,Text, IntWritable> {
private final IntWritable one = new IntWritable(1);
private Text word = new Text();
//here sgm parser is responsible for removing the stop words.
public void map(LongWritable key, Text value,OutputCollector<Text, IntWritable> output,Reporter reporter) throws IOException {
String line = value.toString();
line = SgmParser.parse(line);
line = line.replaceAll("\\s+", " ").trim();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
output.collect(new Text(tokenizer.nextToken()), one);
}
}
}
//LowerCaserMapper - It will lowercase the passed token from TokenizerMapper
public class LowerCaserMapper extends MapReduceBase implements Mapper<Text, IntWritable,Text, IntWritable> {
public void map(Text key, IntWritable value,OutputCollector<Text, IntWritable> output,Reporter reporter) throws IOException {
String word = key.toString().toLowerCase();
System.out.println("Upper Case:"+word);
output.collect(new Text(word), value);
}
}
//WordCountReducer - is doing nothing special just writing the key in the context
public static class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable,Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get(); }
output.collect(key, new IntWritable(sum));
}
}
//LastMapper - will spilt the record sent from reducer and write into the final output file
public static class LastMapper extends MapReduceBase implements Mapper<Text, IntWritable,Text, Text> {
// Now we have to to match filter words to the wordnet dictionary and find out the synsets.
public void map(Text key, IntWritable value,OutputCollector<Text, Text> output,Reporter reporter) throws IOException {
String word = key.toString();
StringBuffer sbr = new StringBuffer();
sbr.append(key.toString() + "\t" + value.toString());
//System.setProperty("wordnet.database.dir","/home/hadoop/WordnetDictionary/dict");
String matched = Dictionary.match(word);
output.collect(new Text(sbr.toString()), new Text(matched));
}
}
@Override
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(getConf(), ChainWordCountDriver.class);
//conf.setJobName("wordcount");
//Setting the input and output path
FileInputFormat.setInputPaths(conf, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(conf, outputPath);
//Considering the input and output as text file set the input & output format to TextInputFormat
conf.setInputFormat(XmlInputFormat.class);
conf.set("xmlinput.start", "<TEXT>");
conf.set("xmlinput.end", "</TEXT>");
conf.setOutputFormat(TextOutputFormat.class);
conf.set("mapred.textoutputformat.separator", "\n");
JobConf mapAConf = new JobConf(false);
ChainMapper.addMapper(conf, TokenizerMapper.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, mapAConf);
//addMapper will take global conf object and mapper class ,input and output type for this mapper and output key/value have to be sent by value or by reference and localJObconf specific to this call
JobConf mapBConf = new JobConf(false);
ChainMapper.addMapper(conf, LowerCaserMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, mapBConf);
JobConf reduceConf = new JobConf(false);
reduceConf.setCombinerClass(WordCountReducer.class);
ChainReducer.setReducer(conf, WordCountReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);
JobConf mapCConf = new JobConf(false);
ChainReducer.addMapper(conf, LastMapper.class, Text.class, IntWritable.class, Text.class, Text.class, true, mapCConf);
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new ChainWordCountDriver(), args);
System.exit(res);
}}
--------------SGMParser.java-------------------------
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
public class SgmParser {
public static String parse(String line) throws IOException
{
final String LINE_SEPARATOR = System.getProperty("line.separator");
InputStream fstream =SgmParser.class.getResourceAsStream("stopwords.txt");
//InputStream fstream = new FileInputStream("stopwords.txt");
BufferedReader sfbr2 =null ;
String token =null;
// private static Pattern EXTRACTION_PATTERN = Pattern.compile("<TITLE>(.*?)</TITLE>|<DATE>(.*?)</DATE>|<BODY>(.*?)</BODY>");
Pattern EXTRACTION_PATTERN = Pattern.compile("<BODY>(.*?)</BODY>");
String[] META_CHARS = {"&", "<", ">", "\"", "'", "\""};
String[] META_CHARS_SERIALIZATIONS = {"&", "<", ">", """, "'"};
int index = -1;
// int docNumber = 0;
StringBuffer buffer = new StringBuffer();
StringBuffer buffer1 = new StringBuffer();
String parse = "";
String lt ="<";
String gt =">";
for (int id = line.indexOf(lt); index >= 0; id = line.indexOf(lt,id + 1)) {
int ct = line.indexOf(gt,index + 1);
int ot = line.indexOf(lt,index + 1);
if(ot!=-1)
buffer1.append(line.substring(ct+1,ot)).append(" ");
}
if(buffer1.length()==0) {
buffer1.append(line);
}
parse = buffer1.toString().toLowerCase();
parse=parse.replaceAll("[^a-zA-Z]", " ");
parse = parse.replaceAll("\\s+", " ").trim();
if ((index = parse.indexOf("</REUTERS")) == -1) {
//Replace the SGM escape sequences
buffer.append(parse).append(' ');//accumulate the strings for now, then apply regular expression to get the pieces,
} else {
//Extract the relevant pieces and write to a file in the output dir
Matcher matcher = EXTRACTION_PATTERN.matcher(parse);
while (matcher.find()) {
for (int i = 1; i <= matcher.groupCount(); i++) {
if (matcher.group(i) != null) {
buffer.append(matcher.group(i));
}
buffer.append(LINE_SEPARATOR).append(LINE_SEPARATOR);
}}}
String out = buffer.toString();
for (int i = 0; i < META_CHARS_SERIALIZATIONS.length; i++) {
out = out.replaceAll(META_CHARS_SERIALIZATIONS[i], META_CHARS[i]);
}
sfbr2 = new BufferedReader(new InputStreamReader(fstream, "UTF-8"));
while ((token = sfbr2.readLine()) != null) {
out=out.replaceAll("\\b"+token.trim()+"\\b", "");
}
return out;
}}
----------------------------Dictionary.java--------------------------------
import edu.smu.tspell.wordnet.Synset;
import edu.smu.tspell.wordnet.WordNetDatabase;
public class Dictionary {
public static String match(String searchword)
{
//String wordForm = buffer.toString();
// Get the synsets containing the wrod form
WordNetDatabase database = WordNetDatabase.getFileInstance();
Synset[] synsets = database.getSynsets(searchword);
StringBuffer sbfr = new StringBuffer();
//System.setProperty("wordnet.database.dir", "/home/hadoop/WordnetDictionary/dict");
// Display the word forms and definitions for synsets retrieved
if (synsets.length > 0)
{
for (int i = 0; i < synsets.length; i++)
{
String[] wordForms = synsets[i].getWordForms();
for (int j = 0; j < wordForms.length; j++)
{
sbfr.append((j > 0 ? ", " : "") +
wordForms[j]);
} }
sbfr.append(": " + synsets[i].getDefinition() + "\n");
}
}
else
{
sbfr.append("Not Found");
}
return sbfr.toString();
}
}