MAP REDUCE JAVA EXAMPLE


Let us see the word count example in Java. There are many Java classes involved, but we will mainly be focussing on 3 classes:

i.e. the Mapper class, the Reducer class and the driver class from which the Mapper and Reducer are executed.


The Mapper class

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

class WordCountMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

  public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
      Reporter rep) throws IOException {

    // Convert the line to a String and split it on spaces to get the individual words
    String row = value.toString();
    String[] words = row.split(" ");

    for (String word : words) {
      if (word.length() > 0) {
        // Emit each word as the key with a count of 1 as the value
        output.collect(new Text(word), new IntWritable(1));
      }
    }
  }
}


In the above example we have named the mapper class 'WordCountMapper', which extends MapReduceBase and implements the Mapper<LongWritable, Text, Text, IntWritable> interface. The generic type parameters of the Mapper represent:


LongWritable : Input Key Type of the Map.
Text : Input Value Type of the Map.
Text : Output Key Type of the Map.
IntWritable : Output Value Type of the Map.

Then we have the map() function, which has four parameters. 'LongWritable key' is the byte offset of the line within the input file, 'Text value' is the entire line, 'OutputCollector<Text, IntWritable> output' collects the output as key-value pairs, and 'Reporter rep' reports the progress of the task.

Now, what we have to do is convert this line to a String and split it on a space (" "), so that we get the individual words from that line.

Then we run a 'for' loop over the words, so that the map emits its output as key-value pairs.

output.collect(new Text(word), new IntWritable(1));

The above line says: collect each word (which becomes the key) and pair it with the value '1'.

Relate it with the above example and the code will become clearer.
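
If you would like to trace this logic without a Hadoop cluster, here is a minimal plain-Java sketch (the sample line is made up purely for illustration) of what the map phase emits for one line of input:

// Plain-Java sketch of the map step; no Hadoop classes involved.
public class MapTrace {
  public static void main(String[] args) {
    String row = "In the jungle the mighty jungle";   // hypothetical input line
    for (String word : row.split(" ")) {
      if (word.length() > 0) {
        // Corresponds to output.collect(new Text(word), new IntWritable(1))
        System.out.println("{" + word + ", 1}");
      }
    }
  }
}

Running it prints {In, 1} {the, 1} {jungle, 1} {the, 1} {mighty, 1} {jungle, 1}, one pair per line, which is exactly the kind of key-value output the Mapper hands over to the shuffle phase.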



The Reducer class


import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
      Reporter rep) throws IOException {

    // Add up all the 1's emitted by the Mapper for this word
    int count = 0;
    while (values.hasNext()) {
      count += values.next().get();
    }

    // Emit the word along with its total count
    output.collect(key, new IntWritable(count));
  }
}

In the above example we have named the reducer class 'WordCountReducer', which extends MapReduceBase and implements the Reducer<Text, IntWritable, Text, IntWritable> interface. The generic type parameters of the Reducer represent:


Text : Input Key Type of the Reducer, which must be the same as the output key type of the Map.
IntWritable : Input Value Type of the Reducer, which must be the same as the output value type of the Map.
Text : Output Key Type of the Reducer.
IntWritable : Output Value Type of the Reducer.

Next we have the reduce() function, which has 'Text key' as the first parameter and 'Iterator<IntWritable> values' as the second parameter.

Now, the beauty of Hadoop is that it does not pass all the keys to the reduce() function at once. It passes one key at a time, and the 'Iterator<IntWritable> values' contains all the values associated with that key. i.e. If you remember the example where {In, 1} occurred in Map1 and {In, 1} also occurred in Map2: 'In' is the key, and its value '1' is present in two places. So, all we have to do is add up all the 1's to get the desired word count.

And that is what the next step does. We loop over the values and add up all the 1's to get the desired output, i.e. {In, 2}.
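
Again, as a plain-Java sketch (the key 'In' and its two 1's are only an illustration), this is what reduce() effectively does for a single key:

import java.util.Arrays;
import java.util.Iterator;

// Plain-Java sketch of the reduce step for one key; no Hadoop classes involved.
public class ReduceTrace {
  public static void main(String[] args) {
    String key = "In";                                          // the single key handed to reduce()
    Iterator<Integer> values = Arrays.asList(1, 1).iterator();  // all values grouped under that key

    int count = 0;
    while (values.hasNext()) {
      count += values.next();   // corresponds to count += values.next().get()
    }

    // Corresponds to output.collect(key, new IntWritable(count)), i.e. {In, 2}
    System.out.println("{" + key + ", " + count + "}");
  }
}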

So, we have the Mapper and Reducer already defined. Now, we need to have a class which will run this Mapper and Reducer.


The WordCount Class

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {

    // The JobConf object holds the configuration details of the job
    JobConf jobConf = new JobConf(WordCount.class);

    // Set the Mapper and Reducer classes
    jobConf.setMapperClass(WordCountMapper.class);
    jobConf.setReducerClass(WordCountReducer.class);

    // Output key/value types of the Mapper
    jobConf.setMapOutputKeyClass(Text.class);
    jobConf.setMapOutputValueClass(IntWritable.class);

    // Output key/value types of the Reducer
    jobConf.setOutputKeyClass(Text.class);
    jobConf.setOutputValueClass(IntWritable.class);

    // Input and output paths, taken from the command-line arguments
    FileInputFormat.setInputPaths(jobConf, new Path(args[0]));
    FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));

    JobClient.runJob(jobConf);
    return 0;
  }

  public static void main(String[] args) throws Exception {

    int exitCode = ToolRunner.run(new WordCount(), args);
    System.exit(exitCode);
  }
}

In the above example we can see there is a run() method which contains all the configuration necessary to run the job. We have defined an object of the 'JobConf' class, which holds the configuration details of the job.


jobConf.setMapperClass(WordCountMapper.class);
jobConf.setReducerClass(WordCountReducer.class);

In the above two lines we are setting the Mapper and Reducer classes.


jobConf.setMapOutputKeyClass(Text.class);
jobConf.setMapOutputValueClass(IntWritable.class);

jobConf.setOutputKeyClass(Text.class);
jobConf.setOutputValueClass(IntWritable.class);

In the above four lines we are setting the output key and value classes of the Mapper and of the Reducer, i.e. they declare what data types the keys and values emitted by the Mapper and the Reducer are going to have.


FileInputFormat.setInputPaths(jobConf, new Path(args[0]));
FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));

Above we have set the input path of the file to be parsed (i.e. story.txt) and the output path. Both are supplied on the command line and reach the run() method as the arguments passed in from the main() method.


Finally, the JobClient.runJob() method submits and runs the job.

Now, if you come to the main() method, we have called ToolRunner.run(new WordCount(), args), passing an instance of the WordCount class and the arguments received by the main() method, and this triggers the actual execution of the job.
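
Once the three classes are compiled and packaged into a jar, the job is typically submitted from the command line. The jar name and the paths below are only placeholders for illustration:

hadoop jar wordcount.jar WordCount /input/story.txt /output

The two path arguments arrive in args[0] and args[1] of the run() method, which is where the input and output paths are picked up.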