Wednesday, August 28, 2013

Chain MapReduce

Run the Chain MapReduce Job

Sometimes we need to run the depended multiple Map Reduce job(Map---->Reduce---->Map).
Here's the one example of ChainMapReduce Job.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.*;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;

import data.mining.Dictionary;
import data.mining.SgmParser;

public class ChainWordCountDriver extends Configured implements Tool { 

// TokenizerMapper  -  Parse the input file record for every token
public static class TokenizerMapper extends MapReduceBase implements Mapper<LongWritable, Text,Text, IntWritable> {
    private final IntWritable one = new IntWritable(1);
    private Text word = new Text();
//here sgm parser is responsible for removing the stop words.
public void map(LongWritable key, Text value,OutputCollector<Text, IntWritable> output,Reporter reporter) throws IOException {
        String line = value.toString();
 line = SgmParser.parse(line);
 line = line.replaceAll("\\s+", " ").trim();
 StringTokenizer tokenizer = new StringTokenizer(line);
 while (tokenizer.hasMoreTokens()) {
  output.collect(new Text(tokenizer.nextToken()), one);
        }
    }
}

//LowerCaserMapper - It will lowercase the passed token from TokenizerMapper
 public class LowerCaserMapper extends MapReduceBase implements Mapper<Text, IntWritable,Text, IntWritable> {

    public void map(Text key, IntWritable value,OutputCollector<Text, IntWritable> output,Reporter reporter) throws IOException {
        String word = key.toString().toLowerCase();
        System.out.println("Upper Case:"+word);
        output.collect(new Text(word), value);
   }
}

//WordCountReducer - is doing nothing special just writing the key in the context
 public static class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable,Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
 sum += values.next().get(); }
 output.collect(key, new IntWritable(sum));
    }
}

//LastMapper - will spilt the record sent from reducer and write into the final output file
 public static class LastMapper extends MapReduceBase implements Mapper<Text, IntWritable,Text, Text> {
// Now we have to to match filter words to the wordnet dictionary and find out the synsets.
    public void map(Text key, IntWritable value,OutputCollector<Text, Text> output,Reporter reporter) throws IOException {
     
 String word = key.toString();
StringBuffer sbr = new StringBuffer();
sbr.append(key.toString() + "\t" + value.toString());
//System.setProperty("wordnet.database.dir","/home/hadoop/WordnetDictionary/dict");
String matched = Dictionary.match(word);
output.collect(new Text(sbr.toString()), new Text(matched));
    }
}
 @Override
public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(getConf(), ChainWordCountDriver.class);
    //conf.setJobName("wordcount");

    //Setting the input and output path
    FileInputFormat.setInputPaths(conf, new Path(args[0]));

    Path outputPath = new Path(args[1]);

    FileOutputFormat.setOutputPath(conf, outputPath);
    //Considering the input and output as text file set the input & output format to TextInputFormat
    conf.setInputFormat(XmlInputFormat.class);
    conf.set("xmlinput.start", "<TEXT>");
 conf.set("xmlinput.end", "</TEXT>");
    conf.setOutputFormat(TextOutputFormat.class);
    conf.set("mapred.textoutputformat.separator", "\n");
    JobConf mapAConf = new JobConf(false);

    ChainMapper.addMapper(conf, TokenizerMapper.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, mapAConf);      

        //addMapper will take global conf object and mapper class ,input and output type for this mapper and output key/value have to be sent by value or by reference and localJObconf specific to this call

    JobConf mapBConf = new JobConf(false);
    ChainMapper.addMapper(conf, LowerCaserMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, mapBConf);

    JobConf reduceConf = new JobConf(false);
    reduceConf.setCombinerClass(WordCountReducer.class);
    ChainReducer.setReducer(conf, WordCountReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);

   JobConf mapCConf = new JobConf(false);
   ChainReducer.addMapper(conf, LastMapper.class, Text.class, IntWritable.class, Text.class, Text.class, true, mapCConf);
    JobClient.runJob(conf);
    return 0;
}

