MapReduce Framework

MapReduce is a framework in which the user has to fit their solution into the phases of map and reduce, which in some situations might be challenging. MapReduce is similar to encountering recursion for the first time: it is challenging to find the recursive solution to a problem, but when it comes to you, it is clear, concise, and elegant. In many situations you have to be conscious of the system resources being used by the MapReduce job, especially inter-cluster network utilization. The tradeoff of being confined to the MapReduce framework is the ability to process your data with distributed computing, without having to deal with concurrency, robustness, scale, and other common challenges.

The input to a MapReduce job is a set of files in the data store that are spread out over the Hadoop Distributed File System (HDFS). In Hadoop, these files are split with an input format, which defines how to separate a file into input splits. An input split is a byte-oriented view of a chunk of the file to be loaded by a map task.

Each map task in Hadoop is broken into the following phases: record reader, mapper, combiner, and partitioner. The output of the map tasks, called the intermediate keys and values, is sent to the reducers. The reduce tasks are broken into the following phases: shuffle, sort, reducer, and output format. Optimally, the map tasks run on the nodes where the data rests. This way, the data typically does not have to move over the network and can be computed on the local machine.

  • record reader – The record reader translates an input split generated by input format into records. The purpose of the record reader is to parse the data into records, but not parse the record itself. It passes the data to the mapper in the form of a key/value pair. Usually the key in this context is positional information and the value is the chunk of data that composes a record.
  • map – In the mapper, user-provided code is executed on each key/value pair from the record reader to produce zero or more new key/value pairs, called the intermediate pairs. The choice of what the key and value are here is not arbitrary and is very important to what the MapReduce job is accomplishing. The key is what the data will be grouped on and the value is the information pertinent to the analysis in the reducer.
  • combiner – The combiner, an optional localized reducer, can group data in the map phase. It takes the intermediate keys from the mapper and applies a user-provided method to aggregate values in the small scope of that one mapper. For example, because the count of an aggregation is the sum of the counts of each part, you can produce an intermediate count and then sum those intermediate counts for the final result. In many situations, this significantly reduces the amount of data that has to move over the network. Sending (hello world, 3) requires fewer bytes than sending (hello world, 1) three times over the network. Combiners will be covered in more depth with the patterns that use them extensively. Combiners often provide extreme performance gains with no downside. However, a combiner is not guaranteed to execute, so the correctness of the overall algorithm cannot depend on it.
  • partitioner – The partitioner takes the intermediate key/value pairs from the mapper (or combiner if it is being used) and splits them up into shards, one shard per reducer. By default, the partitioner interrogates the object for its hash code, which is typically an md5sum. Then, the partitioner performs a modulus operation by the number of reducers: key.hashCode() % (number of reducers). This randomly distributes the keyspace evenly over the reducers, but still ensures that keys with the same value in different mappers end up at the same reducer. The default behavior of the partitioner can be customized, and will be in some more advanced patterns, such as sorting; a minimal partitioner sketch follows this list. However, changing the partitioner is rarely necessary. The partitioned data is written to the local file system for each map task and waits to be pulled by its respective reducer.
  • shuffle and sort – The reduce task starts with the shuffle and sort step. This step takes the output files written by all of the partitioners and downloads them to the local machine in which the reducer is running. These individual data pieces are then sorted by key into one larger data list. The purpose of this sort is to group equivalent keys together so that their values can be iterated over easily in the reduce task. Other than specifying a custom Comparator object to control how the keys are sorted and grouped, this phase is not customizable; the framework handles everything automatically.
  • reduce – The reducer takes the grouped data as input and runs a reduce function once per key grouping. The function is passed the key and an iterator over all of the values associated with that key. A wide range of processing can happen in this function, as we’ll see in many of our patterns. The data can be aggregated, filtered, and combined in a number of ways. Once the reduce function is done, it sends zero or more key/value pairs to the final step, the output format. Like the map function, the reduce function will change from job to job since it is a core piece of logic in the solution.
  • output format – The output format translates the final key/value pairs from the reduce function and writes them out to a file via a record writer. By default, it will separate the key and value with a tab and separate records with a newline character. This can typically be customized to provide richer output formats, but in the end, the data is written out to HDFS, regardless of format.
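To make the partitioner’s hash-and-modulus behavior concrete, here is a minimal custom partitioner for the word count job described later in this section. This is a hedged sketch rather than Hadoop’s own source: the built-in HashPartitioner already behaves this way, so in practice you would only write a class like this in order to change the distribution logic.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// A sketch of a custom partitioner that mimics the default hash-and-modulus
// scheme described above. The key's hash code is masked to stay non-negative,
// then reduced modulo the number of reducers to pick a shard.
public class WordPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

It would be wired into a job with job.setPartitionerClass(WordPartitioner.class) in the driver.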

Example

The “Word Count” program is the canonical example in MapReduce, and for good reason. It is a straightforward application of MapReduce and MapReduce can handle it extremely efficiently. In this particular example, we’re going to do a word count over user-submitted comments on Stack Overflow. The content of the Text field will be pulled out and pre-processed a bit, and then we’ll count up how many times we see each word. An example record from this data set is:

<row Id="8189677" PostId="6881722" Text="Have you looked at Hadoop?"
     CreationDate="2011-07-30T07:29:33.343" UserId="831878" />

This record is the 8,189,677th comment on Stack Overflow; it is associated with post number 6,881,722 and was written by user number 831,878. The PostId and UserId are foreign keys to other portions of the data set. We’ll show how to join these datasets together in the chapter on join patterns.

The first chunk of code we’ll look at is the driver. The driver takes all of the components that we’ve built for our MapReduce job and pieces them together to be submitted for execution. This code is usually pretty generic and considered “boiler plate.” You’ll find that in all of our patterns the driver stays the same for the most part. This code is from the “Word Count” example that ships with Hadoop Core:

import java.io.IOException;
import java.util.StringTokenizer;
import java.util.Map;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import org.apache.commons.lang.StringEscapeUtils;

// Note: the MRDPUtils helper used in the mapper comes from the example code's
// own utility package, so its import is not shown here.

public class CommentWordCount {

    public static class WordCountMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        // ... implementation shown below ...
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        // ... implementation shown below ...
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs =
                new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: CommentWordCount <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "StackOverflow Comment Word Count");
        job.setJarByClass(CommentWordCount.class);
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The purpose of the driver is to orchestrate the job. The first few lines of main are all about parsing command line arguments. Then we start setting up the job object by telling it what classes to use for computations and what input paths and output paths to use. That’s about it! It’s just important to make sure the class names match up with the classes you wrote and that the output key and value types match up with the output types of the mapper.

In some cases, the combiner simply cannot be used due to the nature of the reducer. In other cases, the combiner class will be different from the reducer class. The combiner is very effective in the “Word Count” program and is quite simple to activate.
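As a hedged illustration of why the reducer cannot always double as the combiner (this example is not part of the word count job, and the class name is made up): suppose a job computes the average comment length per user. The reducer below divides a sum by a count, so running the same logic as a combiner would emit partial averages, and averaging averages weights each mapper’s output equally regardless of how many records it saw. Its output value type also differs from its input value type, which a combiner is not allowed to do.

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical reducer that averages comment lengths per user. It cannot be
// reused as a combiner: partial averages cannot simply be averaged again,
// and the output value type (DoubleWritable) differs from the mapper's
// output value type (IntWritable).
public class AverageLengthReducer
        extends Reducer<Text, IntWritable, Text, DoubleWritable> {

    private DoubleWritable result = new DoubleWritable();

    public void reduce(Text userId, Iterable<IntWritable> lengths, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        long count = 0;
        for (IntWritable length : lengths) {
            sum += length.get();
            count++;
        }
        result.set((double) sum / count);
        context.write(userId, result);
    }
}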

Next is the mapper code that parses and prepares the text. Once some of the punctuation and random text is cleaned up, the text string is split up into a list of words. Then the intermediate key produced is the word and the value produced is simply “1.” This means we’ve seen this word once. Even if we see the same word twice in one line, we’ll output the word and “1” twice and it’ll be taken care of in the end. Eventually, all of these ones will be summed together into the global count of that word.

public static class WordCountMapper
        extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        // Parse the input string into a nice map
        Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString());

        // Grab the "Text" field, since that is what we are counting over
        String txt = parsed.get("Text");

        // .get will return null if the key is not there
        if (txt == null) {
            // skip this record
            return;
        }

        // Unescape the HTML because the data is escaped.
        txt = StringEscapeUtils.unescapeHtml(txt.toLowerCase());

        // Remove some annoying punctuation
        txt = txt.replaceAll("'", "");          // remove single quotes (e.g., can't)
        txt = txt.replaceAll("[^a-zA-Z]", " "); // replace the rest with a space

        // Tokenize the string by splitting it up on whitespace into
        // something we can iterate over, then send the tokens away
        StringTokenizer itr = new StringTokenizer(txt);
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

The first function, MRDPUtils.transformXmlToMap, is a helper function to parse a line of Stack Overflow data in a generic manner. You’ll see it used in a number of our examples. It basically takes a line of the StackOverflow XML (which has a very predictable format) and matches up the XML attributes with the values into a Map.
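The helper itself ships with the book’s example code rather than with Hadoop, so its exact implementation is not shown here. Purely as an assumption of how such a parser might look for this one-element-per-line format, here is a minimal sketch (class and variable names are made up):

import java.util.HashMap;
import java.util.Map;

// A hedged sketch in the spirit of MRDPUtils.transformXmlToMap; the real
// helper's details may differ. It assumes one <row ... /> element per line
// containing attribute="value" pairs.
public class XmlAttributeParser {

    public static Map<String, String> transformXmlToMap(String xml) {
        Map<String, String> map = new HashMap<String, String>();
        try {
            // Drop the leading "<row " and trailing " />" so only the
            // attribute="value" pairs remain, then split on the quotes.
            String trimmed = xml.trim();
            String[] tokens = trimmed.substring(5, trimmed.length() - 3).split("\"");
            for (int i = 0; i < tokens.length - 1; i += 2) {
                String key = tokens[i].trim();   // e.g., Id=
                String val = tokens[i + 1];      // e.g., 8189677
                map.put(key.substring(0, key.length() - 1), val);
            }
        } catch (StringIndexOutOfBoundsException e) {
            // Malformed line; return whatever was parsed so far.
        }
        return map;
    }
}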

Next, turn your attention to the WordCountMapper class. This code is a bit more complicated than the driver (for good reason!). The mapper is where we’ll see most of the work done. The first major thing to notice is the type of the parent class:

Mapper<Object, Text, Text, IntWritable>

They map to the types of the input key, input value, output key, and output value, respectively. We don’t care about the key of the input in this case, so that’s why we use Object. The data coming in is Text (Hadoop’s special String type) because we are reading the data as a line-by-line text document. Our output key and value are Text and IntWritable because we will be using the word as the key and the count as the value.

The mapper input key and value data types are dictated by the job’s configured input format. The default implementation is the TextInputFormat, which provides the byte offset of the line within the file as the key in a LongWritable object and the line of text as the value in a Text object. These key/value data types are likely to change if you are using different input formats.
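For example, a mapper written against TextInputFormat’s actual key/value types would declare LongWritable and Text as its input generics. The stub below is illustrative only (the class name is made up) and is not part of this job:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative stub: a mapper typed against TextInputFormat's key/value pair,
// where the key is the byte offset of the line within the file and the value
// is the line of text itself.
public class OffsetAwareMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // key.get() is the byte offset where this line begins;
        // value holds the raw line of text.
    }
}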

Up until we start using the StringTokenizer towards the bottom of the code, we’re just cleaning up the string. We unescape the data because the string was stored in an escaped manner so that it wouldn’t mess up XML parsing. Next, we remove any stray punctuation so that the literal string Hadoop! is considered the same word as Hadoop? and Hadoop. Finally, for each token (i.e., word) we emit the word with the number 1, which means we saw the word once. The framework then takes over, shuffling and sorting the key/value pairs and delivering them to the reduce tasks.

Finally comes the reducer code, which is relatively simple. The reduce function gets called once per key grouping, in this case each word. We’ll iterate through the values, which will be numbers, and take a running sum. The final value of this running sum will be the sum of the ones.

public static class IntSumReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

As in the mapper, we specify the input and output types via the generic type parameters of the parent class. Also like the mapper, the types correspond to the same things: input key, input value, output key, and output value. The input key and input value data types must match the output key/value types from the mapper. The output key and output value data types must match the types that the job’s configured FileOutputFormat is expecting. In this case, we are using the default TextOutputFormat, which can take any two Writable objects as output.

The reduce function has a different signature from map, though: it gives you an Iterable over the values instead of just a single value. This is because you are now iterating over all of the values that have that key, instead of just one at a time. The key is very important in the reducer of pretty much every MapReduce job, unlike the input key in the map.

Anything we pass to context.write will get written out to a file. Each reducer will create one file, so if you want to coalesce them together you’ll have to write a post-processing step to concatenate them.
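If the merged result is small enough to fit on one machine, one common approach is the hadoop fs -getmerge <out> <local-file> shell command, which pulls all of the part files in the job’s output directory down into a single local file (here <out> is the output directory passed to the driver).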
