Hadoop: Indexing on MapReduce

Using the MapReduce framework, Yahoo! Inc. was able to sort 1 TB of data in less than 60 seconds (the 1 TB sort benchmark). Unfortunately, not every company can afford to build a thousand-node cluster to achieve pseudo real-time querying. Assuming you have a 10-node cluster, which is already a good start for playing with big data, sorting 1 TB of data should take roughly 1 hour. Given this constraint of limited nodes, you may need to repartition your data on HDFS and/or implement indices in order to improve overall performance. In this report I describe the implementation of the custom indices based on Hadoop InputSplits that I created, the issues I faced, and finally what value (if any) they add compared to non-indexed MapReduce jobs.

Background

Readers should understand the concepts of Hadoop, HDFS and MapReduce, and should have experience implementing basic Mapper and Reducer classes with the new Hadoop API. Should you need to refresh your knowledge, the well-known Definitive Guide by Tom White will be more than helpful.

InputSplit

Once you submit a MapReduce job, Hadoop computes the number of map tasks that need to be executed on your data set. This number depends on the number of input files on HDFS, their size and the block size. Each file is split into one or several InputSplits, each of which is basically a chunk of one or several blocks. Each InputSplit gets its own dedicated mapper instance, so the number of mappers is directly determined by the number of InputSplits.

A file-based InputSplit is defined by 3 values:

  1. The file name
  2. The offset (where the InputSplit starts)
  3. The length (number of bytes covered by the InputSplit)

The toString() method of such an InputSplit returns the following pattern (file:offset+length):

hdfs://server.domain:8020/path/to/my/file:0+100000

By hashing this value using MD5Hash, you get a compact and, for practical purposes, unique ID identifying any InputSplit:

911c058fbd1e60ee710dcc41fff16b27
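
As a minimal sketch of the idea, the snippet below derives such an ID with the plain JDK MessageDigest class, which stands in here for Hadoop's MD5Hash so that it runs without a cluster. The class name SplitId and its method are hypothetical names chosen for illustration, and the input is assumed to be the string returned by the split's toString().

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper (not part of Hadoop): turns a split description into an ID.
final class SplitId {

    /** Returns the MD5 digest of the split description as 32 lowercase hex characters. */
    static String of(String splitDescription) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(splitDescription.getBytes(StandardCharsets.UTF_8));
            // Left-pad with zeros so that the ID is always 32 characters long
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available on this JVM", e);
        }
    }
}
```

Calling SplitId.of("hdfs://server.domain:8020/path/to/my/file:0+100000") returns a 32-character hexadecimal string. Two splits of the same file differ in offset, so they get different IDs.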

Indexing

We can identify 2 different levels of granularity for creating indices: an index based on the file URI, or an index based on the InputSplit. Let’s take 2 different example data sets.

[Figure: first example data set]

In this first example, the 2 files of your data set span 25 blocks and have been divided into 7 different InputSplits. The target you are looking for (highlighted in grey) is found in file #1 (blocks #2, #8 and #13) and in file #2 (block #17).

  • With file-based indexing, you end up with 2 files (the full data set here), meaning that your indexed query is equivalent to a full scan query
  • With InputSplit-based indexing, you end up with 4 InputSplits out of the 7 available. The performance should definitely be better than with a full scan query

[Figure: second example data set, sorted by the indexed column]

Let’s take a second example. This time the same data set has been sorted by the column you want to index. The target you are looking for (highlighted in grey) is now found in file #1 only (blocks #1, #2, #3 and #4).

  • With file-based indexing, you end up with only 1 file of your data set
  • With InputSplit-based indexing, you end up with 1 InputSplit out of the 7 available

For this specific study, I decided to use a custom InputSplit-based index. I believe this approach is a good balance between the effort it takes to implement, the value it might add in terms of performance, and its expected applicability regardless of the data distribution.

Implementation

The process for implementing the index is quite simple and consists of the following 3 steps:

  1. Build index from your full data set
  2. Get the InputSplit(s) for the indexed value you are looking for
  3. Execute your actual MapReduce job on indexed InputSplits only

The first step needs to be executed only once, as long as the full data set does not change.

Build Index

Building a MapReduce index might take a really long time, as you output each value to index together with its actual InputSplit location. The key parameter to estimate here is the number of reducers. With a single reducer, all your indices are written into a single file, but the time required to copy all data from the mappers to that single reducer will be far too long. With thousands of reducers, you output indices to thousands of different files, but the job might be significantly faster. The right value is, I believe, a balance between the number of available reduce slots in your cluster and the expected size of your final index. In order to estimate that size properly, I did a Simple Random Sampling (SRS) of 10% of my data set. If you expect a 100 GB index, you could set the number of reducers to around 50 so that you end up with 50 files of about 2 GB each.
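
The arithmetic behind this estimate can be sketched as follows. This is only an illustration under stated assumptions: the index size is assumed to scale linearly with the sample fraction (which may overestimate when many keys repeat), and the class name, method names and the cap on reduce slots are hypothetical.

```java
// Hypothetical helper: sizes the number of reducers from a sampled index.
final class ReducerEstimate {

    /** Extrapolates the full index size from an index built on a sample (assumes linear scaling). */
    static long estimateIndexBytes(long sampleIndexBytes, double sampleFraction) {
        if (sampleFraction <= 0 || sampleFraction > 1) {
            throw new IllegalArgumentException("sampleFraction must be in (0, 1]");
        }
        return Math.round(sampleIndexBytes / sampleFraction);
    }

    /** Number of reducers so that each output file is about targetFileBytes, capped by the reduce slots. */
    static int reducers(long indexBytes, long targetFileBytes, int maxReduceSlots) {
        // Ceiling division: a partially filled last file still needs its own reducer
        long needed = (indexBytes + targetFileBytes - 1) / targetFileBytes;
        return (int) Math.max(1, Math.min(needed, maxReduceSlots));
    }
}
```

With a 10 GB index built from a 10% sample, estimateIndexBytes extrapolates to 100 GB, and reducers(...) with a 2 GB target file size gives the 50 reducers mentioned above, provided the cluster has at least that many reduce slots.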

Mapper

Using the Hadoop context, you can retrieve the current InputSplit from the running mapper instance. The mapper extracts the value to index and outputs it together with the MD5 hash of its InputSplit.


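import java.io.IOException;

import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;

public class RebuildIndexMapper
 extends Mapper<Object, Text, Text, Text> {

	private String splitId;

	@Override
	public void setup(Context context) {
		// Retrieve the current InputSplit from the context and hash its string form
		InputSplit is = context.getInputSplit();
		splitId = MD5Hash.digest(is.toString()).toString();
	}

	@Override
	public void map(Object object, Text line, Context context)
			throws IOException, InterruptedException {
		// Retrieve the key to index
		String key = Utils.getValueToIndex(line.toString());
		// Output the value to index + the current split ID
		context.write(new Text(key), new Text(splitId));
	}
}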

Assuming you are indexing IP addresses, the intermediate key/value pairs will look like the following. From a first mapper,

192.168.0.1     60390c7e429e38e8449519011a24f79d
192.168.0.2     60390c7e429e38e8449519011a24f79d
192.168.0.3     60390c7e429e38e8449519011a24f79d
192.168.0.1     60390c7e429e38e8449519011a24f79d

And from a second one,

192.168.0.1     ccc6decd3d361c3d651807a0c1a665e4
192.168.0.5     ccc6decd3d361c3d651807a0c1a665e4
192.168.0.6     ccc6decd3d361c3d651807a0c1a665e4
192.168.0.3     ccc6decd3d361c3d651807a0c1a665e4

Combiner

As you output a key/value pair for each line of your data set, using a Combiner that removes duplicates can be really useful. Its implementation is straightforward and is not described here.
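
For readers who want a starting point anyway, here is a minimal sketch of the de-duplication such a combiner would perform, written against plain JDK types so that it can run outside Hadoop. The class name DistinctSplitIds is hypothetical. In an actual job, this logic would sit in the reduce() method of a Reducer<Text, Text, Text, Text> subclass registered with job.setCombinerClass(), writing each distinct value back under the same key.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper: the core of a de-duplicating combiner, without Hadoop types.
final class DistinctSplitIds {

    /** Returns the split IDs seen for one key, duplicates removed, in first-seen order. */
    static Set<String> of(Iterable<String> splitIds) {
        Set<String> distinct = new LinkedHashSet<>();
        for (String id : splitIds) {
            distinct.add(id); // add() silently ignores values already present
        }
        return distinct;
    }
}
```

Applied to the first mapper's output above, the two records for 192.168.0.1 collapse into one, so 3 records instead of 4 are sent to the reducers.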

Reducer

The goal of the reducer is simply to get the distinct InputSplits for each indexed value, and to output them on a single line.


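import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RebuildIndexReducer extends Reducer<Text, Text, Text, Text> {

	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		// Collect the distinct split IDs for this target
		// (a set avoids the linear scan of List.contains)
		Set<String> splitIds = new LinkedHashSet<String>();
		for (Text value : values) {
			splitIds.add(value.toString());
		}
		// Concatenate the distinct split IDs, comma separated, without a trailing comma
		StringBuilder sb = new StringBuilder();
		for (String splitId : splitIds) {
			if (sb.length() > 0) {
				sb.append(',');
			}
			sb.append(splitId);
		}
		context.write(key, new Text(sb.toString()));
	}
}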

Given the same example as before, final output will look like the following

192.168.0.1     ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d
192.168.0.2     60390c7e429e38e8449519011a24f79d
192.168.0.3     ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d
192.168.0.5     ccc6decd3d361c3d651807a0c1a665e4
192.168.0.6     ccc6decd3d361c3d651807a0c1a665e4

Because the index output is usually quite large, using SequenceFileOutputFormat might be quite helpful.

Fetch Indexed values

Each time you need to execute a MapReduce job for a given indexed value (an IP address in my example), you first have to query the index created at the previous step in order to retrieve the distinct InputSplits this value belongs to.

Mapper

The map task is actually quite simple. For each indexed value that matches the target you are looking for, output all its indexed InputSplits.

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FetchIndexMapper extends Mapper<Text, Text, Text, NullWritable> {

	private String indexLookup;

	@Override
	public void setup(Context context) {
		// Get the value to look up
		indexLookup = context.getConfiguration().get("target.lookup");
	}

	@Override
	public void map(Text indexKey, Text indexValue, Context context)
			throws IOException, InterruptedException {
		// Ignore the record if it does not match the target
		if (!indexKey.toString().equals(indexLookup)) {
			return;
		}
		for (String index : indexValue.toString().split(",")) {
			// Output each single InputSplit location
			context.write(new Text(index), NullWritable.get());
		}
	}
}

Reducer

The purpose of the reduce task is simply to remove duplicates. Its implementation is straightforward, so it is not described here. Given the same example as before,

192.168.0.1     ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d
192.168.0.2     60390c7e429e38e8449519011a24f79d
192.168.0.3     ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d
192.168.0.5     ccc6decd3d361c3d651807a0c1a665e4
192.168.0.6     ccc6decd3d361c3d651807a0c1a665e4

An index query for IP address 192.168.0.1 will output the following

ccc6decd3d361c3d651807a0c1a665e4  null
60390c7e429e38e8449519011a24f79d  null

These InputSplit MD5 hashes should be written to a temporary folder on HDFS so that they can be read by your actual MapReduce job (next section). If the file is quite large, using SequenceFileOutputFormat can, once again, be really helpful.
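
The lookup itself can be tried out on the sample index without a cluster. The sketch below is plain Java, not the MapReduce job described above: it assumes the index is available as text lines in the "key, whitespace, comma-separated split IDs" layout shown earlier, and the class name IndexLookup is hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: in-memory equivalent of the fetch step, for illustration only.
final class IndexLookup {

    /** Returns the distinct split IDs recorded for the target in the given index lines. */
    static Set<String> splitIdsFor(String target, List<String> indexLines) {
        Set<String> splitIds = new LinkedHashSet<>();
        for (String line : indexLines) {
            // First field is the indexed value, the rest is the comma-separated ID list
            String[] fields = line.trim().split("\\s+", 2);
            if (fields.length < 2 || !fields[0].equals(target)) {
                continue; // malformed line or different key
            }
            for (String raw : fields[1].split(",")) {
                String id = raw.trim();
                if (!id.isEmpty()) {
                    splitIds.add(id);
                }
            }
        }
        return splitIds;
    }
}
```

Given the five index lines of the example, a lookup for 192.168.0.1 returns the two split IDs ccc6decd3d361c3d651807a0c1a665e4 and 60390c7e429e38e8449519011a24f79d, while a lookup for an address that was never indexed returns an empty set.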

Execute query based on Indexed values

Now that we have built our index table and retrieved the actual InputSplit(s) for the target we are looking for, it is time to set up the actual MapReduce job.

Custom FileInputFormat

With the default configuration, Hadoop determines the InputSplits to process through the FileInputFormat class. We will create our own FileInputFormat class extending the default one and overriding its getSplits() method. It reads the file created at the previous step, adds all your indexed InputSplits to a list, and then compares this list with the one returned by the super class. Only the InputSplits found in your index are returned to Hadoop.

public class IndexFileInputFormat extends FileInputFormat<LongWritable, Text> {

	.../...

	@Override
	public List<InputSplit> getSplits(JobContext job) throws IOException {

		// Retrieve all default InputSplits
		List<InputSplit> totalIs = super.getSplits(job);

		// Keep only the InputSplits matching our indexed InputSplits
		List<InputSplit> indexedIs = Utils.removeNonIndexedIS(totalIs);

		return indexedIs;
	}
}

with the following method in your Utils class:

	public static List<InputSplit> removeNonIndexedIS(List<InputSplit> totalIs) {

		// Read your previous file and collect all your indexed MD5 hashes
		// (a HashSet gives constant-time lookups)
		Set<String> md5Is = new HashSet<String>(readInputSplitsFromFile());

		// Initialize the new list of InputSplits
		List<InputSplit> indexedIs = new ArrayList<InputSplit>();

		// Filter out the InputSplits that are not found in our indexed list
		for (InputSplit split : totalIs) {
			String str = MD5Hash.digest(split.toString()).toString();
			if (md5Is.contains(str)) {
				indexedIs.add(split);
			}
		}

		// Return the list of InputSplits to query
		// (an empty list if the target does not exist)
		return indexedIs;
	}
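
Because this filtering step does not really depend on Hadoop, it can be checked in isolation before wiring it into an InputFormat. The following is a sketch under assumptions: SplitFilter and keepIndexed are hypothetical names, the split type is left generic, and the function that derives an ID from a split (the MD5 hash of toString() in this post) is passed in as a parameter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical helper: generic version of the split filtering, testable without Hadoop.
final class SplitFilter {

    /** Keeps only the splits whose ID, as computed by idOf, appears in indexedIds. */
    static <T> List<T> keepIndexed(List<T> allSplits, Function<T, String> idOf, Set<String> indexedIds) {
        List<T> kept = new ArrayList<>();
        for (T split : allSplits) {
            if (indexedIds.contains(idOf.apply(split))) {
                kept.add(split);
            }
        }
        return kept; // empty when the target is not in the index
    }
}
```

Passing the IDs as a Set rather than a List keeps each membership test constant-time, which matters once a data set is divided into many thousands of splits.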

Driver code

We now have to use this IndexFileInputFormat class instead of the default one (FileInputFormat) in our driver code, and we are done. At job initialization, Hadoop will use only the InputSplits that match the ones we have specified. You should end up with fewer map tasks than required for a “full scan query”.

	public static void main(String[] args) throws Exception {

		.../...

		// Create a new Job
		Job job = new Job(conf);
		job.setJarByClass(MyCustomMapper.class);
		job.setMapperClass(MyCustomMapper.class);
		job.setReducerClass(MyCustomReducer.class);

		// Use our custom IndexFileInputFormat
		job.setInputFormatClass(IndexFileInputFormat.class);

		.../...

	}

Testing

Test environment

For testing purposes, I set up a small cluster of 3 virtual nodes (VirtualBox) as described below. The Hadoop cluster was installed with Cloudera Manager 4.5.2 (free edition).

Host

  • OS: Mac OS X 10.7.5
  • Processor: Intel(R) Core(TM) i7-2600 CPU @ 3.40GHz
  • Memory: 16 GB

1 NameNode + JobTracker

  • OS: Ubuntu Server 12.04.2 LTS
  • Memory: 5 GB
  • Storage: 50 GB
  • JDK: Java SDK 1.6.0_31
  • Hadoop: 2.0.0-cdh4.2.0

2 DataNodes + TaskTrackers

  • OS: Ubuntu Server 12.04.2 LTS
  • Memory: 2.5 GB
  • Storage: 50 GB
  • JDK: Java SDK 1.6.0_31
  • Hadoop: 2.0.0-cdh4.2.0

Test data

I generated a fake data set using a simple Perl script. Even though we are far from a real big-data environment, I believe such a data set is large enough to show any potential improvement from using indices.

  • 30 GB of fake data
  • 100 bytes per record
  • 1 GB per file

I generated 5,000,000 distinct targets (IP addresses) that I randomly distributed over these 300,000,000 records (30 GB at 100 bytes per record). The same IP is found 60 times on average.

Results

I ran the exact same indexed and non-indexed jobs several times over subsets of 5, 10, 15, 20 and 25 GB of my data set. Because my indexed values were evenly distributed, I expected most of my figures to follow a roughly linear trend.

The graph below shows the actual index size vs. the data set size. As expected, the index size grows linearly, and so does its rebuild time.

My index is around 7.5 GB for a 25 GB data set (30%), and was fully rebuilt in around 30 minutes.

[Figure: index size vs. data set size]

The graph below shows the execution time of both indexed and non-indexed jobs as a function of the data set size. This is what everybody expects from indexing: the larger your data set is, the faster your indexed query will be relative to the non-indexed one. Even with my tiny test environment, I was able to see the great potential of indices for improving MapReduce performance.

[Figure: execution time of indexed and non-indexed jobs vs. data set size]

The previous graph does not take into account the time required to rebuild the index. Even though this is done only once every X (hours, days, weeks, etc.), we have seen that this process is quite costly and might take a lot of time to complete. This must be taken into account when forecasting indexing performance. Assuming you build your index only once a day and then execute 10 MapReduce jobs, the overall execution time for all your indexed queries will be

total_time = rebuild_time + 10 x indexed_mapreduce_time

A simple calculation on my experimental values shows that I need to execute at least 5 MapReduce jobs a day to get any benefit from custom indices on a 25 GB data set.
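
This break-even point follows from comparing rebuild_time + n x indexed_mapreduce_time with n x non_indexed_mapreduce_time: indexing pays off once n exceeds rebuild_time / (non_indexed_mapreduce_time - indexed_mapreduce_time). A minimal sketch of that calculation is given below. The class and method names are hypothetical, and the per-job timings used in the usage note are placeholders for illustration, not measurements from the tests above.

```java
// Hypothetical helper: how many jobs per index rebuild are needed before indexing pays off.
final class BreakEven {

    /**
     * Smallest number of jobs for which
     * rebuild + n * indexed is strictly less than n * fullScan; returns -1 if there is none.
     */
    static long minJobs(double rebuildMinutes, double indexedJobMinutes, double fullScanJobMinutes) {
        double savedPerJob = fullScanJobMinutes - indexedJobMinutes;
        if (savedPerJob <= 0) {
            return -1; // the indexed job is not faster, so the rebuild never pays off
        }
        // Smallest integer strictly greater than rebuild / savedPerJob
        return (long) Math.floor(rebuildMinutes / savedPerJob) + 1;
    }
}
```

For example, with a 30-minute rebuild and made-up job times of 3 minutes (indexed) and 10 minutes (full scan), minJobs(30, 3, 10) returns 5: four jobs save 28 minutes, which does not yet cover the rebuild, while five jobs save 35 minutes.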

Conclusion

The performance of a MapReduce index strongly depends on your data distribution, and such an index can be really powerful, especially with large data sets. Should you need to forecast how well this implementation might work for your specific use case, I suggest benchmarking it on your production environment using a subset of your production data set. For that purpose, you can run several tests on a Simple Random Sample so that you can extrapolate the results to your entire data set.