Wednesday, June 16, 2010

Writing your first map-reduce program on hadoop

Before we go ahead with the actual program, let's have a look at what map-reduce is and how it is used.

Map-Reduce is a programming model for processing large data sets. The model consists of two functions, map() and reduce(). The map() function is applied to every item in the input data set, converting each item into a set of key-value pairs. Multiple map jobs can run in parallel over different processing nodes, each working on its own chunk of the input.

Map(K1, V1) -> List(K2, V2)

The list of processed pairs (K2, V2) is collected by the MapReduce framework. All pairs with the same key are grouped together, creating one group per generated key. The reduce function is then applied to each group in parallel to produce a (usually smaller) collection of output values.

Reduce(K2, List(V2)) -> List(V3)

Since both the map and reduce jobs run in parallel, they can be distributed over a large number of processing nodes to handle very large data sets.
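
In plain Java terms, the two functions can be thought of as the interfaces sketched below. This is only a conceptual sketch; the names Pair, Mapper and Reducer here are made up for illustration and are not the hadoop classes used later in this post.

// Conceptual sketch only - Pair, Mapper and Reducer are illustrative names,
// not the hadoop classes used later in this post.
import java.util.List;

class Pair<K, V> {
  final K key;
  final V value;
  Pair(K key, V value) { this.key = key; this.value = value; }
}

// map: (K1, V1) -> List(K2, V2)
interface Mapper<K1, V1, K2, V2> {
  List<Pair<K2, V2>> map(K1 key, V1 value);
}

// reduce: (K2, List(V2)) -> List(V3)
interface Reducer<K2, V2, V3> {
  List<V3> reduce(K2 key, List<V2> values);
}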

Let's write a simple program to count the number of characters in each string using the MapReduce concept, but without using any framework. The program here creates 30 strings, uses the MapReduce idea to distribute the character counting over a number of threads, and finally outputs the total number of characters.

// MyMapReduce.java
import java.util.*;

public class MyMapReduce
{
  List<List<String>> buckets = new ArrayList<List<String>>();
  List<String> intermediateresults = new ArrayList<String>();
  List<String> values = new ArrayList<String>();

  public void init()
  {
    for(int i = 1; i<=30; i++)
    {
      values.add("zyx" + new Integer(i).toString());
    }
    System.out.println("**Running Conversion into Buckets**\n");
    // convert the input data into smaller chunks. Here dividing the 30 strings into 5 buckets of 6 strings each.
    List<List<String>> b = step1ConvertIntoBuckets(values, 5);
    System.out.println("*DONE*\n");
    System.out.println("**Running #Map Function# concurrently for all Buckets\n");
    List<String> res = step2RunMapFunctionForAllBuckets(b);
    System.out.println("*MAP Done*\n");
    System.out.println("**Running #Reduce Function# for collating Intermediate Results and Printing Results\n");
    step3RunReduceFunctionForAllBuckets(res);
    System.out.println("*REDUCE Done*\n");
  }
  public List<List<String>> step1ConvertIntoBuckets(List<String> list, int numberofbuckets)
  {
    int n = list.size();
    int m = n / numberofbuckets;
    int rem = n % numberofbuckets;

    int count = 0;
    System.out.println("BUCKETS");
    for(int j = 1; j <= numberofbuckets; j++)
    {
      List<String> temp = new ArrayList<String>();
      for(int i = 1; i <= m; i++)
      {
        temp.add(list.get(count));
        count++;
      }
      buckets.add(temp);
    }
    // put any leftover strings into one extra bucket
    if(rem != 0)
    {
      List<String> temp = new ArrayList<String>();
      for(int i = 1; i <= rem; i++)
      {
        temp.add(list.get(count));
        count++;
      }
      buckets.add(temp);
    }
    System.out.println(buckets);
    return buckets;
  }

  public List<String> step2RunMapFunctionForAllBuckets(List<List<String>> list)
  {
    List<Thread> threads = new ArrayList<Thread>();
    for(int i = 0; i < list.size(); i++)
    {
      Thread t = new StartThread(list.get(i));
      t.start();
      threads.add(t);
    }
    // wait for every map thread to finish instead of sleeping for a fixed time
    for(Thread t : threads)
    {
      try
      {
        t.join();
      }catch(InterruptedException e)
      {
        Thread.currentThread().interrupt();
      }
    }
    return intermediateresults;
  }

  public void step3RunReduceFunctionForAllBuckets(List<String> list)
  {
    int sum = 0;
    for(int i = 0; i < list.size(); i++)
    {
      // you can do some other processing here, like finding the max of all results etc.
      int t = Integer.parseInt(list.get(i));
      sum += t;
    }
    System.out.println("\nTotal Count is " + sum + "\n");
  }

  class StartThread extends Thread
  {
    private final List<String> tempList;
    public StartThread(List<String> list)
    {
      tempList = list;
    }
    public void run()
    {
      System.out.println("In Map...");
      for (String str : tempList)
      {
        // synchronize on the shared list (not on the thread itself) so that
        // concurrent adds from different map threads do not interfere
        synchronized(intermediateresults)
        {
          intermediateresults.add(Integer.toString(str.length()));
        }
      }
    }
  }
  public static void main(String[] args)
  {
    MyMapReduce my = new MyMapReduce();
    my.init();
  }
}

In this program, an ArrayList of 30 strings is divided into 5 buckets of 6 strings each. Each bucket is processed in parallel by its own thread, and the per-string character counts are collected in an intermediate ArrayList. The reduce function then loops through the intermediate list and sums up the counts. Note that only the map function is distributed over multiple threads here; the reduce simply iterates over the map output and adds up the results.

Compile and run the program. The output is:

$ java MyMapReduce 
**Running Conversion into Buckets**

BUCKETS
[[zyx1, zyx2, zyx3, zyx4, zyx5, zyx6], [zyx7, zyx8, zyx9, zyx10, zyx11, zyx12], [zyx13, zyx14, zyx15, zyx16, zyx17, zyx18], [zyx19, zyx20, zyx21, zyx22, zyx23, zyx24], [zyx25, zyx26, zyx27, zyx28, zyx29, zyx30]]
*DONE*

**Running #Map Function# concurrently for all Buckets

In Map...
In Map...
In Map...
In Map...
In Map...
*MAP Done*

**Running #Reduce Function# for collating Intermediate Results and Printing Results


Total Count is 141

*REDUCE Done*
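
The reduce step above only sums the counts, but as the comment in step3 hints, any aggregation over the intermediate list fits the same shape. Here is a sketch of a max-based reduce that could replace step3 in the class above (step3FindMaxForAllBuckets is just an illustrative name, not part of the program above):

  // Alternative reduce: find the longest string length instead of the total count.
  // step3FindMaxForAllBuckets is an illustrative name for this variant.
  public void step3FindMaxForAllBuckets(List<String> list)
  {
    int max = 0;
    for (String s : list)
    {
      int t = Integer.parseInt(s);
      if (t > max) max = t;
    }
    System.out.println("\nMax length is " + max + "\n");
  }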

A MapReduce framework like hadoop provides the distributed setup to run such programs over a large number of processing nodes. It automatically handles the spawning of map and reduce tasks and stores the intermediate information in temporary files spread across its nodes.

Pre-requisite: set up hadoop on multiple nodes. Refer to http://jayant7k.blogspot.com/2008/10/setting-up-hadoop.html

Here is a similar program, the classic word count, written using the hadoop 0.20.1 map-reduce framework.

import java.io.*;
import java.util.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount extends Configured implements Tool {

  public static class MapClass extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      String line = value.toString();
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  /**
   * A reducer class that just emits the sum of the input values.
   */
  public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }

  static int printUsage() {
    System.out.println("wordcount [-r <reduces>] <input> <output>");
    ToolRunner.printGenericCommandUsage(System.out);
    return -1;
  }

  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "WordCount example for hadoop 0.20.1");

    job.setJarByClass(WordCount.class);
    job.setMapperClass(MapClass.class);
    job.setCombinerClass(Reduce.class);
    job.setReducerClass(Reduce.class);

    // the keys are words (strings)
    job.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    job.setOutputValueClass(IntWritable.class);


    List<String> other_args = new ArrayList<String>();
    for(int i=0; i < args.length; ++i) {
      try {
        // The number of map tasks was configurable in earlier versions, but with
        // hadoop 0.20.1 it is decided by the framework, since it depends on the
        // input data size and how the input is split.
        if ("-r".equals(args[i])) {
          job.setNumReduceTasks(Integer.parseInt(args[++i]));
        } else {
          other_args.add(args[i]);
        }
      } catch (NumberFormatException except) {
        System.out.println("ERROR: Integer expected instead of " + args[i]);
        return printUsage();
      } catch (ArrayIndexOutOfBoundsException except) {
        System.out.println("ERROR: Required parameter missing from " +
            args[i-1]);
        return printUsage();
      }
    }
    // Make sure there are exactly 2 parameters left.
    if (other_args.size() != 2) {
      System.out.println("ERROR: Wrong number of parameters: " + other_args.size() + " instead of 2.");
      return printUsage();
    }
    FileInputFormat.addInputPath(job, new Path(other_args.get(0)));
    FileOutputFormat.setOutputPath(job, new Path(other_args.get(1)));

    // submit the job and wait for completion, showing progress to the user
    return job.waitForCompletion(true) ? 0 : 1;
  }
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new WordCount(), args);
    System.exit(res);
  }
}
Compile the classes, create the jar and run it on the cluster

$ mkdir wordcount_classes
$ javac -classpath ../hadoop-0.20.1/hadoop-0.20.1-core.jar -d wordcount_classes/ WordCount.java
$ jar -cvf wordcount.jar -C wordcount_classes/ .
$ cd ../hadoop-0.20.1
$ ./bin/hadoop jar ../progs/wordcount.jar WordCount -r 2 /user/hadoop/input /user/hadoop/output

10/06/16 17:00:47 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
10/06/16 17:00:48 INFO input.FileInputFormat: Total input paths to process : 2
10/06/16 17:00:48 INFO mapred.JobClient: Running job: job_201006141203_0008
10/06/16 17:00:49 INFO mapred.JobClient:  map 0% reduce 0%
10/06/16 17:00:58 INFO mapred.JobClient:  map 100% reduce 0%
10/06/16 17:01:10 INFO mapred.JobClient:  map 100% reduce 100%
10/06/16 17:01:12 INFO mapred.JobClient: Job complete: job_201006141203_0008
10/06/16 17:01:12 INFO mapred.JobClient: Counters: 17
10/06/16 17:01:12 INFO mapred.JobClient:   Job Counters 
10/06/16 17:01:12 INFO mapred.JobClient:     Launched reduce tasks=2
10/06/16 17:01:12 INFO mapred.JobClient:     Launched map tasks=2
10/06/16 17:01:12 INFO mapred.JobClient:     Data-local map tasks=2
10/06/16 17:01:12 INFO mapred.JobClient:   FileSystemCounters
10/06/16 17:01:12 INFO mapred.JobClient:     FILE_BYTES_READ=2009
10/06/16 17:01:12 INFO mapred.JobClient:     HDFS_BYTES_READ=1467
10/06/16 17:01:12 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=4142
10/06/16 17:01:12 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=1356
10/06/16 17:01:12 INFO mapred.JobClient:   Map-Reduce Framework
10/06/16 17:01:12 INFO mapred.JobClient:     Reduce input groups=0
10/06/16 17:01:12 INFO mapred.JobClient:     Combine output records=142
10/06/16 17:01:12 INFO mapred.JobClient:     Map input records=33
10/06/16 17:01:12 INFO mapred.JobClient:     Reduce shuffle bytes=2021
10/06/16 17:01:12 INFO mapred.JobClient:     Reduce output records=0
10/06/16 17:01:12 INFO mapred.JobClient:     Spilled Records=284
10/06/16 17:01:12 INFO mapred.JobClient:     Map output bytes=2200
10/06/16 17:01:12 INFO mapred.JobClient:     Combine input records=190
10/06/16 17:01:12 INFO mapred.JobClient:     Map output records=190
10/06/16 17:01:12 INFO mapred.JobClient:     Reduce input records=142
Here is an explanation of the program

Configuration holds the job configuration, which is distributed to all the nodes in the cluster (via HDFS). Each node runs its own JVM. The JobTracker runs on the master and accepts jobs from clients; it splits a job into tasks and hands the tasks out to the nodes via their TaskTrackers. A TaskTracker communicates with the JobTracker and keeps asking for more work as and when it finishes its current tasks, and it can run multiple task instances in parallel. You set the mapper class and the reducer class on the Job.

job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);

You can also specify the input and output paths for the job

FileInputFormat.addInputPath(job, Path);
FileOutputFormat.setOutputPath(job, Path);

You can also set a number of other options in the Job.

job.setNumReduceTasks(2); // set the number of reduce tasks
job.setOutputFormatClass(TextOutputFormat.class); // set the output format (from org.apache.hadoop.mapreduce.lib.output)

The job is submitted by calling either of:

job.submit(); // send the job to the cluster and return immediately - non-blocking
job.waitForCompletion(true); // send the job to the cluster and wait for its completion
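
For example, a driver that wants to do other work while the job runs can submit and poll, while the WordCount driver above simply blocks. A small sketch, assuming the Job has already been configured as in the run() method above (JobSubmitExample is an illustrative name):

import org.apache.hadoop.mapreduce.Job;

// A sketch only: run an already configured Job either asynchronously or by blocking.
public class JobSubmitExample {

  // Non-blocking: submit the job and poll for completion while doing other work.
  static void runAsync(Job job) throws Exception {
    job.submit();
    while (!job.isComplete()) {
      Thread.sleep(5000);   // free to do other work here while the job runs
    }
  }

  // Blocking: submit and wait; 'true' prints map/reduce progress to the console.
  static int runBlocking(Job job) throws Exception {
    return job.waitForCompletion(true) ? 0 : 1;
  }
}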

The Job determines how the input is divided into splits and sends the job data to the master JobTracker. Each TaskTracker asks the JobTracker for its InputSplit data and the task to run, and calls the mapper once for each record in the InputSplit. A mapper should extend the mapreduce.Mapper class, which already provides default implementations of the required methods. All mappers run separately on their own nodes, inside separate JVM instances launched by the TaskTracker, and there is no sharing of data between them: if you set a static variable in one mapper, you will not see its value in a mapper running on another TaskTracker.

map(Object key, Text value, Context context)
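
Since mappers share nothing across JVMs, any per-task state has to live inside the mapper instance itself. Here is a sketch of a mapper that counts only the records its own task has seen, using the setup() and cleanup() hooks of the new API (LineCountMapper is an illustrative name, not part of the WordCount example above):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative mapper: each task instance keeps its own counter; the value is
// local to this JVM and is not visible to mappers running on other tasktrackers.
public class LineCountMapper extends Mapper<Object, Text, Text, IntWritable> {

  private int localRecords = 0;   // per-task state only

  @Override
  protected void setup(Context context) {
    localRecords = 0;             // runs once before the first map() call of this task
  }

  @Override
  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    localRecords++;
  }

  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    // runs once after the last map() call of this task
    context.write(new Text("records-in-this-split"), new IntWritable(localRecords));
  }
}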

To allow serialization and transfer of all types of data, hadoop defines its own Writable types. Box classes like Text (for String), IntWritable (for int) and LongWritable (for long) implement the Writable interface, which is sufficient for values; key types additionally implement WritableComparable so that they can be sorted. Context is used to collect and write the output into intermediate as well as final files.

context.write() takes a (key, value) pair.
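
If the built-in box classes are not enough, you can define your own type by implementing Writable, or WritableComparable if it will be used as a key. A minimal sketch of a custom key type (IntPairWritable is an illustrative name, not a hadoop class):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Illustrative custom key type: a pair of ints that hadoop can serialize,
// shuffle and sort. Not part of the WordCount example above.
public class IntPairWritable implements WritableComparable<IntPairWritable> {

  private int first;
  private int second;

  public IntPairWritable() { }                       // required no-arg constructor

  public IntPairWritable(int first, int second) {
    this.first = first;
    this.second = second;
  }

  public void write(DataOutput out) throws IOException {    // serialize
    out.writeInt(first);
    out.writeInt(second);
  }

  public void readFields(DataInput in) throws IOException { // deserialize
    first = in.readInt();
    second = in.readInt();
  }

  public int compareTo(IntPairWritable o) {                  // sort order for keys
    if (first != o.first) return first < o.first ? -1 : 1;
    if (second != o.second) return second < o.second ? -1 : 1;
    return 0;
  }

  public int hashCode() {                                    // used by HashPartitioner
    return 31 * first + second;
  }

  public boolean equals(Object obj) {
    return obj instanceof IntPairWritable
        && ((IntPairWritable) obj).first == first
        && ((IntPairWritable) obj).second == second;
  }
}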

A Partitioner is used to compute the partition number for a given key. By default HashPartitioner is used, which derives the partition number from key.hashCode(). You can specify a custom partitioner on the Job.
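
Here is a sketch of a custom partitioner written against the new API (FirstLetterPartitioner is an illustrative name); it sends all words starting with the same letter to the same reduce task:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative partitioner: all words starting with the same character go to
// the same reducer. Plug it in with job.setPartitionerClass(FirstLetterPartitioner.class).
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {

  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    String word = key.toString();
    char c = word.isEmpty() ? ' ' : word.charAt(0);
    // chars are never negative, so the modulo is safe; HashPartitioner masks
    // hashCode() with Integer.MAX_VALUE for the same reason
    return Character.toLowerCase(c) % numPartitions;
  }
}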

reduce(Text key, Iterable<IntWritable> values, Context context)

All keys and values sent to one partition go to the same reduce task, and within a reduce task the reduce() calls are made in sorted key order. The reducer passes its output to a RecordWriter, which writes the data to an output file.

Hadoop provides a lot of flexibility to override these components and customize the input and output handling.

Hadoop Streaming can be used to interface the hadoop map-reduce framework with arbitrary program code. It uses stdin and stdout for the data flow, so you can write a separate program for the mapper and for the reducer in any language of your choice.
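
Since streaming only talks over stdin and stdout, the mapper and reducer can even be plain Java programs with a main() method. A sketch of a streaming-style mapper (StreamWordCountMapper is an illustrative name) that reads lines from stdin and writes tab-separated key/value pairs to stdout, which is the default format the streaming framework expects:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.StringTokenizer;

// Illustrative streaming mapper: reads raw lines from stdin and emits
// "word<TAB>1" lines on stdout. The streaming framework groups the keys and
// feeds them, sorted, to a reducer program written the same way.
public class StreamWordCountMapper {
  public static void main(String[] args) throws Exception {
    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
    String line;
    while ((line = in.readLine()) != null) {
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        System.out.println(itr.nextToken() + "\t1");
      }
    }
  }
}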

3 comments:

Jason Williams said...

I have a quick question for you. For some reason, my hadoop 0.21.0 installation does not have the hadoop-0.21.0-core.jar file. This file is not in the download from apache. Do you know how I can create this?

Thank you, Jason

gamegeek said...

You can compile the jar file from source using ant

Anonymous said...

Hi Jayant,
Do you mean to say that I can write my map and reduce programs in any language, for example PL/SQL, and then run them in my Hadoop setup using Hadoop streaming? If yes, what is required and how do I do it? Any sample code or pointer to it?
Please let me know.

-ajay