Mapper output key value NullWritable can cause reducer phase to move slowly

Environment

Product Version: PHD 1.1.1
Cluster: 4 Hadoop worker nodes
Dataset: 30GB with 15 million tuples

Purpose

In some cases, you may need mappers to manipulate the data in some way and produce a single, unsorted result file. On the surface this seems simple: implement a mapper with an output key of NullWritable. While that method works, the reducer phase becomes extremely slow. All we want is a single reducer that concatenates all the map output into one file with no sort order. The question this article answers is: why does the reducer take so long?

About the MapReduce code examples

The use case is to apply a random watermark to each tuple in a given 30GB dataset; it is a simple reproduction of a live customer use case. The mapper reads in a single tuple and outputs that tuple with a random watermark value appended.

Below is the base WaterMark.class that started it all.

  • Mapper output Key = NullWritable
  • Mapper output Value = Text
  • Reducer output = default
package pivotal.support;

import java.io.IOException;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WaterMark {
    private static final Log LOG = LogFactory.getLog(WaterMark.class);

    public static class setWaterMark extends MapReduceBase
            implements Mapper<LongWritable, Text, NullWritable, Text> {
        private int newID;
        private String newValue;
        private Random randomGenerator = new Random();

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<NullWritable, Text> output, Reporter reporter)
                throws IOException {
            // Append a random watermark and emit every record under the
            // single shared NullWritable key.
            newID = randomGenerator.nextInt(10000);
            newValue = value + ":" + newID;
            output.collect(NullWritable.get(), new Text(newValue));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WaterMark.class);
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (otherArgs.length != 2) {
            System.err.println("Usage: WaterMark <input path> <output path>");
            System.exit(-1);
        }

        conf.setJobName("WaterMark");

        FileInputFormat.addInputPath(conf, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(conf, new Path(otherArgs[1]));

        conf.setMapperClass(setWaterMark.class);
        conf.setOutputKeyClass(NullWritable.class);
        conf.setOutputValueClass(Text.class);

        JobClient.runJob(conf);
    }
}

Below is the WaterMarkKey.class used to test the behavioral difference when the mapper uses a different output key.

  • Mapper output Key = LongWritable
  • Mapper output Value = Text
  • Reducer output = default
package pivotal.support;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.Random;

public class WaterMarkKey {
    private static final Log LOG = LogFactory.getLog(WaterMarkKey.class);

    public static class setWaterMarkMap extends MapReduceBase
            implements Mapper<LongWritable, Text, LongWritable, Text> {
        private int newID;
        private String newValue;
        private Random randomGenerator = new Random();

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<LongWritable, Text> output, Reporter reporter)
                throws IOException {
            // Emit each record under a random LongWritable key so the
            // default partitioner spreads the records across all reducers.
            newID = randomGenerator.nextInt(10000);
            newValue = value + ":" + newID;
            output.collect(new LongWritable(newID), new Text(newValue));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WaterMarkKey.class);
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (otherArgs.length != 2) {
            System.err.println("Usage: WaterMarkKey <input path> <output path>");
            System.exit(-1);
        }

        conf.setJobName("WaterMarkKey");

        FileInputFormat.addInputPath(conf, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(conf, new Path(otherArgs[1]));

        conf.setMapperClass(setWaterMarkMap.class);
        conf.setMapOutputKeyClass(LongWritable.class);
        conf.setMapOutputValueClass(Text.class);
        conf.setOutputKeyClass(NullWritable.class);
        conf.setOutputValueClass(Text.class);

        JobClient.runJob(conf);
    }
}

The WaterMarkMerge.class below runs without a reducer. The main method handles the merge with FileUtil.copyMerge, which aggregates all of the mapper output into a single file. This class serves as the control because it measures how long the mappers alone take to complete their work. Though this approach is effective, it may not fit most use cases and is not the focus of this article.

  • Mapper Output Key = NullWritable
  • Mapper Output Value = Text
  • No reducer
package pivotal.support;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.Random;

public class WaterMarkMerge {
    private static final Log LOG = LogFactory.getLog(WaterMarkMerge.class);

    public static class setWaterMark extends MapReduceBase
            implements Mapper<LongWritable, Text, NullWritable, Text> {
        private int newID;
        private String newValue;
        private Random randomGenerator = new Random();

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<NullWritable, Text> output, Reporter reporter)
                throws IOException {
            newID = randomGenerator.nextInt(10000);
            newValue = value + ":" + newID;
            output.collect(NullWritable.get(), new Text(newValue));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WaterMarkMerge.class);
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (otherArgs.length != 2) {
            System.err.println("Usage: WaterMarkMerge <input path> <output path>");
            System.exit(-1);
        }

        conf.setJobName("WaterMarkMerge");

        FileInputFormat.addInputPath(conf, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(conf, new Path(otherArgs[1]));

        conf.setMapperClass(setWaterMark.class);
        conf.setNumReduceTasks(0); // map-only: mapper output is written straight to HDFS
        conf.setOutputKeyClass(NullWritable.class);
        conf.setOutputValueClass(Text.class);

        JobClient.runJob(conf);

        // Merge all mapper output files into a single file named "merged"
        // inside the job output directory.
        String srcPath = otherArgs[1];
        String dstPath = otherArgs[1] + "/merged";
        FileSystem hdfs = FileSystem.get(conf);
        FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, conf, null);
        hdfs.close();
    }
}

Test Cases

Test #               1                2                3                4                     5                   6
Class                WaterMark.class  WaterMark.class  WaterMark.class  WaterMarkMerge.class  WaterMarkKey.class  WaterMarkKey.class
Number of reducers   1                4                8                0                     1                   4
Map output key type  NullWritable     NullWritable     NullWritable     NullWritable          LongWritable        LongWritable
Total time           7min 42sec       7min 42sec       10min 39sec      2min 56sec            7min 46sec          3min 1sec

Test #4 is the control and shows clearly that the NullWritable mapper output key has a negative impact on the total job run time. Increasing the number of reducers (Tests #2 and #3) has either no effect or makes things worse.

In contrast, when the mapper emits a random LongWritable key (Test #5) and only the job output key remains NullWritable, adding more reducers (Test #6) decreases the run time significantly. This points to the shuffle and sort phase as the source of the slowdown.
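
For reference, the reducer count was varied per test run. With the old mapred API it can be set either in the driver before JobClient.runJob(conf), or on the command line through GenericOptionsParser, as a sketch (the jar name below is a placeholder):

    // In the driver, before JobClient.runJob(conf):
    conf.setNumReduceTasks(8);

    # Or per run on the command line:
    hadoop jar watermark.jar pivotal.support.WaterMark -D mapred.reduce.tasks=8 <input> <output>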

Crack open org.apache.hadoop.io.NullWritable.java

The hashCode function is used by MapReduce's default partitioner to hash each key the mapper emits. During the shuffle phase this hash determines which reducer a record is sent to, guaranteeing that all records with the same key end up on the same reducer. Since NullWritable's hashCode returns zero for every record, all of the data is routed to a single reducer.

@Override
  public int hashCode() { return 0; }
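
To see the effect, here is a minimal, self-contained sketch (the PartitionDemo class is hypothetical, written for this article) that applies the same formula Hadoop's default HashPartitioner uses, (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks:

package pivotal.support;

import org.apache.hadoop.io.NullWritable;

// Hypothetical demo class (not part of the jobs above). It applies the
// partition formula from Hadoop's default HashPartitioner to show that a
// NullWritable key always lands in partition 0.
public class PartitionDemo {

    static int getPartition(Object key, int numReduceTasks) {
        // Same formula as org.apache.hadoop.mapred.lib.HashPartitioner
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // NullWritable.hashCode() is hard-coded to 0, so the result is
        // partition 0 regardless of the reducer count.
        for (int reducers : new int[] { 1, 4, 8 }) {
            System.out.println(reducers + " reducers -> partition "
                    + getPartition(NullWritable.get(), reducers));
        }
    }
}

Every record therefore shuffles to reducer 0, no matter how many reducers the job is configured with.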

This is evident in the output of test case #3: only the first reducer (part-00000) received any data, while the remaining seven reducers each started a JVM, found nothing to process, and exited.

[gpadmin@hdm1 danl]$ hdfs dfs -ls /mapreduce/64683144/out-nokey
Found 9 items
-rw-r--r--   1 gpadmin hadoop           0 2014-07-31 16:11 /mapreduce/64683144/out-nokey/_SUCCESS
-rw-r--r--   1 gpadmin hadoop 35236660400 2014-07-31 16:11 /mapreduce/64683144/out-nokey/part-00000
-rw-r--r--   1 gpadmin hadoop           0 2014-07-31 16:01 /mapreduce/64683144/out-nokey/part-00001
-rw-r--r--   1 gpadmin hadoop           0 2014-07-31 16:01 /mapreduce/64683144/out-nokey/part-00002
-rw-r--r--   1 gpadmin hadoop           0 2014-07-31 16:01 /mapreduce/64683144/out-nokey/part-00003
-rw-r--r--   1 gpadmin hadoop           0 2014-07-31 16:01 /mapreduce/64683144/out-nokey/part-00004
-rw-r--r--   1 gpadmin hadoop           0 2014-07-31 16:01 /mapreduce/64683144/out-nokey/part-00005
-rw-r--r--   1 gpadmin hadoop           0 2014-07-31 16:01 /mapreduce/64683144/out-nokey/part-00006
-rw-r--r--   1 gpadmin hadoop           0 2014-07-31 16:01 /mapreduce/64683144/out-nokey/part-00007

So, why does it take so long to sort values that all share the same key?

The reason is that MapReduce sorts the map output with the Quicksort algorithm and, as described on Wikipedia, an input in which every key is identical is a worst-case scenario for a basic Quicksort: each partition step is maximally unbalanced, so the sort degrades from O(n log n) to O(n²).
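
Hadoop's actual map-side sort lives in org.apache.hadoop.util.QuickSort and is not reproduced here. The self-contained sketch below (a hypothetical QuickSortDemo class) instead uses a textbook Lomuto-partition Quicksort to make the degenerate case visible: with all keys equal, every partition step peels off a single element, so the comparison count grows roughly as n²/2 instead of n·log n.

import java.util.Arrays;
import java.util.Random;

// Hypothetical demo: textbook Lomuto-partition Quicksort with a
// comparison counter. All-equal input (like a constant NullWritable key)
// produces maximally unbalanced partitions and quadratic work.
public class QuickSortDemo {
    static long comparisons = 0;

    static void quickSort(int[] a, int lo, int hi) {
        if (lo >= hi) return;
        int pivot = a[hi];
        int i = lo;
        for (int j = lo; j < hi; j++) {
            comparisons++;
            if (a[j] <= pivot) {            // always true when all keys are equal
                int t = a[i]; a[i] = a[j]; a[j] = t;
                i++;
            }
        }
        int t = a[i]; a[i] = a[hi]; a[hi] = t;
        quickSort(a, lo, i - 1);            // size n-1 when all keys are equal
        quickSort(a, i + 1, hi);            // size 0 when all keys are equal
    }

    public static void main(String[] args) {
        int n = 5000;

        int[] allEqual = new int[n];
        Arrays.fill(allEqual, 42);          // every "key" identical
        comparisons = 0;
        quickSort(allEqual, 0, n - 1);
        System.out.println("all-equal keys: " + comparisons + " comparisons"); // ~n^2/2

        Random r = new Random(0);
        int[] randomKeys = new int[n];
        for (int k = 0; k < n; k++) randomKeys[k] = r.nextInt();
        comparisons = 0;
        quickSort(randomKeys, 0, n - 1);
        System.out.println("random keys:    " + comparisons + " comparisons"); // ~n*log n
    }
}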

Conclusion

Never use NullWritable, or any other constant key, as the mapper output key when the job runs with a custom or default reducer. In some cases, the reducer phase can run for up to 2 days without making visible progress.
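
If the goal is simply a single unsorted result file, a practical workaround is to run the job map-only and merge afterward, as WaterMarkMerge.class does with FileUtil.copyMerge. The same merge can also be done from the command line with hdfs dfs -getmerge, which writes the combined file to the local filesystem (the paths below are placeholders):

hdfs dfs -getmerge /mapreduce/out /tmp/merged.txt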
