Hadoop: Get a callback on MapReduce job completion

MapReduce jobs might take a long time to complete… That said, you will probably have to run your jobs in the background, right? You could keep an eye on the JobTracker URL (for MapReduce V1) or on the YARN ResourceManager UI (V2) in order to check job completion, but what if you could be notified once the job is completed?

A quick and dirty solution would be to poll the JobTracker every X minutes, as follows:


user@hadoop ~ $ hadoop job -status job_201211261732_3134
Job: job_201211261732_3134
file: hdfs://user/lihdop/.staging/job_201211261732_3134/job.xml
tracking URL: http://0.0.0.0:50030/jobdetails.jsp?jobid=job_201211261732_3134
map() completion: 0.0
reduce() completion: 0.0

Working in a support position, I just hate such an approach. Cron jobs and daemons set up for that purpose are always a pain to troubleshoot, always a pain when it comes to understanding where and why these damned processes did not wake up in time!

Getting a notification instead of polling ? Definitely more elegant…

In your driver class, only 3 lines are needed to enable the callback feature of Hadoop:



public class CallBackMR extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        CallBackMR callback = new CallBackMR();
        int res = ToolRunner.run(conf, callback, args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {

        Configuration conf = this.getConf();

        // ==================================
        // Set the callback parameters
        conf.set("job.end.notification.url", "https://hadoopi.wordpress.com/api/hadoop/notification/$jobId?status=$jobStatus");
        conf.setInt("job.end.retry.attempts", 3);
        conf.setInt("job.end.retry.interval", 1000);
        // ==================================

        .../...

        // Submit the job in the background (do not wait for completion)
        job.submit();
        return 0;
    }

}


At job completion, an HTTP request is sent to the URL supplied in “job.end.notification.url”; both the job ID and the job status can be retrieved from that URL thanks to the $jobId and $jobStatus placeholders.
Looking at the Hadoop server side (see the YARN container logs below), a SUCCEEDED notification has been attempted every second, as configured, until it officially failed (the URL I used here was obviously a fake one, hence the 404 responses).


root@hadoopi:/usr/lib/hadoop/logs/userlogs/application_1379509275868_0002# find . -type f | xargs grep hadoopi
./container_1379509275868_0002_01_000001/syslog:2013-09-18 15:14:32,090 INFO [Thread-66] org.mortbay.log: Job end notification trying https://hadoopi.wordpress.com/api/hadoop/notification/job_1379509275868_0002?status=SUCCEEDED
./container_1379509275868_0002_01_000001/syslog:2013-09-18 15:14:32,864 WARN [Thread-66] org.mortbay.log: Job end notification to https://hadoopi.wordpress.com/api/hadoop/notification/job_1379509275868_0002?status=SUCCEEDED failed with code: 404 and message "Not Found"
./container_1379509275868_0002_01_000001/syslog:2013-09-18 15:14:32,965 INFO [Thread-66] org.mortbay.log: Job end notification trying https://hadoopi.wordpress.com/api/hadoop/notification/job_1379509275868_0002?status=SUCCEEDED
./container_1379509275868_0002_01_000001/syslog:2013-09-18 15:14:33,871 WARN [Thread-66] org.mortbay.log: Job end notification to https://hadoopi.wordpress.com/api/hadoop/notification/job_1379509275868_0002?status=SUCCEEDED failed with code: 404 and message "Not Found"
./container_1379509275868_0002_01_000001/syslog:2013-09-18 15:14:33,971 INFO [Thread-66] org.mortbay.log: Job end notification trying https://hadoopi.wordpress.com/api/hadoop/notification/job_1379509275868_0002?status=SUCCEEDED
./container_1379509275868_0002_01_000001/syslog:2013-09-18 15:14:34,804 WARN [Thread-66] org.mortbay.log: Job end notification to https://hadoopi.wordpress.com/api/hadoop/notification/job_1379509275868_0002?status=SUCCEEDED failed with code: 404 and message "Not Found"
./container_1379509275868_0002_01_000001/syslog:2013-09-18 15:14:34,904 INFO [Thread-66] org.mortbay.log: Job end notification trying https://hadoopi.wordpress.com/api/hadoop/notification/job_1379509275868_0002?status=SUCCEEDED
./container_1379509275868_0002_01_000001/syslog:2013-09-18 15:14:35,584 WARN [Thread-66] org.mortbay.log: Job end notification to https://hadoopi.wordpress.com/api/hadoop/notification/job_1379509275868_0002?status=SUCCEEDED failed with code: 404 and message "Not Found"
./container_1379509275868_0002_01_000001/syslog:2013-09-18 15:14:35,684 WARN [Thread-66] org.mortbay.log: Job end notification failed to notify : https://hadoopi.wordpress.com/api/hadoop/notification/job_1379509275868_0002?status=SUCCEEDED


Note that the notification is triggered not only for the SUCCEEDED status but also for the KILLED and FAILED statuses – that might be quite useful too.
The next step is to implement a callback listener on the client side…
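
As a starting point, here is a minimal sketch of such a listener (not part of the original post), written with the JDK's built-in com.sun.net.httpserver package. It assumes the notification URL points at port 8080 of the machine running this class, e.g. http://myclient:8080/api/hadoop/notification/$jobId?status=$jobStatus (host and port are hypothetical):

import java.io.IOException;
import java.net.InetSocketAddress;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;

public class NotificationListener {

    public static void main(String[] args) throws IOException {

        // Listen on port 8080 for the job-end notification sent by Hadoop
        HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
        server.createContext("/api/hadoop/notification", new HttpHandler() {

            @Override
            public void handle(HttpExchange exchange) throws IOException {
                // URI looks like /api/hadoop/notification/<jobId>?status=<jobStatus>
                String path = exchange.getRequestURI().getPath();
                String jobId = path.substring(path.lastIndexOf('/') + 1);
                String status = exchange.getRequestURI().getQuery();
                System.out.println("Job " + jobId + " ended with [" + status + "]");
                // Reply with an empty 200 OK so that Hadoop stops retrying
                exchange.sendResponseHeaders(200, -1);
                exchange.close();
            }
        });
        server.start();
    }
}

Anything could obviously replace the System.out call: sending an email, triggering the next job of a workflow, and so on.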

Cheers,
Antoine


Hadoop: Filter input files used for MapReduce

How do you filter which input files are processed by a MapReduce job, and how do you make the code versatile enough so that the end user can easily select files based on a common pattern?
Assuming your files stored on HDFS contain a date-time pattern, what if you could execute your code on the “2013-07-XX” files only? I am sure there are many ways to do so, but because of my regexp addiction (thanks to my Perl legacy), I have created a MapReduce code that takes a regular expression as an argument in order to select the input files of your data set.


Filtering on file name

Command line interface

As explained in a previous article, when implementing the Tool interface you can provide Hadoop with configuration parameters directly from the command line interface (prefixed by the -D option). By doing so, there is no need to deal with all these properties in the static main(String[] args) method of your driver code anymore, since all of them are accessible application-wide through the Hadoop configuration.

hadoop jar com.wordpress.hadoopi.Filter -D file.pattern=.*regex.*

Implementing PathFilter interface

PathFilter allows you to accept or reject any of the files included in your input path, based on some custom restriction (in our case a regular expression). This is achieved by implementing the accept() method. Note that you will need to extend the Configured class if you want to access the “file.pattern” property supplied on the CLI.

public class RegexFilter extends Configured implements PathFilter {

	Pattern pattern;
	Configuration conf;

	@Override
	public boolean accept(Path path) {
		Matcher m = pattern.matcher(path.toString());
		System.out.println("Is path : " + path.toString() + " matching "
			+ conf.get("file.pattern") + " ? , " + m.matches());
		return m.matches();
	}

	@Override
	public void setConf(Configuration conf) {
		this.conf = conf;
		pattern = Pattern.compile(conf.get("file.pattern"));
	}
}

In the setConf() method, we retrieve the Hadoop configuration and compile our regular expression. In the accept() method, we return true if the file matches our pattern, false otherwise. Pretty obvious, isn't it?

Driver code

Using your RegexFilter in the MapReduce driver code is pretty straightforward. You need to call the setInputPathFilter static method and supply your custom PathFilter implementation. The filter will be applied to each file included in the inputPath directory (supplied in the addInputPath method).

	// Input
	FileInputFormat.setInputPathFilter(myJob, RegexFilter.class);
	FileInputFormat.addInputPath(myJob, inputPath);
	myJob.setInputFormatClass(TextInputFormat.class);

	// Output
	FileOutputFormat.setOutputPath(myJob, outputPath);
	myJob.setOutputFormatClass(TextOutputFormat.class);

Testing

First test done with a “.*” regular expression (i.e. all files)
Our implementation works

Is path : hdfs://hadub1:8020/user/hadoopi/data matching .* ? , true
Is path : hdfs://hadub1:8020/user/hadoopi/data/hadoopi_part-r-00000 matches .* ? , true
Is path : hdfs://hadub1:8020/user/hadoopi/data/hadoopi_part-r-00001 matches .* ? , true
13/07/29 10:52:30 INFO input.FileInputFormat: Total input paths to process : 2
13/07/29 10:52:35 INFO mapred.JobClient: Running job: job_201307291018_0008

Second test done with a more restrictive pattern (“.*part.*”)
Our implementation does not work here !

Is path : hdfs://hadub1:8020/user/hadoopi/data matches .*part.* ? , false
13/07/29 10:45:09 INFO mapred.JobClient: Cleaning up the staging area hdfs://hadub1:8020/user/hadoopi/.staging/job_201307291018_0006
13/07/29 10:45:09 ERROR security.UserGroupInformation: PriviledgedActionException as:hadoopi (auth:SIMPLE) cause:org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://hadub1:8020/user/hadoopi/data
Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://hadub1:8020/user/hadoopi/data
        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:231)

The root cause is the following: the first path handed to the PathFilter is actually always the input directory itself (/user/hadoopi/data in our case), and since it does not match our pattern, the directory is rejected and an exception is thrown.

PathFilter with directory support

In order to avoid this issue, our PathFilter implementation must be modified as follows:

public class RegexFilter extends Configured implements PathFilter {

	Pattern pattern;
	Configuration conf;
	FileSystem fs;

	@Override
	public boolean accept(Path path) {

		try {
			if (fs.isDirectory(path)) {
				return true;
			} else {
				Matcher m = pattern.matcher(path.toString());
				System.out.println("Is path : " + path.toString() + " matches "
						+ conf.get("file.pattern") + " ? , " + m.matches());
				return m.matches();
			}
		} catch (IOException e) {
			e.printStackTrace();
			return false;
		}

	}

	@Override
	public void setConf(Configuration conf) {
		this.conf = conf;
		if (conf != null) {
			try {
				fs = FileSystem.get(conf);
				pattern = Pattern.compile(conf.get("file.pattern"));
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

}

Within the setConf() method, we get a handle on the HDFS file system. In the accept() method, we first test whether the supplied path is a directory or not. If so, the directory is accepted (return true); otherwise, the file name is tested against our regular expression.
Let's restart our process:

Is path : hdfs://hadub1:8020/user/hadoopi/data/hadoopi_part-r-00000 matches .*part.* ? , true
Is path : hdfs://hadub1:8020/user/hadoopi/data/hadoopi_part-r-00001 matches .*part.* ? , true
13/07/29 10:52:30 INFO input.FileInputFormat: Total input paths to process : 2
13/07/29 10:52:35 INFO mapred.JobClient: Running job: job_201307291018_0008

Well, it works now. You should be able to filter your input files based on any regular expression (do not forget to escape backslashes).

hadoop jar com.wordpress.hadoopi.Filter -D file.pattern=.*regex.*
hadoop jar com.wordpress.hadoopi.Filter -D file.pattern=.*2013-07-\\d{2}.*
hadoop jar com.wordpress.hadoopi.Filter -D file.pattern=.*part-r-0000[0-1].*

Great, our MapReduce code is now able to filter out any input files based on regular expression.

Bonus track : Filtering on file properties

Let's now filter files not on their name anymore, but rather on their properties (e.g. modification time). I want to process every file whose last modification time is greater / lower than a supplied value, similarly to the Unix command below.

find . -mtime +20 | xargs process

For that purpose I will use the same logic as in the RegexFilter explained above: if the path is a directory, return true; otherwise, test the file's last modification date.
The filter accepts both negative and positive values, supplied as a configuration value (“file.mtime”) from the CLI:

  • file.mtime=+20 : file(s) last modified less than 20 days ago will be processed
  • file.mtime=-20 : file(s) last modified more than 20 days ago will be processed

The file's modification time is retrieved from fs.getFileStatus(path) and compared to the supplied value and to System.currentTimeMillis().


public class FileFilter extends Configured implements PathFilter {

	Configuration conf;
	FileSystem fs;

	@Override
	public boolean accept(Path path) {
		try {
			if(fs.isDirectory(path)){
				return true;
			}
		} catch (IOException e1) {
			e1.printStackTrace();
			return false;
		}
		
		if (conf.get("file.mtime") != null) {
			int mTime = 0;
			String strMtime = conf.get("file.mtime");
			mTime = Integer.valueOf(strMtime.substring(1, strMtime.length()));
			try {
				FileStatus file = fs.getFileStatus(path);
				long now = System.currentTimeMillis() / (1000 * 3600 * 24);
				long time = file.getModificationTime() / (1000 * 3600 * 24);
				long lastModifTime = now - time;
				boolean accept;
				if (strMtime.charAt(0) == '-') {
					accept = mTime < lastModifTime ? true : false;
					System.out.println("File " + path.toString() + " modified "
							+ lastModifTime + " days ago, is " + mTime
							+ " lower ? "+accept);
				} else {
					accept = mTime > lastModifTime ? true : false;
					System.out.println("File " + path.toString() + " modified "
							+ lastModifTime + " days ago, is " + mTime
							+ " greater ? "+accept);
				}
				return accept;
			} catch (IOException e) {
				e.printStackTrace();
				return false;
			}
		} else {
			return true;
		}
	}

	@Override
	public void setConf(Configuration conf) {
		this.conf = conf;
		if (conf != null) {
			try {
				fs = FileSystem.get(conf);
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

}

When executing the following job

hadoop jar com.wordpress.hadoopi.Filter -D file.mtime=-10 

I get the following output

File hdfs://hadub1:8020/user/antoine/data/hadoopi_part-r-00000 modified 31 days ago, is 10 lower ? true
File hdfs://hadub1:8020/user/antoine/data/hadoopi_part-r-00001 modified 20 days ago, is 10 lower ? true
File hdfs://hadub1:8020/user/antoine/data/hadoopi_part-r-00002 modified 28 days ago, is 10 lower ? true
File hdfs://hadub1:8020/user/antoine/data/hadoopi_part-r-00003 modified 3 days ago, is 10 lower ? false
13/07/29 14:59:26 INFO input.FileInputFormat: Total input paths to process : 3

By mixing both filtering logics and / or adding your own, you should be able to process any subset of your data based on file name, file properties, etc., as you wish!

hadoop jar com.wordpress.hadoopi.Filter -D file.mtime=-10 -D file.pattern=.*HADOOP.*

This will process every file whose name matches .*HADOOP.* and that was last modified more than 10 days ago.
This implementation is really helpful for me, as I no longer need to manually pre-filter my data set each time I want to execute my MapReduce code on a small subset.
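
For the record, here is one possible way to combine both checks in a single filter (a sketch, not part of the original code; it assumes that both “file.pattern” and “file.mtime” are supplied on the CLI):

public class CombinedFilter extends Configured implements PathFilter {

	private RegexFilter regexFilter = new RegexFilter();
	private FileFilter fileFilter = new FileFilter();

	@Override
	public void setConf(Configuration conf) {
		super.setConf(conf);
		if (conf != null) {
			// Propagate the configuration to both underlying filters
			regexFilter.setConf(conf);
			fileFilter.setConf(conf);
		}
	}

	@Override
	public boolean accept(Path path) {
		// A path is kept only if it passes both the name and the mtime checks
		return regexFilter.accept(path) && fileFilter.accept(path);
	}
}

Such a filter would be registered exactly the same way as before, through FileInputFormat.setInputPathFilter(myJob, CombinedFilter.class).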

Cheers!
Antoine

Hadoop: Implementing the Tool interface for MapReduce driver

Most people create their MapReduce job using a driver code that is executed through its static main method. The downside of such an implementation is that most of your specific configuration (if any) is usually hardcoded. Should you need to modify some of your configuration properties on the fly (such as changing the number of reducers), you would have to modify your code, rebuild your jar file and redeploy your application. This can be avoided by implementing the Tool interface in your MapReduce driver code.

Hadoop Configuration

By implementing the Tool interface and extending the Configured class, you can easily set your Hadoop Configuration object via the GenericOptionsParser, and thus through the command line interface. This makes your code definitely more portable (and additionally slightly cleaner) as you do not need to hardcode any specific configuration anymore.

Let's take a couple of examples, with and without the use of the Tool interface.

Without Tool interface


public class ToolMapReduce {

	public static void main(String[] args) throws Exception {

		// Create configuration
		Configuration conf = new Configuration();

		// Create job
		Job job = new Job(conf, "Tool Job");
		job.setJarByClass(ToolMapReduce.class);

		// Setup MapReduce job
		job.setMapperClass(Mapper.class);
		job.setReducerClass(Reducer.class);

		// Set only 1 reduce task
		job.setNumReduceTasks(1);

		// Specify key / value
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(Text.class);

		// Input
		FileInputFormat.addInputPath(job, new Path(args[0]));
		job.setInputFormatClass(TextInputFormat.class);

		// Output
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.setOutputFormatClass(TextOutputFormat.class);

		// Execute job
		int code = job.waitForCompletion(true) ? 0 : 1;
		System.exit(code);
	}
}

Your MapReduce job will be executed as follows. Only 2 arguments are expected here, inputPath and outputPath, located respectively at index [0] and [1] of the String array passed to your main method.

hadoop jar /path/to/My/jar.jar com.wordpress.hadoopi.ToolMapReduce /input/path /output/path

In that case, the number of reducers (1) is hardcoded in the driver (job.setNumReduceTasks(1)) and therefore cannot be modified on demand.

With Tool interface

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ToolMapReduce extends Configured implements Tool {

	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new Configuration(), new ToolMapReduce(), args);
		System.exit(res);
	}

	@Override
	public int run(String[] args) throws Exception {

		// When implementing tool
		Configuration conf = this.getConf();

		// Create job
		Job job = new Job(conf, "Tool Job");
		job.setJarByClass(ToolMapReduce.class);

		// Setup MapReduce job
		// Do not specify the number of Reducer
		job.setMapperClass(Mapper.class);
		job.setReducerClass(Reducer.class);

		// Specify key / value
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(Text.class);

		// Input
		FileInputFormat.addInputPath(job, new Path(args[0]));
		job.setInputFormatClass(TextInputFormat.class);

		// Output
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.setOutputFormatClass(TextOutputFormat.class);

		// Execute job and return status
		return job.waitForCompletion(true) ? 0 : 1;
	}
}

ToolRunner executes your MapReduce job through its static run method.
In this example we do not need to hardcode the number of reducers anymore, as it can be specified directly from the CLI (using the -D option).

hadoop jar /path/to/My/jar.jar com.wordpress.hadoopi.ToolMapReduce -D mapred.reduce.tasks=1 /input/path /output/path

Note that you still have to supply the inputPath and outputPath arguments. Basically, the GenericOptionsParser separates the generic Tool options from the actual job arguments. Whatever the number of generic options you supply, the inputPath and outputPath variables will still be located at index [0] and [1], but in the String array of your run method (not of your main method).

This -D option can be used for any “official” or custom property values.

conf.set("my.dummy.configuration","foobar");

now becomes…

-D my.dummy.configuration=foobar
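
Reading the value back is then straightforward wherever the Configuration is available, for instance (a quick sketch, not part of the original post):

// In the driver (inside run()), once ToolRunner has parsed the generic options
String value = getConf().get("my.dummy.configuration");

// In a Mapper or Reducer, through the task context
String sameValue = context.getConfiguration().get("my.dummy.configuration");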

HDFS and JobTracker properties

When I need to submit a job remotely to a distant Hadoop server, I have to specify the properties below in my driver code.

Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "myserver.com:8021");
conf.set("fs.default.name", "hdfs://myserver.com:8020");

Using the Tool interface, this now works out of the box, as you can supply both the -fs and -jt options from the CLI.

hadoop jar myjar.jar com.wordpress.hadoopi.ToolMapReduce -fs hdfs://myserver.com:8020 -jt myserver.com:8021

Thanks to this Tool implementation, my jar file is now 100% portable and can be executed either locally or remotely without having to hardcode any specific value.

Generic options supported

Some additional useful options can be supplied from CLI.

-conf specify an application configuration file
-D use value for given property
-fs specify a namenode
-jt specify a job tracker
-files specify comma separated files to be copied to the map reduce cluster
-libjars specify comma separated jar files to include in the classpath.
-archives specify comma separated archives to be unarchived on the compute machines.
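
For instance, shipping a side file and an extra library to the cluster would look like the following (lookup.txt and extra-lib.jar are hypothetical):

hadoop jar /path/to/My/jar.jar com.wordpress.hadoopi.ToolMapReduce -files lookup.txt -libjars extra-lib.jar /input/path /output/path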

Should you need to add a specific library, an archive file, etc., this Tool interface might be quite useful.
As you can see, it is probably worth implementing the Tool interface in your driver code, as it brings added value without any additional complexity.

Cheers!

Hadoop: Custom RecordReader – Processing String / Pattern delimited records

Now that both InputFormat and RecordReader are familiar concepts to you (if not, you can still refer to the article Hadoop RecordReader and FileInputFormat), it is time to get to the heart of the subject.

The default implementation of TextInputFormat is based on a line-by-line approach. Each line found in the data set is supplied to the MapReduce framework as a key / value pair. Should you need to handle more than one line at a time, you can quite easily implement your own NLinesRecordReader (refer to this good article from bigdatacircus), but…

  • What if all your records do not have the same number of lines ?
  • How to process Record-by-Record instead of Line-by-Line ?

Should you need to process your data set record by record, distinct records must obviously be separated by a common delimiter. This delimiter can be either a full line (a common String) or a common pattern.

String delimited records

Data set example

Take the example below of a (dummy) data set where all records are separated by the same String.

----------
pleff lorem monaq morel plaff lerom baple merol pliff ipsum ponaq mipsu ploff pimsu 
caple supim pluff sumip qonaq issum daple ussum ronaq ossom fap25 abcde tonaq fghij 
merol pliff ipsum ponaq mipsu ploff pimsu caple supim pluff sumip qonaq issum daple 
ussum ronaq ossom faple abc75 tonaq fghij gaple klmno vonaq pqrst haple uvwxy nonaq 
----------
zzzzz laple pleff lorem monaq morel plaff sumip qonaq issum daple ussum ponaq gapl 
Klmno pm100 pleff lorem monaq morel plaff lerom baple merol pliff ipsum ponaq mipsu 
ploff pimsu caple supim pluff sumip qonaq issum daple ussum ronaq ossom fa125 abcde 
----------
lerom baple merol pliff ipsum ponaq mipsu ploff pimsu caple supim pluff sumih Qonaq

Implementation

In that case (records are always separated by the same “10-dash” String), the implementation comes almost out of the box. Indeed, the default LineReader can take as an argument a recordDelimiterBytes byte array that can be retrieved / set directly from the Hadoop configuration. This parameter will be used as a String delimiter separating distinct records.

Just make sure to set it up in your MapReduce driver code

Configuration conf = new Configuration(true);
conf.set("textinputformat.record.delimiter","------------");

…and to specify the default TextInputFormat for your MapReduce job’s InputFormat.

Job job = new Job(conf);
job.setInputFormatClass(TextInputFormat.class);

Instead of processing one given line at a time, you should now be able to process a full multi-line record. The following keys / values will be supplied to your mapper instances:

  • Key: the offset (location of your record's first line)
  • Value: the record itself

Note that the default delimiter is the CRLF (or simply CR) character. With the Hadoop default configuration, LineReader can therefore be seen as a record-by-record reader that uses a CRLF delimiter, which actually makes it equivalent to a line-by-line reader.
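
As a quick illustration (a sketch, not taken from the original post), a mapper consuming such multi-line records could simply iterate over the lines contained in each value:

	public static class MultiLineMapper extends
			Mapper<LongWritable, Text, Text, NullWritable> {

		@Override
		public void map(LongWritable offset, Text record, Context context)
				throws IOException, InterruptedException {

			// "record" holds the full multi-line record, delimiters excluded
			for (String line : record.toString().split("\n")) {
				context.write(new Text(line), NullWritable.get());
			}
		}
	}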

Important update

Contrary to what is stated on JIRA, the custom delimiter (provided by the “textinputformat.record.delimiter” parameter) is not supported in version 1.2.1 of Hadoop. However, you can still create your own record reader to handle that particular case. Have a look at my GitHub account (hadoop-recordreader), and see Delimiter.java, which uses CustomFileInputFormat.java.

Pattern delimited records

Data set example

Now take the following data set structure. Records are not separated by a common String anymore, but rather by a common pattern (a date-time). A simple String delimiter cannot be used here, so you will have to create your own RecordReader that splits records using a regular expression.

Sat, 25 May 2013 22:29:30
pleff lorem monaq morel plaff lerom baple merol pliff ipsum ponaq mipsu ploff pimsu 
caple supim pluff sumip qonaq issum daple ussum ronaq ossom fap25 abcde tonaq fghij 
merol pliff ipsum ponaq mipsu ploff pimsu caple supim pluff sumip qonaq issum daple 
ussum ronaq ossom faple abc75 tonaq fghij gaple klmno vonaq pqrst haple uvwxy nonaq 

Sat, 25 May 2013 22:30:30
zzzzz laple pleff lorem monaq morel plaff sumip qonaq issum daple ussum ponaq gapl 
Klmno pm100 pleff lorem monaq morel plaff lerom baple merol pliff ipsum ponaq mipsu 
ploff pimsu caple supim pluff sumip qonaq issum daple ussum ronaq ossom fa125 abcde 

Sat, 25 May 2013 22:31:30
lerom baple merol pliff ipsum ponaq mipsu ploff pimsu caple supim pluff sumih Qonaq

PatternRecordReader

The first thing to do here is to implement a custom reader that extends the default RecordReader and implements its abstract methods. Should you need more details on how these methods work, please refer to my previous post (Hadoop RecordReader and FileInputFormat), as I will describe here only the delta compared to the default implementation.


public class PatternRecordReader
        extends RecordReader<LongWritable, Text> {

	private LineReader in;
	private final static Text EOL = new Text("\n");
	private Pattern delimiterPattern;
	private String delimiterRegex;
	private int maxLengthRecord;

	@Override
	public void initialize(InputSplit split,
                        TaskAttemptContext context)
			throws IOException, InterruptedException {

		Configuration job = context.getConfiguration();
		this.delimiterRegex = job.get("record.delimiter.regex");
		this.maxLengthRecord = job.getInt(
                                "mapred.linerecordreader.maxlength",
				Integer.MAX_VALUE);

		delimiterPattern = Pattern.compile(delimiterRegex);
		../..
	}

	private int readNext(Text text,
                        int maxLineLength,
                        int maxBytesToConsume)
			throws IOException {

		int offset = 0;
		text.clear();
		Text tmp = new Text();

		for (int i = 0; i < maxBytesToConsume; i++) {

			int offsetTmp = in.readLine(
                                     tmp,
                                     maxLineLength,
                                     maxBytesToConsume);
			offset += offsetTmp;
			Matcher m = delimiterPattern.matcher(tmp.toString());

			// End of File
			if (offsetTmp == 0) {
				break;
			}

			if (m.matches()) {
				// Record delimiter
				break;
			} else {
				// Append value to record
				text.append(EOL.getBytes(), 0, EOL.getLength());
				text.append(tmp.getBytes(), 0, tmp.getLength());
			}
		}
		return offset;
	}
}

Note the following points that differ from the default implementation:

  • In initialize(), the regular expression is retrieved from the Hadoop configuration (“record.delimiter.regex”)
  • The pattern is compiled only once per InputSplit, not once per record

The actual logic is located in the readNext private method:
We simply loop (bounded by default by Integer.MAX_VALUE) and append every line found, together with an EOL character, into a final Text until the current line matches our regular-expression delimiter. We finally return the number of bytes we have read.

In the default implementation we were reading lines by using

            newSize = in.readLine(value, maxLineLength,
                    Math.max((int) Math.min(
                            Integer.MAX_VALUE, end - pos),
                            maxLineLength));

This now becomes:

            newSize = readNext(value, maxLineLength,
                    Math.max((int) Math.min(
                            Integer.MAX_VALUE, end - pos),
                            maxLineLength));

PatternInputFormat

The next step is to create a custom InputFormat:


public class PatternInputFormat
        extends FileInputFormat<LongWritable,Text>{

	@Override
	public RecordReader<LongWritable, Text> createRecordReader(
			InputSplit split,
			TaskAttemptContext context)
                           throws IOException,
			          InterruptedException {

		return new PatternRecordReader();
	}

}

Driver code

In your driver code, you need to provide the Hadoop framework with the regular expression you have chosen:


// regex matching pattern "Sat, 25 May 2013"
String regex = "^[A-Za-z]{3},\\s\\d{2}\\s[A-Za-z]{3}.*";
Configuration conf = new Configuration(true);
conf.set("record.delimiter.regex", regex);

and to use this new InputFormat


Job job = new Job(conf);
job.setInputFormatClass(PatternInputFormat.class);

Mapper

I am running a simple map-only job here in order to make sure all my records have been correctly separated:

	public static class RecordMapper extends
			Mapper<LongWritable, Text, Text, NullWritable> {

		private Text out = new Text();

		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			out.set(key + " -------------\n" + value);
			context.write(out, NullWritable.get());
		}
	}

Given the same data set as before, the Map-only job’s output is the following

10000 -------------
pleff lorem monaq morel plaff lerom baple merol pliff ipsum ponaq mipsu ploff pimsu 
caple supim pluff sumip qonaq issum daple ussum ronaq ossom fap25 abcde tonaq fghij 
merol pliff ipsum ponaq mipsu ploff pimsu caple supim pluff sumip qonaq issum daple 
ussum ronaq ossom faple abc75 tonaq fghij gaple klmno vonaq pqrst haple uvwxy nonaq 

13221 -------------
zzzzz laple pleff lorem monaq morel plaff sumip qonaq issum daple ussum ponaq gapl 
Klmno pm100 pleff lorem monaq morel plaff lerom baple merol pliff ipsum ponaq mipsu 
ploff pimsu caple supim pluff sumip qonaq issum daple ussum ronaq ossom fa125 abcde 

15224 -------------
lerom baple merol pliff ipsum ponaq mipsu ploff pimsu caple supim pluff sumih Qonaq

Conclusion

You are now able to process a “pseudo-unstructured” data set by reading it record by record instead of line by line. This implementation might be really helpful if you need to convert raw log files into a more readable format (e.g. CSV). Instead of using an external script (e.g. a Perl script) to pre-process your data before uploading it to HDFS, you can take full advantage of distributed computing and parse your data set with the MapReduce framework.

I hope this article was interesting. Don’t hesitate to let me know if you have any questions.
Cheers,

Hadoop: WordCount with Custom Record Reader of TextInputFormat


In this Hadoop tutorial we will have a look at a modification of our previous WordCount program, with our own custom mapper and reducer, by implementing a concept called a custom record reader. Before we attack the problem, let us look at some theory required to understand the topic.


Hadoop: RecordReader and FileInputFormat

Today's new challenge…
I want to create a custom MapReduce job that can handle more than a single line at a time. Actually, it took me some time to understand the implementation of the default LineRecordReader class, not because the implementation was beyond my Java skill set, but rather because I was not familiar with its concept. I am describing in this article my understanding of this implementation.

As an InputSplit is nothing more than a chunk of one or several blocks, it should be pretty rare for a block boundary to end up at the exact location of an end of line (EOL). Some of my records located around block boundaries will therefore be split across two different blocks. This raises the following questions:

  1. How can Hadoop guarantee that the lines read are 100% complete?
  2. How can Hadoop consolidate a line that starts in block B and ends in block B+1?
  3. How can Hadoop guarantee that we do not miss any line?
  4. Is there a limit on a line's size? Can a line be bigger than a block (i.e. span more than 2 blocks)? If so, is there any consequence in terms of MapReduce performance?

Definitions

InputFormat

Definition taken from MapReduce Design Patterns (O'Reilly):

Hadoop relies on the input format of the job to do three things:
1. Validate the input configuration for the job (i.e., checking that the data is there).
2. Split the input blocks and files into logical chunks of type InputSplit, each of which is assigned to a map task for processing.
3. Create the RecordReader implementation to be used to create key/value pairs from the raw InputSplit. These pairs are sent one by one to their mapper.

RecordReader

Definition taken from MapReduce Design Patterns (O'Reilly):

A RecordReader uses the data within the boundaries created by the input split to generate key/value pairs. In the context of file-based input, the “start” is the byte position in the file where the RecordReader should start generating key/value pairs. The “end” is where it should stop reading records. These are not hard boundaries as far as the API is concerned—there is nothing stopping a developer from reading the entire file for each map task. While reading the entire file is not advised, reading outside of the boundaries is often necessary to ensure that a complete record is generated.

Example

I jumped right into the code of LineRecordReader and found it not that obvious to understand. Let's go through an example first, which will hopefully make the code slightly more readable.
Suppose my data set is composed of a single 300Mb file, spanning 3 different blocks (blocks of 128Mb), and suppose that I have been able to get 1 InputSplit for each block. Let's now imagine 3 different scenarios.

File composed of 6 lines of 50Mb each

[Figure: InputSplit1]

  • The first reader will start reading bytes from block B1, position 0. The first two EOLs will be met at 50Mb and 100Mb respectively. Two lines (L1 & L2) will be read and sent as key / value pairs to the Mapper 1 instance. Then, starting from byte 100Mb, we will reach the end of our split (128Mb) before having found the third EOL. This incomplete line will be completed by reading bytes from block B2 until position 150Mb. The first part of line L3 will be read locally from block B1, the second part will be read remotely from block B2 (by means of FSDataInputStream), and a complete record will finally be sent as key / value to Mapper 1.
  • The second reader starts on block B2, at position 128Mb. Because 128Mb is not the start of the file, there is a strong chance that our pointer is located somewhere in the middle of an existing record that has already been processed by the previous reader. We need to skip this record by jumping to the next available EOL, found at position 150Mb. The actual start of RecordReader 2 will therefore be 150Mb instead of 128Mb.

We can wonder what happens when a split boundary falls exactly on an EOL. By jumping to the next available EOL (through the readLine method), we might miss one record. This is why we actually decrement the initial “start” value to “start - 1” before skipping: being located at least one byte before the EOL, we ensure no record is skipped!

The remaining readers follow the same logic, and everything is summarized in the table below.

[Table: InputSplit_meta1]

File composed of 2 lines of 150Mb each

[Figure: InputSplit2]

Same process as before:

  • Reader 1 will start reading from block B1, position 0. It will read line L1 locally until the end of its split (128Mb), and will then continue reading remotely from B2 until the EOL (150Mb)
  • Reader 2 will not start reading at 128Mb, but at 150Mb, and will read until B3:300

[Table: InputSplit_meta2]

File composed of 2 lines of 300Mb each

OK, this one is a tricky and perhaps unrealistic example, but I was wondering what happens when a record is larger than 2 blocks (i.e. spans at least 3 blocks).

[Figure: InputSplit5]

  • Reader 1 will start reading locally from B1:0 until B1:128, then remotely read all bytes available on B2, and finally read remotely from B3 until the EOL is reached (300Mb). There is some overhead here, as we are reading a lot of data that is not locally available.
  • Reader 2 will start reading from B2:128 and will jump to the next available record, located at B3:300. Its new start position (B3:300) is actually greater than its maximum position (B2:256). This reader will therefore not provide Mapper 2 with any key / value. I understand it as a kind of safety feature ensuring that data locality (which makes Hadoop so efficient at data processing) is preserved (i.e. do not process a line that does not start in the chunk I am responsible for).
  • Reader 3 will start reading from B3:300 to B5:600

This is summarized in the table below.

[Table: InputSplit_meta5]

Maximum size for a single record

There is a maximum size allowed for a single record to be processed. This value can be set using the parameter below.

conf.setInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);

A line with a size greater than this maximum value (default is 2,147,483,647) will be ignored.

I hope these 3 examples give you a high-level understanding of RecordReader and InputFormat. If so, let's jump to the code; if not, let me know.

I doubt a single record would be hundreds of Mb large (300Mb in my example) in a real environment… With hundreds of Kb per record, the overhead due to a line spanning different blocks should not be that significant, and overall performance should not really be affected.

Implementation

RecordReader

I added some (a ton of) comments in the code in order to point out what was previously said in the example section. Hopefully this makes it slightly clearer. A new reader must extend the RecordReader class and override several methods.


public class CustomLineRecordReader 
	extends RecordReader<LongWritable, Text> {

	private long start;
	private long pos;
	private long end;
	private LineReader in;
	private int maxLineLength;
	private LongWritable key = new LongWritable();
	private Text value = new Text();

	private static final Log LOG = LogFactory.getLog(
			CustomLineRecordReader.class);

	/**
	 * From Design Pattern, O'Reilly...
	 * This method takes as arguments the map task’s assigned InputSplit and
	 * TaskAttemptContext, and prepares the record reader. For file-based input
	 * formats, this is a good place to seek to the byte position in the file to
	 * begin reading.
	 */
	@Override
	public void initialize(
			InputSplit genericSplit, 
			TaskAttemptContext context)
			throws IOException {

		// This InputSplit is a FileInputSplit
		FileSplit split = (FileSplit) genericSplit;

		// Retrieve configuration, and Max allowed
		// bytes for a single record
		Configuration job = context.getConfiguration();
		this.maxLineLength = job.getInt(
				"mapred.linerecordreader.maxlength",
				Integer.MAX_VALUE);

		// Split "S" is responsible for all records
		// starting from "start" and "end" positions
		start = split.getStart();
		end = start + split.getLength();

		// Retrieve file containing Split "S"
		final Path file = split.getPath();
		FileSystem fs = file.getFileSystem(job);
		FSDataInputStream fileIn = fs.open(split.getPath());

		// If Split "S" starts at byte 0, first line will be processed
		// If Split "S" does not start at byte 0, first line has been already
		// processed by "S-1" and therefore needs to be silently ignored
		boolean skipFirstLine = false;
		if (start != 0) {
			skipFirstLine = true;
			// Set the file pointer at "start - 1" position.
			// This is to make sure we won't miss any line
			// It could happen if "start" is located on a EOL
			--start;
			fileIn.seek(start);
		}

		in = new LineReader(fileIn, job);

		// If first line needs to be skipped, read first line
		// and stores its content to a dummy Text
		if (skipFirstLine) {
			Text dummy = new Text();
			// Reset "start" to "start + line offset"
			start += in.readLine(dummy, 0,
					(int) Math.min(
							(long) Integer.MAX_VALUE, 
							end - start));
		}

		// Position is the actual start
		this.pos = start;

	}

	/**
	 * From Design Pattern, O'Reilly...
	 * Like the corresponding method of the InputFormat class, this reads a
	 * single key/ value pair and returns true until the data is consumed.
	 */
	@Override
	public boolean nextKeyValue() throws IOException {

		// Current offset is the key
		key.set(pos);

		int newSize = 0;

		// Make sure we get at least one record that starts in this Split
		while (pos < end) {

			// Read first line and store its content to "value"
			newSize = in.readLine(value, maxLineLength,
					Math.max((int) Math.min(
							Integer.MAX_VALUE, end - pos),
							maxLineLength));

			// No byte read, seems that we reached end of Split
			// Break and return false (no key / value)
			if (newSize == 0) {
				break;
			}

			// Line is read, new position is set
			pos += newSize;

			// Line is lower than Maximum record line size
			// break and return true (found key / value)
			if (newSize < maxLineLength) {
				break;
			}

			// Line is too long
			// Try again with position = position + line offset,
			// i.e. ignore line and go to next one
			// TODO: Shouldn't it be LOG.error instead ??
			LOG.info("Skipped line of size " + 
					newSize + " at pos "
					+ (pos - newSize));
		}

		
		if (newSize == 0) {
			// We've reached end of Split
			key = null;
			value = null;
			return false;
		} else {
			// Tell Hadoop a new line has been found
			// key / value will be retrieved by
			// getCurrentKey getCurrentValue methods
			return true;
		}
	}

	/**
	 * From Design Pattern, O'Reilly...
	 * This methods are used by the framework to give generated key/value pairs
	 * to an implementation of Mapper. Be sure to reuse the objects returned by
	 * these methods if at all possible!
	 */
	@Override
	public LongWritable getCurrentKey() throws IOException,
			InterruptedException {
		return key;
	}

	/**
	 * From Design Pattern, O'Reilly...
	 * This methods are used by the framework to give generated key/value pairs
	 * to an implementation of Mapper. Be sure to reuse the objects returned by
	 * these methods if at all possible!
	 */
	@Override
	public Text getCurrentValue() throws IOException, InterruptedException {
		return value;
	}

	/**
	 * From Design Pattern, O'Reilly...
	 * Like the corresponding method of the InputFormat class, this is an
	 * optional method used by the framework for metrics gathering.
	 */
	@Override
	public float getProgress() throws IOException, InterruptedException {
		if (start == end) {
			return 0.0f;
		} else {
			return Math.min(1.0f, (pos - start) / (float) (end - start));
		}
	}

	/**
	 * From Design Pattern, O'Reilly...
	 * This method is used by the framework for cleanup after there are no more
	 * key/value pairs to process.
	 */
	@Override
	public void close() throws IOException {
		if (in != null) {
			in.close();
		}
	}

}

FileInputFormat

Now that you have created a custom Reader, you need to use it from a class extending FileInputFormat, as reported below …


public class CustomFileInputFormat extends FileInputFormat<LongWritable,Text>{

	@Override
	public RecordReader<LongWritable, Text> createRecordReader(
			InputSplit split, TaskAttemptContext context) throws IOException,
			InterruptedException {
		return new CustomLineRecordReader();
	}
}

MapReduce

… and to use this new CustomFileInputFormat in your MapReduce driver code when specifying Input format.

.../...
FileInputFormat.addInputPath(job, inputPath);
job.setInputFormatClass(CustomFileInputFormat.class);
.../...

Congratulations, if you followed this article you have just re-invented the wheel. We did not do anything more than re-implement LineRecordReader and FileInputFormat, the default implementations for text files. However, I hope you now understand a bit better how these two classes work, allowing you to create your own custom reader and therefore to handle specific file formats.

I hope you liked this article and that it was not too high-level, and therefore not a waste of your time.
Should you have any questions / remarks / suggestions, feel free to comment. Also feel free to share it!

Cheers !

Hadoop: Setup Maven project for MapReduce in 5mn

I am sure I am not the only one who has ever struggled with the Hadoop Eclipse plugin installation. This plugin strongly depends on your environment (Eclipse, Ant, JDK) as well as on your Hadoop distribution and version. Moreover, it only provides the old MapReduce API.
It is so simple to create a Maven project for Hadoop that wasting time trying to build this plugin is totally useless. I am describing in this article how to set up a first Maven Hadoop project for Cloudera CDH4 in Eclipse.

Prerequisite

maven 3
jdk 1.6
eclipse with m2eclipse plugin installed

Add Cloudera repository

Cloudera jar files are not available in the default Maven central repository. You need to explicitly add the Cloudera repository to your settings.xml (under ${HOME}/.m2/settings.xml).

<?xml version="1.0" encoding="UTF-8"?>
<settings>
    <profiles>
        <profile>
            <id>standard-extra-repos</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <repositories>
                <repository>
                    <!-- Central Repository -->
                    <id>central</id>
                    <url>http://repo1.maven.org/maven2/</url>
                    <releases>
                        <enabled>true</enabled>
                    </releases>
                    <snapshots>
                        <enabled>true</enabled>
                    </snapshots>
                </repository>
                <repository>
                    <!-- Cloudera Repository -->
                    <id>cloudera</id>
                    <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
                    <releases>
                        <enabled>true</enabled>
                    </releases>
                    <snapshots>
                        <enabled>true</enabled>
                   </snapshots>
                </repository>
            </repositories>
        </profile>
    </profiles>
</settings>

Create Maven project

In Eclipse, create a new Maven project as follows:

[Screenshots: New Maven Project wizard in Eclipse]

Add Hadoop Nature

For the Cloudera CDH4 distribution, open the pom.xml file and add the following dependencies:


	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>jdk.tools</groupId>
				<artifactId>jdk.tools</artifactId>
				<version>1.6</version>
			</dependency>
			<dependency>
				<groupId>org.apache.hadoop</groupId>
				<artifactId>hadoop-hdfs</artifactId>
				<version>2.0.0-cdh4.0.0</version>
			</dependency>
			<dependency>
				<groupId>org.apache.hadoop</groupId>
				<artifactId>hadoop-auth</artifactId>
				<version>2.0.0-cdh4.0.0</version>
			</dependency>
			<dependency>
				<groupId>org.apache.hadoop</groupId>
				<artifactId>hadoop-common</artifactId>
				<version>2.0.0-cdh4.0.0</version>
			</dependency>
			<dependency>
				<groupId>org.apache.hadoop</groupId>
				<artifactId>hadoop-core</artifactId>
				<version>2.0.0-mr1-cdh4.0.1</version>
			</dependency>
			<dependency>
				<groupId>junit</groupId>
				<artifactId>junit-dep</artifactId>
				<version>4.8.2</version>
			</dependency>
		</dependencies>
	</dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-auth</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-core</artifactId>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.10</version>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>2.1</version>
				<configuration>
					<source>1.6</source>
					<target>1.6</target>
				</configuration>
			</plugin>
		</plugins>
	</build>

Download dependencies

Now that you have added the Cloudera repository and created your project, download the dependencies. This can easily be done by right-clicking on your Eclipse project and selecting “Update Maven dependencies”.
All these dependencies should now be available in your .m2 repository.

[developer@localhost ~]$ find .m2/repository/org/apache/hadoop -name "*.jar" 
.m2/repository/org/apache/hadoop/hadoop-tools/1.0.4/hadoop-tools-1.0.4.jar
.m2/repository/org/apache/hadoop/hadoop-common/2.0.0-cdh4.0.0/hadoop-common-2.0.0-cdh4.0.0-sources.jar
.m2/repository/org/apache/hadoop/hadoop-common/2.0.0-cdh4.0.0/hadoop-common-2.0.0-cdh4.0.0.jar
.m2/repository/org/apache/hadoop/hadoop-core/2.0.0-mr1-cdh4.0.1/hadoop-core-2.0.0-mr1-cdh4.0.1-sources.jar
.m2/repository/org/apache/hadoop/hadoop-core/2.0.0-mr1-cdh4.0.1/hadoop-core-2.0.0-mr1-cdh4.0.1.jar
.m2/repository/org/apache/hadoop/hadoop-hdfs/2.0.0-cdh4.0.0/hadoop-hdfs-2.0.0-cdh4.0.0.jar
.m2/repository/org/apache/hadoop/hadoop-streaming/1.0.4/hadoop-streaming-1.0.4.jar
.m2/repository/org/apache/hadoop/hadoop-auth/2.0.0-cdh4.0.0/hadoop-auth-2.0.0-cdh4.0.0.jar
[developer@localhost ~]$ 

Create WordCount example

Create your driver code

package com.aamend.hadoop.MapReduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

	public static void main(String[] args) throws IOException,
			InterruptedException, ClassNotFoundException {

		Path inputPath = new Path(args[0]);
		Path outputDir = new Path(args[1]);

		// Create configuration
		Configuration conf = new Configuration(true);

		// Create job
		Job job = new Job(conf, "WordCount");
		job.setJarByClass(WordCountMapper.class);

		// Setup MapReduce
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		job.setNumReduceTasks(1);

		// Specify key / value
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		// Input
		FileInputFormat.addInputPath(job, inputPath);
		job.setInputFormatClass(TextInputFormat.class);

		// Output
		FileOutputFormat.setOutputPath(job, outputDir);
		job.setOutputFormatClass(TextOutputFormat.class);

		// Delete output if exists
		FileSystem hdfs = FileSystem.get(conf);
		if (hdfs.exists(outputDir))
			hdfs.delete(outputDir, true);

		// Execute job
		int code = job.waitForCompletion(true) ? 0 : 1;
		System.exit(code);

	}

}

Create Mapper class

package com.aamend.hadoop.MapReduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends
		Mapper<Object, Text, Text, IntWritable> {

	private final IntWritable ONE = new IntWritable(1);
	private Text word = new Text();

	public void map(Object key, Text value, Context context)
			throws IOException, InterruptedException {

		String[] csv = value.toString().split(",");
		for (String str : csv) {
			word.set(str);
			context.write(word, ONE);
		}
	}
}

Create your Reducer class

package com.aamend.hadoop.MapReduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends
		Reducer<Text, IntWritable, Text, IntWritable> {

	public void reduce(Text text, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable value : values) {
			sum += value.get();
		}
		context.write(text, new IntWritable(sum));
	}
}

Build project

Exporting the jar file works out of the box with Maven. Execute the following command:

mvn clean install

You should see an output similar to the one below:

.../...

[INFO] 
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ MapReduce ---
[INFO] Building jar: /home/developer/Workspace/hadoop/MapReduce/target/MapReduce-0.0.1-SNAPSHOT.jar
[INFO] 
[INFO] --- maven-install-plugin:2.3.1:install (default-install) @ MapReduce ---
[INFO] Installing /home/developer/Workspace/hadoop/MapReduce/target/MapReduce-0.0.1-SNAPSHOT.jar to /home/developer/.m2/repository/com/aamend/hadoop/MapReduce/0.0.1-SNAPSHOT/MapReduce-0.0.1-SNAPSHOT.jar
[INFO] Installing /home/developer/Workspace/hadoop/MapReduce/pom.xml to /home/developer/.m2/repository/com/aamend/hadoop/MapReduce/0.0.1-SNAPSHOT/MapReduce-0.0.1-SNAPSHOT.pom
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.159s
[INFO] Finished at: Sat May 25 00:35:56 GMT+02:00 2013
[INFO] Final Memory: 16M/212M
[INFO] ------------------------------------------------------------------------

Your jar file should now be available in the project's target directory (and additionally in your ${HOME}/.m2 local repository).


This jar is ready to be executed on your Hadoop environment.

hadoop jar MapReduce-0.0.1-SNAPSHOT.jar com.aamend.hadoop.MapReduce.WordCount input output

Each time I need to create a new Hadoop project, I simply copy the pom.xml template described above, and that's it.

Hadoop: Indexing on MapReduce

Using the MapReduce framework, Yahoo! Inc. was able to sort 1Tb of data in less than 60 seconds (1Tb sort). Unfortunately, not every company has the ability to build a thousand-node cluster to achieve such pseudo real-time querying. Assuming you have a 10-node cluster, which is already a good start for playing with big data, sorting 1Tb of data should take roughly 1 hour to complete. Given this constraint of limited nodes, it might become necessary to repartition your data on HDFS and / or to implement indices in order to improve overall performance. I am describing in this report the implementation of custom indices based on Hadoop InputSplits that I have created, the issues I was facing, and finally the added value (if any) compared to non-indexed MapReduce jobs.


Background

The reader should understand the concepts of Hadoop, HDFS and MapReduce, and should have experience implementing basic Mapper and Reducer classes using the Hadoop new API. Should you need to refresh your knowledge, the famous definitive guide from Tom White will be more than helpful.

InputSplit

Once you submit a MapReduce job, the JobTracker will compute the number of map tasks that need to be executed on your data set. This number depends on the number of files available in HDFS, the files' sizes and the block size. Any file is actually split into one or several InputSplit(s), each of which is basically a chunk of one or several block(s). From what I understand, each InputSplit gets its own dedicated mapper instance, so the number of mappers should be directly related to the number of InputSplits.

An InputSplit is based on 3 different values:

  1. The file name
  2. The offset (where the InputSplit starts)
  3. The length (where the InputSplit stops)

The InputSplit toString() method will return the following pattern:

hdfs://server.domain:8020/path/to/my/file:0+100000

By hashing this value using MD5Hash, you can get a unique ID identifying any InputSplit

911c058fbd1e60ee710dcc41fff16b27

Indexing

We can identify two different levels of granularity for creating indices: an index based on the file URI, or an index based on the InputSplit. Let's take two different example data sets.

[Figure: index (first example)]

In this first example, the 2 files of your data set span 25 blocks and have been identified as 7 different InputSplits. The target you are looking for (highlighted in grey) is available in file #1 (blocks #2, #8 and #13) and in file #2 (block #17).

  • With file-based indexing, you will end up with 2 files (the full data set here), meaning that your indexed query will be equivalent to a full scan query
  • With InputSplit-based indexing, you will end up with 4 InputSplits out of the 7 available. The performance should definitely be better than doing a full scan query

[Figure: index (second example)]

Let's take a second example. This time the same data set has been sorted by the column you want to index. The target you are looking for (highlighted in grey) is now available in file #1 (blocks #1, #2, #3 and #4).

  • With file-based indexing, you will end up with only 1 file from your data set
  • With InputSplit-based indexing, you will end up with 1 InputSplit out of the 7 available

For this specific study, I decided to use a custom InputSplit-based index. I believe such an approach is a good balance between the effort it takes to implement, the added value it might bring in terms of performance optimization, and its expected applicability regardless of the data distribution.

Implementation

The process for implementing the index is quite simple and consists of the following 3 steps:

  1. Build index from your full data set
  2. Get the InputSplit(s) for the indexed value you are looking for
  3. Execute your actual MapReduce job on indexed InputSplits only

The first step needs to be executed only once, as long as the full data set does not change.
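
To give the big picture before detailing each step, the three jobs could be chained from a single entry point as sketched below. RebuildIndexDriver, FetchIndexDriver and IndexedQueryDriver are hypothetical Tool implementations wrapping the mappers and reducers described in the next sections, and the HDFS paths are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class IndexedQueryWorkflow {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Step 1: rebuild the index (only when the full data set has changed)
        ToolRunner.run(conf, new RebuildIndexDriver(), new String[] { "/data/full", "/index" });

        // Step 2: fetch the InputSplit hashes matching the target value
        conf.set("target.lookup", "192.168.0.1");
        ToolRunner.run(conf, new FetchIndexDriver(), new String[] { "/index", "/tmp/indexed_splits" });

        // Step 3: run the actual MapReduce job restricted to the indexed InputSplits
        ToolRunner.run(conf, new IndexedQueryDriver(), new String[] { "/data/full", "/output" });
    }
}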

Build Index

Building a MapReduce index might take a really long time to execute, as you are outputting each value to index together with its actual InputSplit location. The key parameter to estimate here is the number of reducers to be used. With a single reducer, all your indices will be written into a single file, but the time required to copy all data from the mappers to that single reducer will definitely be too long. With thousands of reducers, you will spread the index over thousands of different files, but the job might be significantly faster to execute. The right value is, I believe, a balance between the number of available reduce slots in your cluster and the expected size of your final index. In order to estimate it properly, I did a Simple Random Sampling (SRS) of 10% of my data set. If you expect a 100Gb index, you could set the number of reducers to around 50 so that you end up with 50 files of 2Gb each.

Mapper

Using the Hadoop context, you can retrieve the current InputSplit from the running mapper instance. I am extracting the value I want to index and outputting it together with its InputSplit MD5 hash.


import java.io.IOException;

import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;

public class RebuildIndexMapper extends Mapper<Object, Text, Text, Text> {

    private String splitId;

    @Override
    public void setup(Context context) {
        // Retrieve the current InputSplit from the context and hash it
        InputSplit is = context.getInputSplit();
        splitId = MD5Hash.digest(is.toString()).toString();
    }

    @Override
    public void map(Object object, Text line, Context context)
            throws IOException, InterruptedException {
        // Extract the value to index from the current line
        String key = Utils.getValueToIndex(line.toString());
        // Output the value to index together with the current split Id
        context.write(new Text(key), new Text(splitId));
    }
}

Assuming you are indexing IP addresses, the intermediate key / value pairs will look like the following. From a first mapper,

192.168.0.1     60390c7e429e38e8449519011a24f79d
192.168.0.2     60390c7e429e38e8449519011a24f79d
192.168.0.3     60390c7e429e38e8449519011a24f79d
192.168.0.1     60390c7e429e38e8449519011a24f79d

And from a second one,

192.168.0.1     ccc6decd3d361c3d651807a0c1a665e4
192.168.0.5     ccc6decd3d361c3d651807a0c1a665e4
192.168.0.6     ccc6decd3d361c3d651807a0c1a665e4
192.168.0.3     ccc6decd3d361c3d651807a0c1a665e4

Combiner

As you are outputting a key / value pair for each line of your data set, using a Combiner that removes duplicates can be really useful. A minimal sketch is given below.
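
Such a combiner simply forwards each distinct (value, splitId) pair once per mapper output. The class name RebuildIndexCombiner is an assumption, not taken from the original code.

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RebuildIndexCombiner extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Forward each distinct splitId only once for a given indexed value
        Set<String> seen = new HashSet<String>();
        for (Text value : values) {
            if (seen.add(value.toString())) {
                context.write(key, value);
            }
        }
    }
}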

Reducer

The goal of the reducer is simply to gather the distinct InputSplits for each indexed value and output them on a single line.


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RebuildIndexReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Remove duplicated splitIds for the same indexed value
        List<String> list = new ArrayList<String>();
        for (Text value : values) {
            String str = value.toString();
            if (!list.contains(str)) {
                list.add(str);
            }
        }
        // Concatenate the distinct splitIds, separated by commas
        StringBuilder sb = new StringBuilder();
        for (String value : list) {
            if (sb.length() > 0) {
                sb.append(",");
            }
            sb.append(value);
        }
        context.write(key, new Text(sb.toString()));
    }
}

Given the same example as before, the final output will look like the following:

192.168.0.1     ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d
192.168.0.2     60390c7e429e38e8449519011a24f79d
192.168.0.3     ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d
192.168.0.5     ccc6decd3d361c3d651807a0c1a665e4
192.168.0.6     ccc6decd3d361c3d651807a0c1a665e4

Because the index output is usually quite large, writing it as a SequenceFile (using SequenceFileOutputFormat) might be quite helpful. A possible driver for this index-build job is sketched below.
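
This is only a sketch under assumptions: the class names RebuildIndexDriver and RebuildIndexCombiner, the 50-reducer setting taken from the estimate above, and the input / output paths passed as arguments are all illustrative.

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;

public class RebuildIndexDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {

        Job job = new Job(getConf());
        job.setJobName("rebuild-index");
        job.setJarByClass(RebuildIndexDriver.class);

        job.setMapperClass(RebuildIndexMapper.class);
        job.setCombinerClass(RebuildIndexCombiner.class);
        job.setReducerClass(RebuildIndexReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Around 50 reducers for an expected ~100Gb index (roughly 2Gb per output file)
        job.setNumReduceTasks(50);

        // The index is large: store it as a SequenceFile
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}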

Fetch Indexed values

Each time you need to execute a MapReduce job for a given indexed value (an IP address in my example), you first have to query the index created at the previous step in order to retrieve the distinct InputSplits this value belongs to.

Mapper

The map task is actually quite simple: for each indexed value that matches the target you are looking for, output all of its indexed InputSplits.

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FetchIndexMapper extends Mapper<Text, Text, Text, NullWritable> {

    private String indexLookup;

    @Override
    public void setup(Context context) {
        // Get the value to look up from the job configuration
        indexLookup = context.getConfiguration().get("target.lookup");
    }

    @Override
    public void map(Text indexKey, Text indexValue, Context context)
            throws IOException, InterruptedException {
        if (!indexKey.toString().equals(indexLookup)) {
            // Ignore the record if it does not match the target
            return;
        }
        // Output each InputSplit hash the target belongs to
        for (String index : indexValue.toString().split(",")) {
            context.write(new Text(index), NullWritable.get());
        }
    }
}

Reducer

The purpose of the reduce task is simply to remove any duplicates; a minimal sketch is given after the example below. Given the same index as before,

192.168.0.1     ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d
192.168.0.2     60390c7e429e38e8449519011a24f79d
192.168.0.3     ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d
192.168.0.5     ccc6decd3d361c3d651807a0c1a665e4
192.168.0.6     ccc6decd3d361c3d651807a0c1a665e4

An index query for IP address 192.168.0.1 will output the following

ccc6decd3d361c3d651807a0c1a665e4  null
60390c7e429e38e8449519011a24f79d  null
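
A possible reducer for this deduplication step (the class name FetchIndexReducer is an assumption) only writes each distinct InputSplit hash once:

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FetchIndexReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text splitId, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Each distinct InputSplit MD5 hash is written exactly once
        context.write(splitId, NullWritable.get());
    }
}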

These InputSplit MD5 hashes should be written to a temporary folder on HDFS so that they can be read by your actual MapReduce job (next section). If the file is quite large, using a SequenceFileOutputFormat can be, once again, really helpful.

Execute query based on Indexed values

Now that we have built our index table and retrieved the actual InputSplit(s) for the target we are looking for, it is time to set up the actual MapReduce job.

Custom FileInputFormat

With the default configuration, Hadoop retrieves the InputSplits to be used through the FileInputFormat class. We will create our own FileInputFormat class extending the default one and overriding its getSplits() method. You have to read the file created at the previous step, add all your indexed InputSplit hashes into a list, and then compare this list with the one returned by the superclass. Only the InputSplits found in your index will be returned to the JobTracker.

public class IndexFileInputFormat extends FileInputFormat<LongWritable, Text> {

    .../... // createRecordReader(), etc.

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {

        // Retrieve all default InputSplits from the super class
        List<InputSplit> totalIs = super.getSplits(job);

        // Keep only the InputSplits matching our indexed InputSplits
        List<InputSplit> indexedIs = Utils.removeNonIndexedIS(totalIs);

        return indexedIs;
    }
}

And, in your Utils class:

    public static List<InputSplit> removeNonIndexedIS(List<InputSplit> totalIs) throws IOException {

        // Read the file written at the previous step and list all indexed MD5 hashes
        List<String> md5Is = readInputSplitsFromFile();

        // Initialize the new list of InputSplits
        List<InputSplit> indexedIs = new ArrayList<InputSplit>();

        // Filter out any InputSplit that is not found in our indexed list
        for (InputSplit split : totalIs) {
            String str = MD5Hash.digest(split.toString()).toString();
            if (md5Is.contains(str)) {
                indexedIs.add(split);
            }
        }

        // Return the list of InputSplits to query
        // (empty list if the target does not exist)
        return indexedIs;
    }
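
The readInputSplitsFromFile() helper is not shown above; a minimal sketch could look like the following, assuming the fetch job wrote its output as plain text (one MD5 hash per line) to a hard-coded temporary HDFS location. Both the path and the text format are illustrative assumptions.

    // Additional imports needed by this helper:
    // import java.io.BufferedReader;
    // import java.io.InputStreamReader;
    // import org.apache.hadoop.conf.Configuration;
    // import org.apache.hadoop.fs.FileSystem;
    // import org.apache.hadoop.fs.Path;

    public static List<String> readInputSplitsFromFile() throws IOException {

        List<String> md5Is = new ArrayList<String>();

        // Placeholder location written by the fetch job (assumption)
        Path indexedSplits = new Path("/tmp/indexed_splits/part-r-00000");
        FileSystem fs = FileSystem.get(new Configuration());

        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(indexedSplits)));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                // The reducer value is a NullWritable, so the hash is the first token
                String md5 = line.trim().split("\t")[0];
                if (!md5.isEmpty()) {
                    md5Is.add(md5);
                }
            }
        } finally {
            reader.close();
        }
        return md5Is;
    }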

Driver code

We now have to use this IndexFileInputFormat class instead of the default one (FileInputFormat) in our driver code, and we’re done. During job initialization, Hadoop will use only the InputSplits that match the ones we have specified. You should end up with fewer map tasks than required for a “full scan” query.

    public static void main(String[] args) throws Exception {

        .../...

        // Create a new Job
        Job job = new Job(conf);
        job.setJarByClass(MyCustomMapper.class);
        job.setMapperClass(MyCustomMapper.class);
        job.setReducerClass(MyCustomReducer.class);

        // Use our custom IndexFileInputFormat
        job.setInputFormatClass(IndexFileInputFormat.class);

        .../...

    }

Testing

Test environment

For testing purposes, I set up a small cluster of 3 virtual nodes (VirtualBox) as described below. The Hadoop cluster has been installed using Cloudera Manager 4.5.2 (free edition).

Host
  • OS: Mac OS X 10.7.5
  • Processor: Intel(R) Core(TM) i7-2600 CPU @ 3.40GHz
  • Memory: 16Gb

1 Namenode + Jobtracker
  • OS: Ubuntu Server 12.04.2 LTS
  • Memory: 5Gb
  • Storage: 50Gb
  • JDK: Java SDK 1.6.0_31
  • Hadoop: 2.0.0-cdh4.2.0

2 Datanodes + Tasktrackers
  • OS: Ubuntu Server 12.04.2 LTS
  • Memory: 2.5Gb
  • Storage: 50Gb
  • JDK: Java SDK 1.6.0_31
  • Hadoop: 2.0.0-cdh4.2.0

Test data

I have generated a fake data set using a simple Perl script. Even though we are far from a real big-data environment, I believe such a data set is large enough to reveal any potential improvement (if any) from using indices.

  • 30Gb of fake data
  • 100 bytes per record
  • 1Gb per file

I have generated 5’000’000 different targets (IP addresses) that I have randomly distributed over these 300’000’000 records, so a given IP is found 60 times on average.

Results

I have run the exact same indexed and non-indexed jobs several times over subsets of 5, 10, 15, 20 and 25Gb of my dataset. Because my indexed values were evenly distributed, I expected most of my figures to roughly follow a linear trend.

The graph below shows the actual index size vs. the dataset size. As expected, the index size grows linearly, and so does its rebuild execution time.

My index is around 7.5Gb for a 25Gb dataset (30%), and has been fully rebuilt in around 30 minutes.

[Figure: index size and rebuild time vs. dataset size]

The graph below shows the execution time of both indexed and non-indexed jobs as a function of the dataset size. This is what everybody expects from indexing: the larger your dataset, the faster your indexed query becomes compared to the non-indexed one. Even with my tiny test environment, I have been able to see the great potential of indices for improving MapReduce performance.

[Figure: execution time of indexed vs. non-indexed jobs as a function of dataset size]

The previous graph does not take into account the time required to rebuild my index. Even though this is done only once every X hours, days or weeks, we have seen that this process is quite costly and might take a long time to complete. It must be taken into account when forecasting indexing performance. Assuming you rebuild your index only once a day and then execute 10 MapReduce jobs, the overall execution time for all your indexed queries will obviously be

Time total = rebuild_time + 10 x (indexed_mapreduce_time)

A simple calculation on my experimental values shows that I need to execute at least 5 MapReduce jobs a day to get any benefit from using custom indices on a 25Gb dataset.
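
More generally, indexing pays off as soon as the number N of indexed jobs executed between two index rebuilds satisfies

rebuild_time + N x (indexed_mapreduce_time) < N x (fullscan_mapreduce_time)

that is, as soon as N > rebuild_time / (fullscan_mapreduce_time - indexed_mapreduce_time).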

Conclusion

MapReduce index performance strongly depends on your data distribution, and indexing can be really powerful, especially with large data sets. Should you need to forecast how effective this implementation might be for your specific use case, I suggest benchmarking it on your production environment using a subset of your production dataset. For that purpose, you can perform several tests on a Simple Random Sample so that you can extrapolate the results to your entire data set.