MapReduce is a programming model for processing large data sets. The model consists of two functions, map() and reduce(). The map() function is applied to every item in the input data set and converts it into a set of key-value pairs. Multiple map jobs can run in parallel on different processing nodes to handle large data sets.
Map (K1, V1) = List (K2, V2).
The list of processed pairs (K2, V2) is collected by the MapReduce framework. All pairs with the same key are grouped together, creating one group for each generated key. The reduce function is then applied to each group, in parallel, to produce a collection of output values for that key.
Reduce (K2, List(V2)) = List (V3)
Since both map and reduce jobs can be run in parallel, they could be distributed over a large number of processing nodes to process large data sets.
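The data flow described above can be sketched in a single process, without any framework. This is only an illustration under stated assumptions: the class name MapReduceSketch and the helpers run() and wordCount() are made up for this example, and nothing here runs in parallel.

```java
import java.util.*;
import java.util.function.*;

// Single-process illustration of the MapReduce data flow (hypothetical names, no framework).
class MapReduceSketch {

    // Map (K1, V1) -> List (K2, V2), group by K2, then Reduce (K2, List(V2)) -> List (V3).
    static <K1, V1, K2, V2, V3> Map<K2, List<V3>> run(
            Map<K1, V1> input,
            BiFunction<K1, V1, List<Map.Entry<K2, V2>>> mapFn,
            BiFunction<K2, List<V2>, List<V3>> reduceFn) {

        // Map phase plus grouping: collect every emitted value under its key.
        Map<K2, List<V2>> groups = new TreeMap<>();
        for (Map.Entry<K1, V1> record : input.entrySet()) {
            for (Map.Entry<K2, V2> pair : mapFn.apply(record.getKey(), record.getValue())) {
                groups.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }

        // Reduce phase: one call per distinct key.
        Map<K2, List<V3>> output = new TreeMap<>();
        for (Map.Entry<K2, List<V2>> group : groups.entrySet()) {
            output.put(group.getKey(), reduceFn.apply(group.getKey(), group.getValue()));
        }
        return output;
    }

    // Example job: count how often each word occurs in the input lines.
    static Map<String, List<Integer>> wordCount(Map<Integer, String> lines) {
        return MapReduceSketch.<Integer, String, String, Integer, Integer>run(
            lines,
            (lineNo, line) -> {
                List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
                for (String w : line.split("\\s+")) {
                    if (!w.isEmpty()) {
                        pairs.add(new AbstractMap.SimpleEntry<>(w, 1));
                    }
                }
                return pairs;
            },
            (word, ones) -> {
                int sum = 0;
                for (int one : ones) {
                    sum += one;
                }
                return Collections.singletonList(sum);
            });
    }

    public static void main(String[] args) {
        Map<Integer, String> lines = new LinkedHashMap<>();
        lines.put(1, "a b a");
        lines.put(2, "b c");
        System.out.println(wordCount(lines)); // prints {a=[2], b=[2], c=[1]}
    }
}
```

The grouping step sits between map and reduce, which is the part a real framework does for you across machines.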
Let's write a simple program that counts the number of characters in each string using the MapReduce concept, but without using any framework. The program generates some sample strings, uses the MapReduce concept to distribute the counting over a number of threads, and finally outputs the total number of characters.
// MyMapReduce.java
import java.util.*;

public class MyMapReduce {
    List<List<String>> buckets = new ArrayList<List<String>>();
    List<String> intermediateresults = new ArrayList<String>();
    List<String> values = new ArrayList<String>();

    public void init() {
        for (int i = 1; i <= 30; i++) {
            values.add("zyx" + i);
        }
        System.out.println("**Running Conversion into Buckets**\n");
        // Convert the input data into smaller chunks. Here, 30 strings are divided into 5 buckets of 6.
        List<List<String>> b = step1ConvertIntoBuckets(values, 5);
        System.out.println("*DONE*\n");
        System.out.println("**Running #Map Function# concurrently for all Buckets\n");
        List<String> res = step2RunMapFunctionForAllBuckets(b);
        System.out.println("*MAP Done*\n");
        System.out.println("**Running #Reduce Function# for collating Intermediate Results and Printing Results\n");
        step3RunReduceFunctionForAllBuckets(res);
        System.out.println("*REDUCE Done*\n");
    }

    public List<List<String>> step1ConvertIntoBuckets(List<String> list, int numberofbuckets) {
        int n = list.size();
        int m = n / numberofbuckets;
        int rem = n % numberofbuckets;
        int count = 0;
        System.out.println("BUCKETS");
        for (int j = 1; j <= numberofbuckets; j++) {
            List<String> temp = new ArrayList<String>();
            for (int i = 1; i <= m; i++) {
                temp.add(list.get(count));
                count++;
            }
            buckets.add(temp);
        }
        // Any leftover items go into one extra bucket.
        if (rem != 0) {
            List<String> temp = new ArrayList<String>();
            for (int i = 1; i <= rem; i++) {
                temp.add(list.get(count));
                count++;
            }
            buckets.add(temp);
        }
        System.out.println(buckets);
        return buckets;
    }

    public List<String> step2RunMapFunctionForAllBuckets(List<List<String>> list) {
        List<Thread> threads = new ArrayList<Thread>();
        for (List<String> elementList : list) {
            Thread t = new StartThread(elementList);
            threads.add(t);
            t.start();
        }
        // Wait for every map thread to finish instead of sleeping for a fixed time.
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return intermediateresults;
    }

    public void step3RunReduceFunctionForAllBuckets(List<String> list) {
        int sum = 0;
        for (String s : list) {
            // you can do some processing here, like finding max of all results etc
            sum += Integer.parseInt(s);
        }
        System.out.println("\nTotal Count is " + sum + "\n");
    }

    class StartThread extends Thread {
        private final List<String> tempList;

        public StartThread(List<String> list) {
            tempList = list;
        }

        public void run() {
            System.out.println("In Map...");
            for (String str : tempList) {
                // Lock the shared list; synchronized(this) would only lock this one thread object.
                synchronized (intermediateresults) {
                    intermediateresults.add(Integer.toString(str.length()));
                }
            }
        }
    }

    public static void main(String[] args) {
        MyMapReduce my = new MyMapReduce();
        my.init();
    }
}
In this program, you have an ArrayList of 30 strings. It is divided into 5 batches of 6 strings. Each batch is processed in parallel in its own thread, and the results are collected in an intermediate ArrayList. The reduce function is then called; it loops through the intermediate ArrayList and sums up the counts to produce the output. Here only the map function is distributed over multiple threads. The reduce step simply loops through the output of map and adds it up.
Compile and run the program. The output is:
$ java MyMapReduce
**Running Conversion into Buckets**
BUCKETS
[[zyx1, zyx2, zyx3, zyx4, zyx5, zyx6], [zyx7, zyx8, zyx9, zyx10, zyx11, zyx12], [zyx13, zyx14, zyx15, zyx16, zyx17, zyx18], [zyx19, zyx20, zyx21, zyx22, zyx23, zyx24], [zyx25, zyx26, zyx27, zyx28, zyx29, zyx30]]
*DONE*
**Running #Map Function# concurrently for all Buckets
In Map...
In Map...
In Map...
In Map...
In Map...
*MAP Done*
**Running #Reduce Function# for collating Intermediate Results and Printing Results
Total Count is 141
*REDUCE Done*
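The same map and reduce steps can also be expressed with java.util.concurrent instead of raw threads. The following is a sketch of an alternative, not the program above: the class name ExecutorMapReduceSketch and the method totalLength() are made up for illustration, and each map task returns a partial count rather than writing into a shared list.

```java
import java.util.*;
import java.util.concurrent.*;

// Alternative sketch (hypothetical names): map tasks run in a thread pool and return partial counts.
class ExecutorMapReduceSketch {

    // Map step: one task per bucket, each returning the character count of its bucket.
    // Reduce step: add up the partial counts.
    static int totalLength(List<List<String>> buckets) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, buckets.size()));
        try {
            List<Future<Integer>> partials = new ArrayList<>();
            for (List<String> bucket : buckets) {
                Callable<Integer> task = () -> {
                    int count = 0;
                    for (String s : bucket) {
                        count += s.length();
                    }
                    return count;
                };
                partials.add(pool.submit(task));
            }
            int sum = 0;
            for (Future<Integer> partial : partials) {
                sum += partial.get(); // blocks until that bucket is done
            }
            return sum;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Same input as above: zyx1 .. zyx30 in 5 buckets of 6.
        List<List<String>> buckets = new ArrayList<>();
        for (int b = 0; b < 5; b++) {
            List<String> bucket = new ArrayList<>();
            for (int i = 1; i <= 6; i++) {
                bucket.add("zyx" + (b * 6 + i));
            }
            buckets.add(bucket);
        }
        System.out.println("Total Count is " + totalLength(buckets)); // prints "Total Count is 141"
    }
}
```

Because every task returns its own result, no locking is needed, and Future.get() replaces any waiting logic.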
A MapReduce framework like Hadoop provides the distributed setup to run such MapReduce programs over a large number of processing nodes. It automatically handles spawning the processing tasks and stores the intermediate data in temporary files on its nodes.
Prerequisite: set up Hadoop on multiple nodes. Refer to http://jayant7k.blogspot.com/2008/10/setting-up-hadoop.html
Here is a similar program, a word count, written for the Hadoop 0.20.1 map-reduce framework:
import java.io.*;
import java.util.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount extends Configured implements Tool {

    public static class MapClass extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    /**
     * A reducer class that just emits the sum of the input values.
     */
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    static int printUsage() {
        System.out.println("wordcount [-r <reduces>] <input> <output>");
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "WordCount example for hadoop 0.20.1");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(MapClass.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        // the keys are words (strings)
        job.setOutputKeyClass(Text.class);
        // the values are counts (ints)
        job.setOutputValueClass(IntWritable.class);

        List<String> other_args = new ArrayList<String>();
        for (int i = 0; i < args.length; ++i) {
            try {
                // The number of map tasks was earlier configurable,
                // But with hadoop 0.20.1, it is decided by the framework.
                // Since this heavily depends on the input data size and how it is being split.
                if ("-r".equals(args[i])) {
                    job.setNumReduceTasks(Integer.parseInt(args[++i]));
                } else {
                    other_args.add(args[i]);
                }
            } catch (NumberFormatException except) {
                System.out.println("ERROR: Integer expected instead of " + args[i]);
                return printUsage();
            } catch (ArrayIndexOutOfBoundsException except) {
                System.out.println("ERROR: Required parameter missing from " + args[i-1]);
                return printUsage();
            }
        }
        // Make sure there are exactly 2 parameters left.
        if (other_args.size() != 2) {
            System.out.println("ERROR: Wrong number of parameters: " + other_args.size() + " instead of 2.");
            return printUsage();
        }
        FileInputFormat.addInputPath(job, new Path(other_args.get(0)));
        FileOutputFormat.setOutputPath(job, new Path(other_args.get(1)));
        //submit job and wait for completion. Also show output to user.
        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new WordCount(), args);
        System.exit(res);
    }
}

Compile, create the jar and run the file:
$ javac -classpath ../hadoop-0.20.1/hadoop-0.20.1-core.jar -d wordcount_classes/ WordCount.java
$ jar -cvf wordcount.jar -C wordcount_classes/ .
$ cd ../hadoop-0.20.1
$ ./bin/hadoop jar ../progs/wordcount.jar WordCount -r 2 /user/hadoop/input /user/hadoop/output
10/06/16 17:00:47 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
10/06/16 17:00:48 INFO input.FileInputFormat: Total input paths to process : 2
10/06/16 17:00:48 INFO mapred.JobClient: Running job: job_201006141203_0008
10/06/16 17:00:49 INFO mapred.JobClient: map 0% reduce 0%
10/06/16 17:00:58 INFO mapred.JobClient: map 100% reduce 0%
10/06/16 17:01:10 INFO mapred.JobClient: map 100% reduce 100%
10/06/16 17:01:12 INFO mapred.JobClient: Job complete: job_201006141203_0008
10/06/16 17:01:12 INFO mapred.JobClient: Counters: 17
10/06/16 17:01:12 INFO mapred.JobClient: Job Counters
10/06/16 17:01:12 INFO mapred.JobClient: Launched reduce tasks=2
10/06/16 17:01:12 INFO mapred.JobClient: Launched map tasks=2
10/06/16 17:01:12 INFO mapred.JobClient: Data-local map tasks=2
10/06/16 17:01:12 INFO mapred.JobClient: FileSystemCounters
10/06/16 17:01:12 INFO mapred.JobClient: FILE_BYTES_READ=2009
10/06/16 17:01:12 INFO mapred.JobClient: HDFS_BYTES_READ=1467
10/06/16 17:01:12 INFO mapred.JobClient: FILE_BYTES_WRITTEN=4142
10/06/16 17:01:12 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=1356
10/06/16 17:01:12 INFO mapred.JobClient: Map-Reduce Framework
10/06/16 17:01:12 INFO mapred.JobClient: Reduce input groups=0
10/06/16 17:01:12 INFO mapred.JobClient: Combine output records=142
10/06/16 17:01:12 INFO mapred.JobClient: Map input records=33
10/06/16 17:01:12 INFO mapred.JobClient: Reduce shuffle bytes=2021
10/06/16 17:01:12 INFO mapred.JobClient: Reduce output records=0
10/06/16 17:01:12 INFO mapred.JobClient: Spilled Records=284
10/06/16 17:01:12 INFO mapred.JobClient: Map output bytes=2200
10/06/16 17:01:12 INFO mapred.JobClient: Combine input records=190
10/06/16 17:01:12 INFO mapred.JobClient: Map output records=190
10/06/16 17:01:12 INFO mapred.JobClient: Reduce input records=142

Here is an explanation of the program.
Configuration holds the job configuration, which is replicated to all nodes. Every node runs in its own JVM. The JobTracker runs on the master and accepts jobs from clients. It divides each job into parts and hands those parts to the TaskTrackers running on the nodes. A TaskTracker communicates with the JobTracker and asks for more work whenever it finishes a task. A TaskTracker can run multiple task instances at the same time. You set the mapper class and the reducer class on the job:
job.setMapperClass();
job.setReducerClass();
You can also specify the input and output paths in the job
FileInputFormat.addInputPath(job, Path);
FileOutputFormat.setOutputPath(job, Path);
You can also set a number of other options in the Job.
job.setNumReduceTasks(); //set number of reduce tasks
job.setOutputFormatClass(); //set the output format
A job is submitted by calling either of:
job.submit(); //send the job to the cluster and return - non blocking
job.waitForCompletion(true); // send the job to the cluster and wait for its completion
The job determines how to divide the input into splits and sends the job data to the JobTracker on the master. Each TaskTracker asks the JobTracker for its InputSplit data and the task to run. It calls the mapper for each record read from the InputSplit. A mapper should extend the mapreduce.Mapper class, which already implements some of the required functions. Mappers run separately on each node, in separate JVM instances under the TaskTracker. No data is shared between them: if you set a static variable in one mapper, its value is not visible in another mapper on another TaskTracker.
map(Object key, Object value, Context context)
To allow serialization and transfer of all types of data, Hadoop defines its own Writable types. Box classes such as Text (for String), IntWritable (for int) and LongWritable (for long) implement the Writable interface (required for values) and the WritableComparable interface (required for keys). Context is used to collect and write the output into intermediate as well as final files.
context.write() takes (Key,Value)
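To make the Writable idea concrete, here is a small sketch of a value type that serializes itself field by field. It assumes nothing beyond the JDK: SimpleWritable is a local stand-in that mirrors the two methods of Hadoop's Writable interface, and WordLength and roundTrip() are hypothetical names used only for this example.

```java
import java.io.*;

class WritableSketch {

    // Local stand-in for Hadoop's Writable, declared here so the example needs no Hadoop jar.
    interface SimpleWritable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    // Hypothetical value type: a word together with its length.
    static class WordLength implements SimpleWritable {
        String word = "";
        int length;

        public void write(DataOutput out) throws IOException {
            out.writeUTF(word);
            out.writeInt(length);
        }

        public void readFields(DataInput in) throws IOException {
            // Fields must be read back in the same order they were written.
            word = in.readUTF();
            length = in.readInt();
        }
    }

    // Serialize to bytes and read back into a fresh object, as happens when data moves between nodes.
    static WordLength roundTrip(WordLength original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            WordLength copy = new WordLength();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        WordLength w = new WordLength();
        w.word = "hadoop";
        w.length = 6;
        WordLength copy = roundTrip(w);
        System.out.println(copy.word + " " + copy.length); // prints "hadoop 6"
    }
}
```

In a real job the class would implement org.apache.hadoop.io.Writable (or WritableComparable for keys) instead of the stand-in interface.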
Partitioner is used to get the partition number for a given key. By default, HashPartitioner is used, which uses key.hashCode() to compute the partition number. You can specify a custom partitioner on the job with job.setPartitionerClass().
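The hash-based partitioning rule can be sketched in plain Java. This is an illustration only: PartitionSketch, getPartition() and assign() are names invented for this example, and the formula (clear the sign bit of hashCode(), then take the remainder by the number of reduce tasks) is assumed to match what the default partitioner does.

```java
import java.util.*;

class PartitionSketch {

    // Hash partitioning: clear the sign bit so the result is never negative, then take the remainder.
    static int getPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Group keys by the partition (that is, the reduce task) they would be sent to.
    static Map<Integer, List<String>> assign(List<String> keys, int numReduceTasks) {
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (String key : keys) {
            int p = getPartition(key, numReduceTasks);
            byPartition.computeIfAbsent(p, k -> new ArrayList<>()).add(key);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("apple", "banana", "apple", "cherry", "banana");
        // Every occurrence of the same key lands in the same partition.
        System.out.println(assign(keys, 2));
    }
}
```

Since the partition depends only on the key, all values for one key reach the same reduce task, whichever mapper emitted them.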
reduce(Object key, Iterable values, Context context)
All keys and values sent to one partition go to the same reduce task. Within a reduce task, the reduce calls are made in sorted key order. The reducer sends its output to the RecordWriter, which writes the data to an output file.
Hadoop provides lots of flexibility to override the components and customize the input and output data.
HadoopStreaming can be used to interface the Hadoop map-reduce framework with arbitrary program code. It uses stdin and stdout for data flow. You can write the mapper and the reducer as separate programs in any language of your choice.
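As a rough sketch of what a streaming mapper looks like, the program below reads lines from stdin and writes one "word<TAB>1" record per token to stdout, relying on the streaming convention that the text before the first tab is the key. The class name StreamingMapperSketch and the helper mapLine() are made up for this example; how the program is wired into a streaming job depends on your installation and is not shown.

```java
import java.io.*;
import java.util.*;

class StreamingMapperSketch {

    // Turn one input line into tab-separated (word, 1) records.
    static List<String> mapLine(String line) {
        List<String> records = new ArrayList<>();
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            records.add(itr.nextToken() + "\t1");
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        // Data flow is plain stdin to stdout, one record per line.
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String line;
        while ((line = in.readLine()) != null) {
            for (String record : mapLine(line)) {
                System.out.println(record);
            }
        }
    }
}
```

A reducer for streaming follows the same pattern: it reads the sorted "key<TAB>value" lines from stdin and writes its results to stdout.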
3 comments:
I have a quick question for you. For some reason, my hadoop 0.21.0 installation does not have the hadoop-0.21.0-core.jar file. This file is not in the download from apache. Do you know how I can create this?
Thank you, Jason
You can compile the jar file from source using ant
Hi Jayant
Do you mean that I can write my map and reduce programs in any language, for example PL/SQL, and then run them on my Hadoop setup using Hadoop streaming? If yes, what is required and how do I do it? Is there any sample code or a pointer to it?
Please let me know.
-ajay