Hadoop: Custom RecordReader – Processing String / Pattern delimited records

Now that both InputFormat and RecordReader are familiar concepts to you (if not, you can still refer to the article Hadoop RecordReader and FileInputFormat), it is time to get to the heart of the subject.

The default implementation of TextInputFormat is based on a Line-by-Line approach. Each line found in the data set is supplied to the MapReduce framework as a key / value pair. Should you need to handle more than 1 line at a time, you can quite easily implement your own NLinesRecordReader (refer to this good article – bigdatacircus), but…

  • What if all your records do not have the same number of lines?
  • How do you process Record-by-Record instead of Line-by-Line?

Should you need to process your data set Record-by-Record, distinct records must obviously be separated by a common delimiter. This delimiter can be either a common String or a common pattern.

String delimited records

Data set example

Take the below example of a (dummy) data set where all records are separated by the same String.

----------
pleff lorem monaq morel plaff lerom baple merol pliff ipsum ponaq mipsu ploff pimsu 
caple supim pluff sumip qonaq issum daple ussum ronaq ossom fap25 abcde tonaq fghij 
merol pliff ipsum ponaq mipsu ploff pimsu caple supim pluff sumip qonaq issum daple 
ussum ronaq ossom faple abc75 tonaq fghij gaple klmno vonaq pqrst haple uvwxy nonaq 
----------
zzzzz laple pleff lorem monaq morel plaff sumip qonaq issum daple ussum ponaq gapl 
Klmno pm100 pleff lorem monaq morel plaff lerom baple merol pliff ipsum ponaq mipsu 
ploff pimsu caple supim pluff sumip qonaq issum daple ussum ronaq ossom fa125 abcde 
----------
lerom baple merol pliff ipsum ponaq mipsu ploff pimsu caple supim pluff sumih Qonaq

Implementation

In that case (records are always separated by the same “10-dash” String), the implementation more or less comes out of the box. Indeed, the default LineReader can take as an argument a recordDelimiterBytes byte array that can be retrieved / set directly from the Hadoop configuration. This parameter is used as a String delimiter to separate distinct records.

Just make sure to set it up in your MapReduce driver code

Configuration conf = new Configuration(true);
conf.set("textinputformat.record.delimiter","------------");

…and to specify the default TextInputFormat for your MapReduce job’s InputFormat.

Job job = new Job(conf);
job.setInputFormatClass(TextInputFormat.class);

Instead of processing 1 given line at a time, you are now able to process a full multi-line record. Your mapper instances will be supplied with the following keys / values (see the small mapper sketch after this list):

  • Key is the offset (location of your record’s first line)
  • Value is the record itself
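
For instance, here is a minimal mapper sketch (class and field names are mine, not part of any existing code) that simply counts how many lines each record contains, confirming that each value now holds a whole multi-line record:

	public static class RecordSizeMapper extends
			Mapper<LongWritable, Text, LongWritable, IntWritable> {

		private IntWritable lineCount = new IntWritable();

		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			// "value" holds a full record, possibly spanning several lines
			lineCount.set(value.toString().split("\n").length);

			// "key" is still the byte offset of the record's first line
			context.write(key, lineCount);
		}
	}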

Note that the default delimiter is the end-of-line character (CR, LF or CRLF). With the default Hadoop configuration, LineReader can therefore be seen as a Record-by-Record reader using an end-of-line delimiter, which is actually equivalent to a Line-by-Line reader.

Important update

Contrary to what is stated there on JIRA, the custom delimiter (provided via the “textinputformat.record.delimiter” parameter) is not supported in version 1.2.1 of Hadoop. However, you can still create your own record reader to handle that particular case. Have a look at my GitHub account (hadoop-recordreader); see Delimiter.java, which uses CustomFileInputFormat.java.
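
Alternatively, since a fixed String is just a special case of a pattern, the pattern-based reader described in the next section can handle it as well. A minimal sketch (this is not the Delimiter.java implementation mentioned above) is to quote the literal delimiter so it is not interpreted as a regular expression:

	// Treat the literal "10-dash" separator line as a (quoted) regular expression
	Configuration conf = new Configuration(true);
	conf.set("record.delimiter.regex", Pattern.quote("----------"));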

Pattern delimited records

Data set example

Now take the following data set structure. Records are no longer separated by a common String, but rather by a common pattern (a DateTime). A simple String delimiter cannot be used here, so you will have to create your own RecordReader that splits records using a Regular Expression.

Sat, 25 May 2013 22:29:30
pleff lorem monaq morel plaff lerom baple merol pliff ipsum ponaq mipsu ploff pimsu 
caple supim pluff sumip qonaq issum daple ussum ronaq ossom fap25 abcde tonaq fghij 
merol pliff ipsum ponaq mipsu ploff pimsu caple supim pluff sumip qonaq issum daple 
ussum ronaq ossom faple abc75 tonaq fghij gaple klmno vonaq pqrst haple uvwxy nonaq 

Sat, 25 May 2013 22:30:30
zzzzz laple pleff lorem monaq morel plaff sumip qonaq issum daple ussum ponaq gapl 
Klmno pm100 pleff lorem monaq morel plaff lerom baple merol pliff ipsum ponaq mipsu 
ploff pimsu caple supim pluff sumip qonaq issum daple ussum ronaq ossom fa125 abcde 

Sat, 25 May 2013 22:31:30
lerom baple merol pliff ipsum ponaq mipsu ploff pimsu caple supim pluff sumih Qonaq

PatternRecordReader

The first thing to do here is to implement a custom reader that extends the default RecordReader and implements its abstract methods. Should you need more details on how these methods work, please refer to my previous post (Hadoop RecordReader and FileInputFormat), as I will only describe here the delta compared to the default implementation.


public class PatternRecordReader
        extends RecordReader<LongWritable, Text> {

	private LineReader in;
	private final static Text EOL = new Text("\n");
	private Pattern delimiterPattern;
	private String delimiterRegex;
	private int maxLengthRecord;

	@Override
	public void initialize(InputSplit split,
                        TaskAttemptContext context)
			throws IOException, InterruptedException {

		Configuration job = context.getConfiguration();
		this.delimiterRegex = job.get("record.delimiter.regex");
		this.maxLengthRecord = job.getInt(
                                "mapred.linerecordreader.maxlength",
				Integer.MAX_VALUE);

		delimiterPattern = Pattern.compile(delimiterRegex);
		.../...
	}

	private int readNext(Text text,
                        int maxLineLength,
                        int maxBytesToConsume)
			throws IOException {

		int offset = 0;
		text.clear();
		Text tmp = new Text();

		for (int i = 0; i < maxBytesToConsume; i++) {

			int offsetTmp = in.readLine(
                                     tmp,
                                     maxLineLength,
                                     maxBytesToConsume);
			offset += offsetTmp;
			Matcher m = delimiterPattern.matcher(tmp.toString());

			// End of File
			if (offsetTmp == 0) {
				break;
			}

			if (m.matches()) {
				// Record delimiter
				break;
			} else {
				// Append value to record
				text.append(EOL.getBytes(), 0, EOL.getLength());
				text.append(tmp.getBytes(), 0, tmp.getLength());
			}
		}
		return offset;
	}
}

Note the following points that differ from the default implementation:

  • In initialize(): the regular expression is retrieved from the Hadoop configuration
  • The regular expression is compiled only once per InputSplit (in initialize()), not once per record

The actual logic is located in the readNext private method:
We simply loop (bounded by Integer.MAX_VALUE by default) and append every line found, together with an EOL character, to the final Text, until the current line matches our Regular Expression delimiter. We finally return the number of bytes read.

In the default implementation we were reading lines by using

            newSize = in.readLine(value, maxLineLength,
                    Math.max((int) Math.min(
                            Integer.MAX_VALUE, end - pos),
                            maxLineLength));

it now becomes

            newSize = readNext(value, maxLineLength,
                    Math.max((int) Math.min(
                            Integer.MAX_VALUE, end - pos),
                            maxLineLength));
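
For completeness, here is a sketch (not shown in the original class) of how nextKeyValue() could wrap this call; the surrounding fields (pos, end, key, value, maxLineLength) are assumed to mirror those of the CustomLineRecordReader detailed in the Hadoop RecordReader and FileInputFormat post further down.

	@Override
	public boolean nextKeyValue() throws IOException {

		// Current offset is the key
		key.set(pos);

		int newSize = 0;

		// Make sure we read at least one record that starts in this Split
		while (pos < end) {

			// Read a full record, i.e. everything up to the next
			// line matching the delimiter pattern
			newSize = readNext(value, maxLineLength,
					Math.max((int) Math.min(
							Integer.MAX_VALUE, end - pos),
							maxLineLength));

			if (newSize == 0) {
				// Nothing read: end of Split
				break;
			}

			pos += newSize;

			if (newSize < maxLineLength) {
				// A complete record has been read
				break;
			}
		}

		if (newSize == 0) {
			key = null;
			value = null;
			return false;
		}
		return true;
	}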

PatternInputFormat

The next step is to create a custom InputFormat


public class PatternInputFormat
        extends FileInputFormat<LongWritable,Text>{

	@Override
	public RecordReader<LongWritable, Text> createRecordReader(
			InputSplit split,
			TaskAttemptContext context)
                           throws IOException,
			          InterruptedException {

		return new PatternRecordReader();
	}

}

Driver code

In your driver code, you need to provide the Hadoop framework with the regular expression you have chosen


// regex matching pattern "Sat, 25 May 2013"
String regex = "^[A-Za-z]{3},\\s\\d{2}\\s[A-Za-z]{3}.*";
Configuration conf = new Configuration(true);
conf.set("record.delimiter.regex", regex);

and to use this new InputFormat


Job job = new Job(conf);
job.setInputFormatClass(PatternInputFormat.class);

Mapper

Here I’m running a simple Map-only job in order to make sure all my records have been correctly separated

	public static class RecordMapper extends
			Mapper<LongWritable, Text, Text, NullWritable> {

		private Text out = new Text();

		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			out.set(key + " -------------\n" + value);
			context.write(out, NullWritable.get());
		}
	}
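
For reference, the full driver wiring for this Map-only job could look something like the sketch below; the input / output paths and the PatternDriver class name are placeholders of mine:

	Configuration conf = new Configuration(true);
	conf.set("record.delimiter.regex", regex);

	Job job = new Job(conf);
	job.setJarByClass(PatternDriver.class);       // hypothetical driver class
	job.setInputFormatClass(PatternInputFormat.class);
	job.setMapperClass(RecordMapper.class);
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(NullWritable.class);

	// Map-only job: no reduce phase
	job.setNumReduceTasks(0);

	FileInputFormat.addInputPath(job, new Path("/path/to/input"));      // placeholder
	FileOutputFormat.setOutputPath(job, new Path("/path/to/output"));   // placeholder

	System.exit(job.waitForCompletion(true) ? 0 : 1);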

Given the same data set as before, the Map-only job’s output is the following

10000 -------------
pleff lorem monaq morel plaff lerom baple merol pliff ipsum ponaq mipsu ploff pimsu 
caple supim pluff sumip qonaq issum daple ussum ronaq ossom fap25 abcde tonaq fghij 
merol pliff ipsum ponaq mipsu ploff pimsu caple supim pluff sumip qonaq issum daple 
ussum ronaq ossom faple abc75 tonaq fghij gaple klmno vonaq pqrst haple uvwxy nonaq 

13221 -------------
zzzzz laple pleff lorem monaq morel plaff sumip qonaq issum daple ussum ponaq gapl 
Klmno pm100 pleff lorem monaq morel plaff lerom baple merol pliff ipsum ponaq mipsu 
ploff pimsu caple supim pluff sumip qonaq issum daple ussum ronaq ossom fa125 abcde 

15224 -------------
lerom baple merol pliff ipsum ponaq mipsu ploff pimsu caple supim pluff sumih Qonaq

Conclusion

You are now able to process a “pseudo-unstructured” data set by reading it Record-by-Record instead of Line-by-Line. This implementation can be really helpful if you need to convert raw log files into a more readable format (e.g. CSV). Instead of using an external script (e.g. a Perl script) to pre-process your data before uploading it to HDFS, you can take full advantage of distributed computing by parsing your data set with the MapReduce framework.
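
As a rough illustration of that log-to-CSV idea (the field extraction and column layout below are entirely made up; your parsing logic will obviously differ), a Map-only mapper could emit one CSV line per record:

	public static class CsvMapper extends
			Mapper<LongWritable, Text, Text, NullWritable> {

		private Text csv = new Text();

		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			// One record = one multi-line event; keep the first line
			// (e.g. the timestamp) as the first column and join the rest
			String[] lines = value.toString().trim().split("\n");
			StringBuilder sb = new StringBuilder(lines[0].trim());
			for (int i = 1; i < lines.length; i++) {
				sb.append(',').append(lines[i].trim());
			}

			csv.set(sb.toString());
			context.write(csv, NullWritable.get());
		}
	}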

I hope this article was interesting. Don’t hesitate to let me know if you have any questions.
Cheers,


Hadoop: RecordReader and FileInputFormat

Today’s new challenge…
I want to create a custom MapReduce job that can handle more than 1 single line at a time. Actually, it took me some time to understand the implementation of the default LineRecordReader class, not because the implementation exceeded my Java skill set, but rather because I was not familiar with its concept. I describe in this article my understanding of this implementation.

Since an InputSplit is nothing more than a chunk of 1 or several blocks, it should be pretty rare for a block boundary to end up at the exact location of an end of line (EOL). Some of my records located around block boundaries will therefore be split across 2 different blocks. This raises the following questions:

  1. How can Hadoop guarantee that the lines read are 100% complete?
  2. How can Hadoop consolidate a line that starts in block B and ends in block B+1?
  3. How can Hadoop guarantee that we do not miss any line?
  4. Is there a limitation in terms of line size? Can a line be greater than a block (i.e. span more than 2 blocks)? If so, is there any consequence in terms of MapReduce performance?

Definitions

InputFormat

Definition taken from MapReduce Design Patterns (O'Reilly):

Hadoop relies on the input format of the job to do three things:
1. Validate the input configuration for the job (i.e., checking that the data is there).
2. Split the input blocks and files into logical chunks of type InputSplit, each of which is assigned to a map task for processing.
3. Create the RecordReader implementation to be used to create key/value pairs from the raw InputSplit. These pairs are sent one by one to their mapper.
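
In the new API (org.apache.hadoop.mapreduce), these three responsibilities map onto the two abstract methods of the InputFormat class, shown below. For file-based input, FileInputFormat already provides getSplits, which is why the CustomFileInputFormat at the end of this post only has to implement createRecordReader.

	public abstract class InputFormat<K, V> {

		// Responsibilities 1 & 2: validate the job's input
		// and split it into logical InputSplits
		public abstract List<InputSplit> getSplits(JobContext context)
				throws IOException, InterruptedException;

		// Responsibility 3: create the RecordReader that turns
		// an InputSplit into key / value pairs for the mapper
		public abstract RecordReader<K, V> createRecordReader(
				InputSplit split, TaskAttemptContext context)
				throws IOException, InterruptedException;
	}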

RecordReader

Definition taken from MapReduce Design Patterns (O'Reilly):

A RecordReader uses the data within the boundaries created by the input split to generate key/value pairs. In the context of file-based input, the “start” is the byte position in the file where the RecordReader should start generating key/value pairs. The “end” is where it should stop reading records. These are not hard boundaries as far as the API is concerned; there is nothing stopping a developer from reading the entire file for each map task. While reading the entire file is not advised, reading outside of the boundaries is often necessary to ensure that a complete record is generated.

Example

I jumped right into the code of LineRecordReader and found it not that obvious to understand. Let's start with an example that will hopefully make the code slightly more readable.
Suppose my data set is composed of a single 300Mb file, spanning 3 different blocks (blocks of 128Mb), and suppose that I have been able to get 1 InputSplit for each block. Let's now imagine 3 different scenarios.

File composed of 6 lines of 50Mb each

[Figure: InputSplit1 – split / line layout for this scenario]

  • The first Reader will start reading bytes from Block B1, position 0. The first two EOLs will be met at 50Mb and 100Mb respectively. 2 lines (L1 & L2) will be read and sent as key / value pairs to the Mapper 1 instance. Then, starting from byte 100Mb, we will reach the end of our Split (128Mb) before having found the third EOL. This incomplete line will be completed by reading the bytes of Block B2 until position 150Mb. The first part of line L3 will be read locally from Block B1, the second part will be read remotely from Block B2 (by means of FSDataInputStream), and a complete record will finally be sent as a key / value to Mapper 1.
  • The second Reader starts on Block B2, at position 128Mb. Because 128Mb is not the start of the file, there is a strong chance our pointer is located somewhere in an existing record that has already been processed by the previous Reader. We need to skip this record by jumping to the next available EOL, found at position 150Mb. The actual start of RecordReader 2 will therefore be 150Mb instead of 128Mb.

We can wonder what happens in case a block starts exactly on an EOL. By jumping to the next available record (through the readLine method), we might miss 1 record. Before jumping to the next EOL, we actually need to decrement the initial “start” value to “start – 1”. Being located at least 1 byte before the EOL, we ensure that no record is skipped! Concretely, if a line ended exactly at byte 128 (the start of Split 2), starting at 128 and skipping to the next EOL would throw away the following line entirely; starting at 127 instead, readLine merely consumes the last byte of the previous line, and the Reader then begins cleanly on the next one.

The remaining process follows the same logic, and everything is summarized in the table below.

[Table: InputSplit_meta1 – summary of each Reader's effective start / end positions]

File composed of 2 lines of 150Mb each

[Figure: InputSplit2 – split / line layout for this scenario]

Same process as before:

  • Reader 1 will start reading from block B1, position 0. It will read line L1 locally until the end of its split (128Mb), and will then continue reading remotely on B2 until the EOL (150Mb)
  • Reader 2 will not start reading from 128Mb, but from 150Mb, and will read until B3:300

[Table: InputSplit_meta2 – summary of each Reader's effective start / end positions]

File composed of 2 lines of 300Mb each

OK, this is a tricky and perhaps unrealistic example, but I was wondering what happens in case a record is larger than 2 blocks (i.e. spans at least 3 blocks).

[Figure: InputSplit5 – split / line layout for this scenario]

  • Reader 1 will start reading locally from B1:0 until B1:128, then remotely read all bytes available on B2, and finally remotely on B3 until the EOL is reached (300Mb). There is some overhead here, as we are trying to read a lot of data that is not locally available
  • Reader 2 will start reading from B2:128 and will jump to the next available record, located at B3:300. Its new start position (B3:300) is actually greater than its maximum position (B2:256). This Reader will therefore not provide Mapper 2 with any key / value. I understand it somehow as a kind of safety feature ensuring that data locality (which makes Hadoop so efficient in data processing) is preserved (i.e. do not process a line that does not start in the chunk I am responsible for).
  • Reader 3 will start reading from B3:300 to B5:600

This is summarized in the table below.

[Table: InputSplit_meta5 – summary of each Reader's effective start / end positions]

Maximum size for a single record

There is a maximum size allowed for a single record to be processed. This value can be set using the parameter below.

conf.setInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);

A line with a size greater than this maximum value (default is 2,147,483,647) will be ignored.

I hope these 3 examples give you a high-level understanding of RecordReader and InputFormat. If so, let's jump to the code; else, let me know.

I doubt a single record would be hundreds of Mb large (300Mb in my example) in a real environment… With hundreds of Kb for a single record, the overhead due to a line spanning different blocks should not be that significant, and overall performance should not really be affected.

Implementation

RecordReader

I added some (a ton of) comments in the code in order to point out what has been previously said in the example section. Hopefully this makes it slightly clearer. A new Reader must extend the RecordReader class and override several methods.


public class CustomLineRecordReader 
	extends RecordReader<LongWritable, Text> {

	private long start;
	private long pos;
	private long end;
	private LineReader in;
	private int maxLineLength;
	private LongWritable key = new LongWritable();
	private Text value = new Text();

	private static final Log LOG = LogFactory.getLog(
			CustomLineRecordReader.class);

	/**
	 * From Design Pattern, O'Reilly...
	 * This method takes as arguments the map task’s assigned InputSplit and
	 * TaskAttemptContext, and prepares the record reader. For file-based input
	 * formats, this is a good place to seek to the byte position in the file to
	 * begin reading.
	 */
	@Override
	public void initialize(
			InputSplit genericSplit, 
			TaskAttemptContext context)
			throws IOException {

		// This InputSplit is a FileSplit
		FileSplit split = (FileSplit) genericSplit;

		// Retrieve configuration, and Max allowed
		// bytes for a single record
		Configuration job = context.getConfiguration();
		this.maxLineLength = job.getInt(
				"mapred.linerecordreader.maxlength",
				Integer.MAX_VALUE);

		// Split "S" is responsible for all records
		// starting from "start" and "end" positions
		start = split.getStart();
		end = start + split.getLength();

		// Retrieve file containing Split "S"
		final Path file = split.getPath();
		FileSystem fs = file.getFileSystem(job);
		FSDataInputStream fileIn = fs.open(split.getPath());

		// If Split "S" starts at byte 0, first line will be processed
		// If Split "S" does not start at byte 0, first line has been already
		// processed by "S-1" and therefore needs to be silently ignored
		boolean skipFirstLine = false;
		if (start != 0) {
			skipFirstLine = true;
			// Set the file pointer at "start - 1" position.
			// This is to make sure we won't miss any line
			// It could happen if "start" is located on a EOL
			--start;
			fileIn.seek(start);
		}

		in = new LineReader(fileIn, job);

		// If first line needs to be skipped, read first line
		// and stores its content to a dummy Text
		if (skipFirstLine) {
			Text dummy = new Text();
			// Reset "start" to "start + line offset"
			start += in.readLine(dummy, 0,
					(int) Math.min(
							(long) Integer.MAX_VALUE, 
							end - start));
		}

		// Position is the actual start
		this.pos = start;

	}

	/**
	 * From Design Pattern, O'Reilly...
	 * Like the corresponding method of the InputFormat class, this reads a
	 * single key/ value pair and returns true until the data is consumed.
	 */
	@Override
	public boolean nextKeyValue() throws IOException {

		// Current offset is the key
		key.set(pos);

		int newSize = 0;

		// Make sure we get at least one record that starts in this Split
		while (pos < end) {

			// Read first line and store its content to "value"
			newSize = in.readLine(value, maxLineLength,
					Math.max((int) Math.min(
							Integer.MAX_VALUE, end - pos),
							maxLineLength));

			// No byte read, seems that we reached end of Split
			// Break and return false (no key / value)
			if (newSize == 0) {
				break;
			}

			// Line is read, new position is set
			pos += newSize;

			// Line is lower than Maximum record line size
			// break and return true (found key / value)
			if (newSize < maxLineLength) {
				break;
			}

			// Line is too long
			// Try again with position = position + line offset,
			// i.e. ignore line and go to next one
			// TODO: Shouldn't it be LOG.error instead ??
			LOG.info("Skipped line of size " + 
					newSize + " at pos "
					+ (pos - newSize));
		}

		
		if (newSize == 0) {
			// We've reached end of Split
			key = null;
			value = null;
			return false;
		} else {
			// Tell Hadoop a new line has been found
			// key / value will be retrieved by
			// getCurrentKey getCurrentValue methods
			return true;
		}
	}

	/**
	 * From Design Pattern, O'Reilly...
	 * These methods are used by the framework to give generated key/value pairs
	 * to an implementation of Mapper. Be sure to reuse the objects returned by
	 * these methods if at all possible!
	 */
	@Override
	public LongWritable getCurrentKey() throws IOException,
			InterruptedException {
		return key;
	}

	/**
	 * From Design Pattern, O'Reilly...
	 * These methods are used by the framework to give generated key/value pairs
	 * to an implementation of Mapper. Be sure to reuse the objects returned by
	 * these methods if at all possible!
	 */
	@Override
	public Text getCurrentValue() throws IOException, InterruptedException {
		return value;
	}

	/**
	 * From Design Pattern, O'Reilly...
	 * Like the corresponding method of the InputFormat class, this is an
	 * optional method used by the framework for metrics gathering.
	 */
	@Override
	public float getProgress() throws IOException, InterruptedException {
		if (start == end) {
			return 0.0f;
		} else {
			return Math.min(1.0f, (pos - start) / (float) (end - start));
		}
	}

	/**
	 * From Design Pattern, O'Reilly...
	 * This method is used by the framework for cleanup after there are no more
	 * key/value pairs to process.
	 */
	@Override
	public void close() throws IOException {
		if (in != null) {
			in.close();
		}
	}

}

FileInputFormat

Now that you have created a custom Reader, you need to use it from a class extending FileInputFormat, as shown below…


public class CustomFileInputFormat extends FileInputFormat<LongWritable,Text>{

	@Override
	public RecordReader<LongWritable, Text> createRecordReader(
			InputSplit split, TaskAttemptContext context) throws IOException,
			InterruptedException {
		return new CustomLineRecordReader();
	}
}

MapReduce

… and to use this new CustomFileInputFormat in your MapReduce driver code when specifying the input format.

.../...
FileInputFormat.addInputPath(job, inputPath);
job.setInputFormatClass(CustomFileInputFormat.class);
.../...

Congratulations, if you followed this article you have just re-invented the wheel. We did not do anything more than re-implement LineRecordReader and FileInputFormat, the default implementations for text files. However, I hope you now understand a bit better how these 2 classes work, allowing you to create your own custom Reader and therefore handle specific file formats.

I hope you liked this article and that it was not too high-level, and therefore not a waste of time.
Should you have any questions / remarks / suggestions, feel free to comment. Feel free to share it as well!

Cheers !