Friday, August 30, 2013

Let's Discuss HBase - Row-Key Design

HBase Row-Key Design: Design Solutions and Pros/Cons


Row-Key Design

Try to keep row keys short, because they are stored with every cell in an HBase table; noticeably reducing row-key size reduces the amount of data needed to store HBase data. This advice also applies to column family names.

A common problem is choosing between sequential row keys and randomly distributed row keys:

Mixed-design approaches can allow fast range scans while distributing writes across the cluster when the data is sequential by nature; see the sketch after the two design solutions below.

Design Solution: Using sequential row keys (e.g. time-series data with a row key built from a timestamp)
Pros: Makes it possible to perform fast range scans by setting start/stop keys on the Scanner
Cons: Creates a region-server hotspotting problem when writing data (because row keys are sequential, all records are written to a single region at a time)


Design Solution: Using randomly distributed row keys (e.g. UUIDs)
Pros: Aims for the fastest write performance by distributing new records over random regions
Cons: Does not allow fast range scans over the written data
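
A common mixed design is to prefix the sequential key with a small salt so that writes spread across region servers, at the cost of fanning a range scan out over every salt bucket. Below is a minimal sketch of that idea in Java, assuming the 0.94-era HBase client API and a hypothetical table named "metrics" with column family "d"; the bucket count and key layout are illustrative, not a prescribed scheme.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class SaltedKeyWriter {

    // Number of salt buckets; typically chosen in proportion to the number of region servers.
    private static final int BUCKETS = 8;

    // Row key = one salt byte + the timestamp; keys stay short, as advised above.
    static byte[] buildRowKey(long timestamp) {
        byte salt = (byte) (timestamp % BUCKETS);
        return Bytes.add(new byte[] { salt }, Bytes.toBytes(timestamp));
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "metrics"); // hypothetical table name
        Put put = new Put(buildRowKey(System.currentTimeMillis()));
        put.add(Bytes.toBytes("d"), Bytes.toBytes("value"), Bytes.toBytes(42L));
        table.put(put);
        table.close();
    }
}

To read a time range with this layout, you would run one Scan per salt bucket (prefixing the start/stop keys with each salt byte) and merge the results on the client side.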


{{{Column Families}}}
Currently, HBase does not do well with anything above two or three column families per table, so keep the number of column families in your schema low. Try to make do with one column family if you can. Only introduce a second or third column family in the case where data access is usually column-scoped; i.e. you usually query no more than a single column family at a time.

You can also set a TTL (in seconds) for a column family. HBase will automatically delete rows once the expiration time is reached.

{{{Versions}}}
The maximum number of row versions that can be stored is configured per column family (the default is 3). This is an important parameter because HBase does not overwrite row values, but rather stores different values per row by time (and qualifier). Setting the number of maximum versions to an exceedingly high level (e.g., hundreds or more) is not a good idea because that will greatly increase StoreFile size.

The minimum number of row versions to keep can also be configured per column family (the default is 0, meaning the feature is disabled). This parameter is used together with the TTL and maximum-row-versions parameters to allow configurations such as “keep the last T minutes worth of data, at least M versions, and at most N versions.” This parameter should only be set when TTL is enabled for a column family and must be less than the maximum number of row versions.
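
As an illustration of these per-column-family settings, here is a minimal sketch assuming the 0.94-era admin API, a hypothetical table named "metrics" with family "d", and a client version that supports minimum versions:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class CreateTableWithTtl {

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);

        HColumnDescriptor family = new HColumnDescriptor(Bytes.toBytes("d"));
        family.setTimeToLive(7 * 24 * 60 * 60); // TTL in seconds: expire cells after one week
        family.setMaxVersions(3);               // keep at most 3 versions per cell
        family.setMinVersions(1);               // always keep at least 1 version, even past the TTL

        HTableDescriptor table = new HTableDescriptor("metrics"); // hypothetical table name
        table.addFamily(family);
        admin.createTable(table);
        admin.close();
    }
}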

{{{Data Types}}}
HBase supports a “bytes-in/bytes-out” interface via Put and Result, so anything that can be converted to an array of bytes can be stored as a value. Input can be strings, numbers, complex objects, or even images, as long as they can be rendered as bytes.

One supported data type that deserves special mention is the “counters” type, which enables atomic increments of numbers, for example keeping page-view counts without a client-side read-modify-write cycle.
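
A minimal sketch of using a counter, assuming a hypothetical table named "pageviews" with column family "c" and the 0.94-era HTable API:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;

public class PageViewCounter {

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "pageviews"); // hypothetical counter table

        // Atomically add 1 to the counter at row "home", family "c", qualifier "hits";
        // the server performs the read-add-write, so concurrent clients do not lose updates.
        long newValue = table.incrementColumnValue(
                Bytes.toBytes("home"), Bytes.toBytes("c"), Bytes.toBytes("hits"), 1L);

        System.out.println("Counter is now " + newValue);
        table.close();
    }
}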

Some case studies for designing the row key:

Wednesday, August 28, 2013

Impala JDBC Connection

Cloudera Impala is an open source Massively Parallel Processing (MPP) query engine that runs natively on Apache Hadoop. With Impala, analysts and data scientists now have the ability to perform real-time, “speed of thought” analytics on data stored in Hadoop via SQL or through Business Intelligence (BI) tools. The result is that large-scale data processing (via MapReduce) and interactive queries can be done on the same system using the same data and metadata – removing the need to migrate data sets into specialized systems and/or proprietary formats simply to perform analysis.

The following sample program connects to an impalad daemon from a client machine through JDBC.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class DriverTest {

    private static String SQL_STATEMENT = "select sid from student limit 5";

    // set the impalad host
    private static String IMPALAD_HOST = "rsharma@ec2-xx-xxx-xxx-xx.compute-1.amazonaws.com";

    // port 21050 is the default impalad JDBC port
    private static final String IMPALAD_JDBC_PORT = "21050";

    private static final String CONNECTION_URL = "jdbc:hive2://" + IMPALAD_HOST + ':' + IMPALAD_JDBC_PORT + "/;auth=noSasl";

    private static final String JDBC_DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver";

    /**
     * @param args
     * @throws ClassNotFoundException
     * @throws SQLException
     */
    public static void main(String[] args) throws ClassNotFoundException, SQLException {

        System.out.println("\n=============================================");
        System.out.println("Cloudera Impala JDBC Example");
        System.out.println("Using Connection URL: " + CONNECTION_URL);
        System.out.println("Running Query: " + SQL_STATEMENT);

        Connection con = null;

        try {
            // Impala speaks the HiveServer2 protocol, so the Hive JDBC driver is used
            Class.forName(JDBC_DRIVER_NAME);

            con = DriverManager.getConnection(CONNECTION_URL);

            Statement stmt = con.createStatement();

            ResultSet rs = stmt.executeQuery(SQL_STATEMENT);

            System.out.println("\n== Begin Query Results ======================");

            // print the results to the console
            while (rs.next()) {
                // the example query returns one String column
                System.out.println(rs.getString(1));
            }

            System.out.println("== End Query Results =======================\n\n");

        } catch (SQLException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (con != null) {
                    con.close();
                }
            } catch (Exception e) {
                // ignore errors while closing the connection
            }
        }
    }
}

Chain MapReduce

Run the Chain MapReduce Job

Sometimes we need to run multiple dependent MapReduce stages chained together (Map ----> Reduce ----> Map).
Here is one example of a chained MapReduce job.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.*;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;

import data.mining.Dictionary;
import data.mining.SgmParser;

public class ChainWordCountDriver extends Configured implements Tool { 

// TokenizerMapper - parses the input file record into tokens
public static class TokenizerMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // SgmParser is responsible for removing the stop words
    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        line = SgmParser.parse(line);
        line = line.replaceAll("\\s+", " ").trim();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            output.collect(new Text(tokenizer.nextToken()), one);
        }
    }
}

// LowerCaserMapper - lowercases the token passed from TokenizerMapper
public static class LowerCaserMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {

    public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String word = key.toString().toLowerCase();
        System.out.println("Lower Case: " + word);
        output.collect(new Text(word), value);
    }
}

// WordCountReducer - sums the counts for each token and emits the total
public static class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

// LastMapper - takes each (word, count) pair from the reducer, matches the word against
// the WordNet dictionary to find its synsets, and writes the result to the final output file
public static class LastMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, Text> {

    public void map(Text key, IntWritable value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        String word = key.toString();
        StringBuffer sbr = new StringBuffer();
        sbr.append(key.toString() + "\t" + value.toString());
        // System.setProperty("wordnet.database.dir", "/home/hadoop/WordnetDictionary/dict");
        String matched = Dictionary.match(word);
        output.collect(new Text(sbr.toString()), new Text(matched));
    }
}
@Override
public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(getConf(), ChainWordCountDriver.class);
    //conf.setJobName("wordcount");

    // Set the input and output paths
    FileInputFormat.setInputPaths(conf, new Path(args[0]));

    Path outputPath = new Path(args[1]);

    FileOutputFormat.setOutputPath(conf, outputPath);

    // The input is XML, so use XmlInputFormat with <TEXT>...</TEXT> as the record delimiters;
    // the output is plain text
    conf.setInputFormat(XmlInputFormat.class);
    conf.set("xmlinput.start", "<TEXT>");
    conf.set("xmlinput.end", "</TEXT>");
    conf.setOutputFormat(TextOutputFormat.class);
    conf.set("mapred.textoutputformat.separator", "\n");

    // addMapper takes the global conf object, the mapper class, its input and output key/value types,
    // a flag saying whether key/values are passed by value or by reference,
    // and a local JobConf specific to this mapper
    JobConf mapAConf = new JobConf(false);
    ChainMapper.addMapper(conf, TokenizerMapper.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, mapAConf);

    JobConf mapBConf = new JobConf(false);
    ChainMapper.addMapper(conf, LowerCaserMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, mapBConf);

    JobConf reduceConf = new JobConf(false);
    reduceConf.setCombinerClass(WordCountReducer.class);
    ChainReducer.setReducer(conf, WordCountReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);

    JobConf mapCConf = new JobConf(false);
    ChainReducer.addMapper(conf, LastMapper.class, Text.class, IntWritable.class, Text.class, Text.class, true, mapCConf);

    JobClient.runJob(conf);
    return 0;
}

public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new ChainWordCountDriver(), args);
    System.exit(res);
}}
--------------SgmParser.java-------------------------

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SgmParser {

    public static String parse(String line) throws IOException {
        final String LINE_SEPARATOR = System.getProperty("line.separator");

        // stopwords.txt is loaded from the classpath, next to the SgmParser class
        InputStream fstream = SgmParser.class.getResourceAsStream("stopwords.txt");
        //InputStream fstream = new FileInputStream("stopwords.txt");

        BufferedReader sfbr2 = null;
        String token = null;

        // private static Pattern EXTRACTION_PATTERN = Pattern.compile("<TITLE>(.*?)</TITLE>|<DATE>(.*?)</DATE>|<BODY>(.*?)</BODY>");
        Pattern EXTRACTION_PATTERN = Pattern.compile("<BODY>(.*?)</BODY>");

        String[] META_CHARS = {"&", "<", ">", "\"", "'"};
        String[] META_CHARS_SERIALIZATIONS = {"&amp;", "&lt;", "&gt;", "&quot;", "&apos;"};

        int index = -1;
        StringBuffer buffer = new StringBuffer();
        StringBuffer buffer1 = new StringBuffer();
        String parse = "";
        String lt = "<";
        String gt = ">";

        // Strip the SGML tags by keeping only the text between a closing '>' and the next '<'
        for (int id = line.indexOf(lt); id >= 0; id = line.indexOf(lt, id + 1)) {
            int ct = line.indexOf(gt, id + 1);
            int ot = line.indexOf(lt, id + 1);
            if (ct != -1 && ot != -1) {
                buffer1.append(line.substring(ct + 1, ot)).append(" ");
            }
        }
        if (buffer1.length() == 0) {
            buffer1.append(line);
        }
        parse = buffer1.toString().toLowerCase();
        parse = parse.replaceAll("[^a-zA-Z]", " ");
        parse = parse.replaceAll("\\s+", " ").trim();

        if ((index = parse.indexOf("</REUTERS")) == -1) {
            // Accumulate the strings for now; the regular expression is applied once the document is complete
            buffer.append(parse).append(' ');
        } else {
            // Extract the relevant pieces (the <BODY> contents)
            Matcher matcher = EXTRACTION_PATTERN.matcher(parse);
            while (matcher.find()) {
                for (int i = 1; i <= matcher.groupCount(); i++) {
                    if (matcher.group(i) != null) {
                        buffer.append(matcher.group(i));
                    }
                    buffer.append(LINE_SEPARATOR).append(LINE_SEPARATOR);
                }
            }
        }

        // Replace the SGM escape sequences with their plain-text equivalents
        String out = buffer.toString();
        for (int i = 0; i < META_CHARS_SERIALIZATIONS.length; i++) {
            out = out.replaceAll(META_CHARS_SERIALIZATIONS[i], META_CHARS[i]);
        }

        // Remove the stop words listed in stopwords.txt
        sfbr2 = new BufferedReader(new InputStreamReader(fstream, "UTF-8"));
        while ((token = sfbr2.readLine()) != null) {
            out = out.replaceAll("\\b" + token.trim() + "\\b", "");
        }
        return out;
    }
}
----------------------------Dictionary.java--------------------------------

import edu.smu.tspell.wordnet.Synset;
import edu.smu.tspell.wordnet.WordNetDatabase;

public class Dictionary {

    public static String match(String searchword) {
        // Get the synsets containing the word form
        // (requires the system property wordnet.database.dir to point at the WordNet dict directory)
        // System.setProperty("wordnet.database.dir", "/home/hadoop/WordnetDictionary/dict");
        WordNetDatabase database = WordNetDatabase.getFileInstance();
        Synset[] synsets = database.getSynsets(searchword);

        StringBuffer sbfr = new StringBuffer();

        // Collect the word forms and definitions for the synsets retrieved
        if (synsets.length > 0) {
            for (int i = 0; i < synsets.length; i++) {
                String[] wordForms = synsets[i].getWordForms();
                for (int j = 0; j < wordForms.length; j++) {
                    sbfr.append((j > 0 ? ", " : "") + wordForms[j]);
                }
                sbfr.append(": " + synsets[i].getDefinition() + "\n");
            }
        } else {
            sbfr.append("Not Found");
        }
        return sbfr.toString();
    }
}

Sentiment Analysis

Sentiment Analysis using the SentiWordNet Dictionary

This is a sample program that calculates the semantic orientation of each word using the SentiWordNet dictionary.

package data.mining;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;
import java.util.Vector;

    public class SentimentalWordNet {
        private String pathToSWN = "C:\\Users\\rsharma\\Downloads\\SentiWordNet_3.0.0.txt";
        private HashMap<String, Double> _dict;

        public SentimentalWordNet(){

            _dict = new HashMap<String, Double>();
            HashMap<String, Vector<Double>> _temp = new HashMap<String, Vector<Double>>();
            try{
                BufferedReader csv =  new BufferedReader(new FileReader(pathToSWN));
                String line = "";
                while((line = csv.readLine()) != null)
                {
                    String[] data = line.split("\t");
                    Double score = Double.parseDouble(data[2])-Double.parseDouble(data[3]);
                    String[] words = data[4].split(" ");
                    for(String w:words)
                    {
                        String[] w_n = w.split("#");
                        w_n[0] += "#"+data[0];
                        int index = Integer.parseInt(w_n[1])-1;
                        if(_temp.containsKey(w_n[0]))
                        {
                            Vector<Double> v = _temp.get(w_n[0]);
                            if(index>v.size())
                                for(int i = v.size();i<index; i++)
                                    v.add(0.0);
                            v.add(index, score);
                            _temp.put(w_n[0], v);
                        }
                        else
                        {
                            Vector<Double> v = new Vector<Double>();
                            for(int i = 0;i<index; i++)
                                v.add(0.0);
                            v.add(index, score);
                            _temp.put(w_n[0], v);
                        }
                    }
                }
                Set<String> temp = _temp.keySet();
                for (Iterator<String> iterator = temp.iterator(); iterator.hasNext();) {
                    String word = (String) iterator.next();
                    Vector<Double> v = _temp.get(word);
                    double score = 0.0;
                    double sum = 0.0;
                    for(int i = 0; i < v.size(); i++)
                        score += ((double)1/(double)(i+1))*v.get(i);
                    for(int i = 1; i<=v.size(); i++)
                        sum += (double)1/(double)i;
                    score /= sum;
                    String sent = "";
                    if(score>=0.75)
                        sent = "strong_positive";
                    else
                    if(score > 0.25 && score<=0.5)
                        sent = "positive";
                    else
                    if(score > 0 && score<=0.25)
                        sent = "weak_positive";
                    else
                    if(score < 0 && score>=-0.25)
                        sent = "weak_negative";
                    else
                    if(score < -0.25 && score>=-0.5)
                        sent = "negative";
                    else
                    if(score<=-0.75)
                        sent = "strong_negative";
                    _dict.put(word, score);
                }
            }
            catch(Exception e){
             //e.printStackTrace();
             }        

        }

public Double extract(String word)
{
    Double total = new Double(0);
    if(_dict.get(word+"#n") != null)
         total = _dict.get(word+"#n") + total;
    if(_dict.get(word+"#a") != null)
        total = _dict.get(word+"#a") + total;
    if(_dict.get(word+"#r") != null)
        total = _dict.get(word+"#r") + total;
    if(_dict.get(word+"#v") != null)
        total = _dict.get(word+"#v") + total;
    return total;
}

public static void main(String[] args) {
 SentimentalWordNet test = new SentimentalWordNet();
    String sentence="hey i had a wonderful an experience in barista";
    String[] words = sentence.split("\\s+");
    double totalScore = 0;
    for(String word : words) {
        word = word.replaceAll("([^a-zA-Z\\s])", "");
        if (test.extract(word) == null)
            continue;
        totalScore += test.extract(word);
    }
    if(totalScore == 0)
    {
    System.out.println("Neutral Statement :" + totalScore);
    } else if(totalScore > 0) {
     System.out.println("Postive Statement :" + totalScore);
    } else {
     System.out.println("Negative Statement :" + totalScore);
    }
}

}
 
OUTPUT:-
Positive Statement :0.34798245187641463

Note: The SentiWordNet dictionary can be downloaded from the link below. Remove all the comment lines and hash tags from the beginning and end of the file before using it.
http://sentiwordnet.isti.cnr.it/download.php
 

XMLInputFormat

XMLInputFormat for old mapred API

Parse the XML files by using XMLInputFormat.
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

/**
 * Reads records that are delimited by a specific begin/end tag.
 */
public class XmlInputFormat extends TextInputFormat {

  public static final String START_TAG_KEY = "xmlinput.start";
  public static final String END_TAG_KEY = "xmlinput.end";

  @Override
  public RecordReader<LongWritable,Text> getRecordReader(InputSplit inputSplit,
                                                         JobConf jobConf,
                                                         Reporter reporter) throws IOException {
    return new XmlRecordReader((FileSplit) inputSplit, jobConf);
  }

  /**
   * XMLRecordReader class to read through a given xml document to output xml
   * blocks as records as specified by the start tag and end tag
   *
   */
  public static class XmlRecordReader implements
      RecordReader<LongWritable,Text> {
    private final byte[] startTag;
    private final byte[] endTag;
    private final long start;
    private final long end;
    private final FSDataInputStream fsin;
    private final DataOutputBuffer buffer = new DataOutputBuffer();

    public XmlRecordReader(FileSplit split, JobConf jobConf) throws IOException {
      startTag = jobConf.get(START_TAG_KEY).getBytes("utf-8");
      endTag = jobConf.get(END_TAG_KEY).getBytes("utf-8");

      // open the file and seek to the start of the split
      start = split.getStart();
      end = start + split.getLength();
      Path file = split.getPath();
      FileSystem fs = file.getFileSystem(jobConf);
      fsin = fs.open(split.getPath());
      fsin.seek(start);
    }

    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
      if (fsin.getPos() < end) {
        if (readUntilMatch(startTag, false)) {
          try {
            buffer.write(startTag);
            if (readUntilMatch(endTag, true)) {
              key.set(fsin.getPos());
              value.set(buffer.getData(), 0, buffer.getLength());
              return true;
            }
          } finally {
            buffer.reset();
          }
        }
      }
      return false;
    }

    @Override
    public LongWritable createKey() {
      return new LongWritable();
    }

    @Override
    public Text createValue() {
      return new Text();
    }

    @Override
    public long getPos() throws IOException {
      return fsin.getPos();
    }

    @Override
    public void close() throws IOException {
      fsin.close();
    }

    @Override
    public float getProgress() throws IOException {
      return (fsin.getPos() - start) / (float) (end - start);
    }

    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
      int i = 0;
      while (true) {
        int b = fsin.read();
        // end of file:
        if (b == -1) return false;
        // save to buffer:
        if (withinBlock) buffer.write(b);

        // check if we're matching:
        if (b == match[i]) {
          i++;
          if (i >= match.length) return true;
        } else i = 0;
        // see if we've passed the stop point:
        if (!withinBlock && i == 0 && fsin.getPos() >= end) return false;
      }
    }
  }
}

Pseudo-Distributed Mode Hadoop Installation on Ubuntu

------------------Java Installation --------------

sudo apt-get purge openjdk*
sudo add-apt-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java6-installer
sudo apt-get install oracle-java6-set-default

------------ JAVA_HOME -----------------
/usr/lib/jvm/java-6-oracle   (set this as JAVA_HOME in conf/hadoop-env.sh)

------------- Disable ipv6 --------------------
Add the following to conf/hadoop-env.sh:
export HADOOP_OPTS=-Djava.net.preferIPv4Stack=true

http://www.ubuntugeek.com/how-to-install-oracle-java-7-in-ubuntu-12-04.html

-----------create user for hadoop ---------------
sudo addgroup hadoop
sudo adduser --ingroup hadoop hduser
su - hduser
ssh-keygen -t rsa -P ""
cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys

------------Update the xml -----------------------
*******core-site.xml

<property>
<name>hadoop.tmp.dir</name>
<value>/app/hadoop/tmp</value>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:54310</value>
</property>

************mapred-site.xml

<property>
  <name>mapred.job.tracker</name>
  <value>localhost:54311</value>
</property>

******hdfs-site.xml

<property>
  <name>dfs.replication</name>
  <value>1</value>
 </property>
----------- Format namenode ----------------

bin/hadoop namenode -format

SSH Connection

SSH connection from your client machine to a cloud server

Connect your cloud server's file system to your local client. The following sample program uploads a file from your local machine to the cloud server over SFTP.
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException;

public class ConnectSSH {

 public void connect(String dnsName, String privKey) throws IOException, SftpException {
     JSch jSch = new JSch();
     Session session  = null;
     ChannelSftp channel = null;
     try {
         //Authenticate through Private Key File
         jSch.addIdentity(privKey);
         //Give the user and dnsName
         session = jSch.getSession("huser", dnsName, 22);
         //Required if not a trusted host
         java.util.Properties config = new java.util.Properties();
         config.put("StrictHostKeyChecking", "no");
         session.setConfig(config);
         System.out.println("Connecting SSH to " + dnsName + " - Please wait for few minutes... ");
         session.connect();
         System.out.println(session.getHost());

         channel = (ChannelSftp)session.openChannel("sftp");
         channel.connect();
         channel.cd("/home/huser");
         File localFile = new File("C:/Users/rsharma/Desktop/hivedata.txt");
         channel.put(new FileInputStream(localFile), localFile.getName());
         System.out.println("--------File has been Copied----------");
         channel.disconnect();
         session.disconnect();
     } catch (JSchException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
     } finally {
      channel = null;
      session = null;
     }
 }

 public static void main(String[] args) throws SftpException {
     ConnectSSH ssh = new ConnectSSH();
     String privKey = "D:/Amazon Key/huser-key.pem";
     try {
         ssh.connect("xxx-xx-xx-xxx-xx.compute-1.amazonaws.com", privKey);
     } catch (IOException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
     }

 }

}

Hive JDBC Connection

Hive JDBC connection to your localhost or a cloud-hosted cluster

import java.sql.SQLException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.DriverManager;

public class HiveJdbcClient {
  private static String driverName = "org.apache.hadoop.hive.jdbc.HiveDriver";

  /**
 * @param args
 * @throws SQLException
   */
  public static void main(String[] args) throws SQLException {
      try {
      Class.forName(driverName);
    } catch (ClassNotFoundException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
      System.exit(1);
    }
    Connection con = DriverManager.getConnection("jdbc:hive://localhost:10000/default", "", "");
    Statement stmt = con.createStatement();
    String tableName = "testHiveDriverTable";
    stmt.executeQuery("drop table " + tableName);
    ResultSet res = stmt.executeQuery("create table " + tableName + " (key int, value string)" + " ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ");
    // show tables
    String sql = "show tables '" + tableName + "'";
    System.out.println("Running: " + sql);
    res = stmt.executeQuery(sql);
    if (res.next()) {
      System.out.println(res.getString(1));
    }
    // describe table
    sql = "describe " + tableName;
    System.out.println("Running: " + sql);
    res = stmt.executeQuery(sql);
    while (res.next()) {
      System.out.println(res.getString(1) + "\t" + res.getString(2));
    }

    // load data into table
    // NOTE: filepath has to be local to the hive server
    String filepath = "/home/huser/hivedata.txt";
    sql = "load data local inpath '" + filepath + "' into table " + tableName;
    System.out.println("Running: " + sql);
    res = stmt.executeQuery(sql);

    // select * query
    sql = "select * from " + tableName;
    System.out.println("Running: " + sql);
    res = stmt.executeQuery(sql);
    while (res.next()) {
      System.out.println(String.valueOf(res.getInt(1)) + "\t" + res.getString(2));
    }

    // regular hive query
    sql = "select count(1) from " + tableName;
    System.out.println("Running: " + sql);
    res = stmt.executeQuery(sql);
    while (res.next()) {
      System.out.println(res.getString(1));
    }
  }
}
Note: To make the Hive JDBC connection to an Amazon cloud server, add an SSH tunnel in PuTTY that forwards local port 10000 to port 10000 on the server.

MapReduce Code Testing

MR code testing using MRUnit

MRUnit provides a framework for testing your MR code. You can test the code in a local
environment and then run it on the cluster.


import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import junit.framework.TestCase;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.junit.Before;
import org.junit.Test;
import org.apache.hadoop.mrunit.*;
import org.apache.hadoop.mrunit.types.Pair;
import com.google.common.collect.ImmutableList;
// assertThat(...) below needs a fluent assertion library on the classpath; this assumes FEST-Assert
import static org.fest.assertions.Assertions.assertThat;

public class MRJobTest {

  private MapDriver<LongWritable, Text, Text, Text> mapDriver;
  private ReduceDriver<Text, Text, Text, Text> reduceDriver;
  private MapReduceDriver<LongWritable, Text, Text, Text, Text, Text> mapReduceDriver;

  public class InvertedIndexMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
  public static final int RETAILER_INDEX = 0;

  @Override
  public void map(LongWritable longWritable, Text text, OutputCollector<Text, Text> outputCollector, Reporter reporter) throws IOException {
   final String[] record = StringUtils.split(text.toString(), ",");
   final String retailer = record[RETAILER_INDEX];
   for (int i = 1; i < record.length; i++) {
    final String keyword = record[i];
    outputCollector.collect(new Text(keyword), new Text(retailer));
    }
   }
  }
 public class InvertedIndexReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

 @Override
 public void reduce(Text text, Iterator<Text> textIterator, OutputCollector<Text, Text> outputCollector, Reporter reporter)
   throws IOException {
  // TODO Auto-generated method stub
  final String retailers = StringUtils.join(textIterator, ',');
    outputCollector.collect(text, new Text(retailers));
 }
  }

  @Before
  public void setUp() throws Exception {

  final InvertedIndexMapper mapper = new InvertedIndexMapper();
  final InvertedIndexReducer reducer = new InvertedIndexReducer();

  mapDriver = MapDriver.newMapDriver(mapper);
  reduceDriver = ReduceDriver.newReduceDriver(reducer);
  mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
  }

  @Test
  public void testMapperWithSingleKeyAndValue() throws Exception {
  final LongWritable inputKey = new LongWritable(0);
  final Text inputValue = new Text("www.kroger.com,groceries,clothes");

  final Text outputKey = new Text("groceries");
  final Text outputValue = new Text("www.kroger.com");

  mapDriver.withInput(inputKey, inputValue);
  mapDriver.withOutput(outputKey, outputValue);
  mapDriver.runTest();

  }
  @Test
  public void testMapperWithSingleInputAndMultipleOutput() throws Exception {
  final LongWritable key = new LongWritable(0);
 mapDriver.withInput(key, new Text("www.amazon.com,books,music,toys,ebooks,movies,computers"));
  final List<Pair<Text, Text>> result = mapDriver.run();

  final Pair<Text, Text> books = new Pair<Text, Text>(new Text("books"), new Text("www.amazon.com"));
  final Pair<Text, Text> toys = new Pair<Text, Text>(new Text("toys"), new Text("www.amazon.com"));

 assertThat(result)
  .isNotNull()
  .hasSize(6)
  .contains(books, toys);
 }
  @Test
  public void testReducer() throws Exception {
 final Text inputKey = new Text("books");
 final ImmutableList<Text> inputValue = ImmutableList.of(new Text("www.amazon.com"), new Text("www.ebay.com"));

 reduceDriver.withInput(inputKey,inputValue);
 final List<Pair<Text, Text>> result = reduceDriver.run();
 //final Pair<Text, Text> pair2 = new Pair<Text, Text>(inputKey, new Text("www.amazon.com,www.ebay.com"));
  reduceDriver.withOutput(inputKey, new Text("www.amazon.com,www.ebay.com"));
 /* assertThat(result)
  .isNotNull()
  .hasSize(1)
  .containsExactly(pair2); */
  reduceDriver.runTest();
  }

 }
Note: Add the MRUnit jar and its dependent jars to the classpath.

XML parsing using PIG

These are the steps for parsing your XML files with Pig.

Step 1: Set the classpath for pig bin
export PATH=/home/hadoop/Documents/pig-0.11.1/bin:$PATH

Step 2: Register the jar file

REGISTER '/home/hadoop/Documents/pig-0.11.1/contrib/piggybank/java/piggybank.jar'

Step 3: Load the data

xml = load '/user/hadoop/input/xml.txt' USING 
org.apache.pig.piggybank.storage.XMLLoader('name') as(doc:chararray);
@ data looks like
<Property>
<name>Ryan</name>
</Property>

Step 4: Parse the file and retrieve the value

value = foreach xml GENERATE FLATTEN(REGEX_EXTRACT_ALL(doc,'<name>(.*)</name>'))  AS name:chararray;

Step 5: show the value

dump value;

*Parse the multi attribute file
@ data looks like
<Property>
 <fname>joseph</fname>
 <lname>christino</lname>
 <landmark>peter tower</landmark>
 <city>panji</city>
 <state>Goa</state>
 <contact>89456123</contact>
 <email>joseph@gmail.com</email>
 <PAN_Card>0011542</PAN_Card>
 <URL>blog.joseph.com</URL>
</Property>

Load the data:
pigdata = load '/input/file.txt' USING 
org.apache.pig.piggybank.storage.XMLLoader('Property') as (doc:chararray);

Parse the values:
values = foreach pigdata GENERATE FLATTEN(REGEX_EXTRACT_ALL(doc,'<Property>\\s*<fname>(.*)</fname>\\s*<lname>(.*)</lname>\\s*<landmark>(.*)</landmark>\\s*<city>(.*)</city>\\s*<state>(.*)</state>\\s*<contact>(.*)</contact>\\s*<email>(.*)</email>\\s*<PAN_Card>(.*)</PAN_Card>\\s*<URL>(.*)</URL>\\s*</Property>')) AS (fname:chararray, lname:chararray, landmark:chararray, city:chararray, state:chararray, contact:int, email:chararray, PAN_Card:long, URL:chararray);

Output:

dump values;

(joseph,christino,peter tower,panji,Goa,89456123,joseph@gmail.com,0011542,blog.joseph.com)

Apache Web Log Analysis using PIG

Enter into the Pig shell.

Load the log file into Pig using the LOAD command.

grunt>raw_logs = LOAD '/user/input/apacheLog.log' USING TextLoader AS (line:chararray);

Parse the log file and assign each field to a different variable.

logs_base = FOREACH raw_logs GENERATE FLATTEN (REGEX_EXTRACT_ALL(line,'^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(.+?)" (\\S+) (\\S+) "([^"]*)" "([^"]*)"') )
AS (remoteAddr: chararray, remoteLogname: chararray, user: chararray,  time: chararray, request: chararray, status: int, bytes_string: chararray, eferrer: chararray, browser: chararray);

We need only time (time), IP Address (remoteAddr), and user (remoteLogname). So we extract these three variables for each record and assign them to a placeholder.

logs =  FOREACH logs_base GENERATE remoteAddr,remoteLogname, time;

Now we need to find out the number of hits and number of unique users based on time.
We can achieve this in Pig by grouping all the records based on some variable or combination of variables. In our case, it would be datetime.

group_time = GROUP logs BY (time);
In this grouping, we need to find the number of hits and the number of unique users.
To find the number of hits, we simply count the IP addresses for each time value using the COUNT function.

Putting it all together, we can find the number of hits and the number of unique users (in our case this will always be 1, because the user name is '-') for each time value using this statement.

X = FOREACH group_time { unique_users = DISTINCT logs.remoteLogname;

GENERATE FLATTEN(group), COUNT(unique_users) AS UniqueUsers,COUNT(logs) as counts;}
(Results are in the form of Time, Unique Users, No. of Hits)

JSON File Processing through HIVE

Step 1: Add the resources

ADD JAR /usr/lib/hive-0.10.0/lib/hive-contrib-0.10.0.jar;
ADD JAR /usr/lib/hive-0.10.0/lib/hive-metastore-0.10.0.jar;
ADD JAR /home/hadoop/Documents/JSON-Serde.jar;

Step 2: Create the table

CREATE TABLE record (
   id INT,
   city_code ARRAY<INT>,
   email STRING,
   contact STRUCT<Mobile_no:STRING, Telephone_no:STRING>
) ROW FORMAT SERDE 'org.JSONSerDe';

Step 3: Load the data

@Data looks like
{ "id": 1, "city_code": [ 1, 2, 3 ], "email": "joseph@gmail.com", "contact": { "Mobile_no": "val1", "Telephone_no": "val2" } }
{ "id": 2, "city_code": [ 4, 5, 6 ], "email": "james@gmail.com", "contact": { "Mobile_no": "val3", "Telephone_no": "val4" } }
{ "id": 3, "city_code": [ 7, 8, 9 ], "email": "rony@gmail.com", "contact": { "Mobile_no": "val5", "Telephone_no": "val6" } }

 Load data local inpath '/home/hadoop/Documents/record_data.txt' OVERWRITE INTO TABLE record;

 Step 4: Retrieve the data

 select * from record;
 select contact.Mobile_no from record where id = 1;
 
Note: You can find the JSONSerDe source here: https://github.com/cloudera/cdh-twitter-example/blob/master/hive-serdes/src/main/java/com/cloudera/hive/serde/JSONSerDe.java

Hive Connection with MongoDB

Do the processing in Hive and store the result in MongoDB.

ADD JAR /home/hadoop/Documents/Hive-mongo-master/release/hive-mongo-0.0.2.jar;
ADD JAR /home/hadoop/Documents/Hive-mongo-master/release/hive-mongo-0.0.2-jar-with-dependencies.jar;
ADD JAR /usr/lib/hive-0.10.0/lib/hive-metastore-0.10.0.jar;
ADD JAR /home/hadoop/Documents/guava-r06.jar;
ADD JAR /home/hadoop/Documents/mongo-java-driver-2.6.3.jar;
create table if not exists mongo_test(id int, name String, age int) ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ',';

 load data local inpath '/home/hadoop/Documents/mongodata.txt' into table mongo_test;

 create external table mongotable(id int, name string, age int)
     stored by "org.yong3.hive.mongo.MongoStorageHandler"
     with serdeproperties ( "mongo.column.mapping" = "_id,name,age" )
     tblproperties ( "mongo.host" = "localhost", "mongo.port" = "27017",
    "mongo.db" = "db", "mongo.collection" = "ravi" );

 insert overwrite table mongotable select id, name, age from mongo_test;