public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new ChainWordCountDriver(), args);
    System.exit(res);
}}
--------------SGMParser.java-------------------------

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
public class SgmParser {
 public static String parse(String line) throws IOException
 {
   final String LINE_SEPARATOR = System.getProperty("line.separator");
   
   InputStream fstream =SgmParser.class.getResourceAsStream("stopwords.txt");
  
  //InputStream fstream = new FileInputStream("stopwords.txt");
   
   BufferedReader sfbr2 =null ;
   
   String token =null;
      // private static Pattern EXTRACTION_PATTERN = Pattern.compile("<TITLE>(.*?)</TITLE>|<DATE>(.*?)</DATE>|<BODY>(.*?)</BODY>");
       Pattern EXTRACTION_PATTERN = Pattern.compile("<BODY>(.*?)</BODY>");
   
        String[] META_CHARS = {"&", "<", ">", "\"", "'", "\""};
   
        String[] META_CHARS_SERIALIZATIONS  = {"&amp;", "&lt;", "&gt;", "&quot;", "&apos;"};
        
        
           int index = -1;
      //     int docNumber = 0;
           StringBuffer buffer = new StringBuffer();
           StringBuffer buffer1 = new StringBuffer();
           String parse = "";
        String lt ="<";
        String gt =">";
           
           for (int id = line.indexOf(lt); index >= 0; id = line.indexOf(lt,id + 1)) {
     int ct = line.indexOf(gt,index + 1);
     int ot = line.indexOf(lt,index + 1);
     if(ot!=-1)
      buffer1.append(line.substring(ct+1,ot)).append(" ");
    }
       if(buffer1.length()==0) {
        buffer1.append(line);
       }
      parse = buffer1.toString().toLowerCase();
      parse=parse.replaceAll("[^a-zA-Z]", " ");
      parse = parse.replaceAll("\\s+", " ").trim();
           
           
           
           if ((index = parse.indexOf("</REUTERS")) == -1) {
               //Replace the SGM escape sequences
            
               buffer.append(parse).append(' ');//accumulate the strings for now, then apply regular expression to get the pieces,
           } else {
               //Extract the relevant pieces and write to a file in the output dir
            
         
      
               Matcher matcher = EXTRACTION_PATTERN.matcher(parse);
               while (matcher.find()) {
                   for (int i = 1; i <= matcher.groupCount(); i++) {
                       if (matcher.group(i) != null) {
                        buffer.append(matcher.group(i));
                       }
                       buffer.append(LINE_SEPARATOR).append(LINE_SEPARATOR);
 }}}
                   String out = buffer.toString();
                   for (int i = 0; i < META_CHARS_SERIALIZATIONS.length; i++) {
                       out = out.replaceAll(META_CHARS_SERIALIZATIONS[i], META_CHARS[i]);
}
                   
                   sfbr2 = new BufferedReader(new InputStreamReader(fstream, "UTF-8"));
          while ((token = sfbr2.readLine()) != null) {
           out=out.replaceAll("\\b"+token.trim()+"\\b", "");
          }
                   return out;
               }}
----------------------------Dictionary.java--------------------------------

import edu.smu.tspell.wordnet.Synset;
import edu.smu.tspell.wordnet.WordNetDatabase;

public class Dictionary {

 public static String match(String searchword)
 {
  //String wordForm = buffer.toString();
  //  Get the synsets containing the wrod form
  WordNetDatabase database = WordNetDatabase.getFileInstance();
  Synset[] synsets = database.getSynsets(searchword);
  
  StringBuffer sbfr = new StringBuffer();
//System.setProperty("wordnet.database.dir", "/home/hadoop/WordnetDictionary/dict");
  
  //  Display the word forms and definitions for synsets retrieved
  if (synsets.length > 0)
  {
   for (int i = 0; i < synsets.length; i++)
   {
    
    String[] wordForms = synsets[i].getWordForms();
    for (int j = 0; j < wordForms.length; j++)
    {
     sbfr.append((j > 0 ? ", " : "") +
     wordForms[j]);
    }    }
   
    sbfr.append(": " + synsets[i].getDefinition() + "\n");
   }
  }
  else
  {
   sbfr.append("Not Found");
  }
  return sbfr.toString();
 }
}

No comments:

Post a Comment