Custom RecordReader – Processing String / Pattern delimited records

Now that both InputFormat and RecordReader are familiar concepts (if not, you can still refer to the article Hadoop RecordReader and FileInputFormat), it is time to get to the heart of the subject.

The default implementation of TextInputFormat is based on a Line-by-Line approach. Each line found in the data set is supplied to the MapReduce framework as a key / value pair. Should you need to handle more than one line at a time, you can quite easily implement your own NLinesRecordReader (refer to this good article – bigdatacircus), but…

  • What if all your records do not have the same number of lines?
  • How do you process Record-by-Record instead of Line-by-Line?

Should you need to process your data set Record-by-Record, distinct records must obviously be separated by a common delimiter. This delimiter can be either a fixed String (a dedicated delimiter line) or a common pattern.

String delimited records

Data set example

Take the below example of a (dummy) data set where all records are separated by the same String.

----------
pleff lorem monaq morel plaff lerom baple merol pliff ipsum ponaq mipsu ploff pimsu 
caple supim pluff sumip qonaq issum daple ussum ronaq ossom fap25 abcde tonaq fghij 
merol pliff ipsum ponaq mipsu ploff pimsu caple supim pluff sumip qonaq issum daple 
ussum ronaq ossom faple abc75 tonaq fghij gaple klmno vonaq pqrst haple uvwxy nonaq 
----------
zzzzz laple pleff lorem monaq morel plaff sumip qonaq issum daple ussum ponaq gapl 
Klmno pm100 pleff lorem monaq morel plaff lerom baple merol pliff ipsum ponaq mipsu 
ploff pimsu caple supim pluff sumip qonaq issum daple ussum ronaq ossom fa125 abcde 
----------
lerom baple merol pliff ipsum ponaq mipsu ploff pimsu caple supim pluff sumih Qonaq

Implementation

In that case (records are always separated by the same “10-dash” String), the implementation comes more or less out of the box. Indeed, the default LineReader can take a recordDelimiterBytes byte array as an argument, which can be retrieved / set directly from the Hadoop configuration. This parameter will be used as a String delimiter to separate distinct records.

Just make sure to set it up in your MapReduce driver code

Configuration conf = new Configuration(true);
conf.set("textinputformat.record.delimiter","------------");

…and to specify the default TextInputFormat for your MapReduce job’s InputFormat.

Job job = new Job(conf);
job.setInputFormatClass(TextInputFormat.class);
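
Note that the order matters here: the delimiter must be set on the Configuration before the Job is created, because the Job works on a copy of the configuration taken at construction time. Putting the two snippets together:

Configuration conf = new Configuration(true);
conf.set("textinputformat.record.delimiter", "----------");

Job job = new Job(conf);
job.setInputFormatClass(TextInputFormat.class);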

Instead of processing one given line at a time, you should now be able to process a full N-line record. The following keys / values will be supplied to your mapper instances:

  • Key is the offset (location of your record’s first line)
  • Value is the record itself

Note that the default delimiter is the end-of-line character (CR, LF or CRLF). Using the default Hadoop configuration, LineReader can therefore be seen as a Record-by-Record reader whose delimiter is the end of line, which makes it equivalent to a Line-by-Line reader.

Important update

Contrary to what is stated on JIRA, the custom delimiter (provided by the "textinputformat.record.delimiter" parameter) is not supported in version 1.2.1 of Hadoop. However, you can still create your own record reader to handle that particular case. Have a look at my GitHub account (hadoop-recordreader): see Delimiter.java, which uses CustomFileInputFormat.java.
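
For users stuck on such a version, here is a minimal sketch of the same idea (not necessarily identical to the GitHub code): a custom InputFormat that hands an explicit delimiter to LineRecordReader, assuming a Hadoop release whose LineRecordReader constructor accepts the delimiter bytes. The custom.record.delimiter property name is an arbitrary example, not a built-in Hadoop key.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class CustomFileInputFormat
        extends FileInputFormat<LongWritable, Text> {

	@Override
	public RecordReader<LongWritable, Text> createRecordReader(
			InputSplit split,
			TaskAttemptContext context)
			throws IOException, InterruptedException {

		// Read the record delimiter from the job configuration and hand it
		// directly to LineRecordReader, without relying on
		// textinputformat.record.delimiter support in the framework.
		String delimiter = context.getConfiguration()
				.get("custom.record.delimiter", "----------");
		return new LineRecordReader(delimiter.getBytes("UTF-8"));
	}
}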

Pattern delimited records

Data set example

Take now the following data set structure. Records are not separated by a common String anymore, but rather by a common pattern (DateTime). A String cannot be used here, so you will have to create your own RecordReader that splits records using a Regular Expression.

Sat, 25 May 2013 22:29:30
pleff lorem monaq morel plaff lerom baple merol pliff ipsum ponaq mipsu ploff pimsu 
caple supim pluff sumip qonaq issum daple ussum ronaq ossom fap25 abcde tonaq fghij 
merol pliff ipsum ponaq mipsu ploff pimsu caple supim pluff sumip qonaq issum daple 
ussum ronaq ossom faple abc75 tonaq fghij gaple klmno vonaq pqrst haple uvwxy nonaq 

Sat, 25 May 2013 22:30:30
zzzzz laple pleff lorem monaq morel plaff sumip qonaq issum daple ussum ponaq gapl 
Klmno pm100 pleff lorem monaq morel plaff lerom baple merol pliff ipsum ponaq mipsu 
ploff pimsu caple supim pluff sumip qonaq issum daple ussum ronaq ossom fa125 abcde 

Sat, 25 May 2013 22:31:30
lerom baple merol pliff ipsum ponaq mipsu ploff pimsu caple supim pluff sumih Qonaq

PatternRecordReader

The first thing to do here is to implement a custom reader that extends the default RecordReader and to implement its abstract methods. Should you need to get more details on how these methods work, please refer to my previous post (Hadoop RecordReader and FileInputFormat) as I will describe here only the delta compared to the default implementation.


public class PatternRecordReader
        extends RecordReader<LongWritable, Text> {

	private LineReader in;
	private final static Text EOL = new Text("\n");
	private Pattern delimiterPattern;
	private String delimiterRegex;
	private int maxLengthRecord;

	@Override
	public void initialize(InputSplit split,
                        TaskAttemptContext context)
			throws IOException, InterruptedException {

		Configuration job = context.getConfiguration();
		this.delimiterRegex = job.get("record.delimiter.regex");
		this.maxLengthRecord = job.getInt(
                                "mapred.linerecordreader.maxlength",
				Integer.MAX_VALUE);

		delimiterPattern = Pattern.compile(delimiterRegex);
		// ../.. (the rest of the initialization is identical to the default implementation)
	}

	private int readNext(Text text,
                        int maxLineLength,
                        int maxBytesToConsume)
			throws IOException {

		int offset = 0;
		text.clear();
		Text tmp = new Text();

		for (int i = 0; i < maxBytesToConsume; i++) {

			int offsetTmp = in.readLine(
                                     tmp,
                                     maxLineLength,
                                     maxBytesToConsume);
			offset += offsetTmp;
			Matcher m = delimiterPattern.matcher(tmp.toString());

			// End of File
			if (offsetTmp == 0) {
				break;
			}

			if (m.matches()) {
				// Record delimiter
				break;
			} else {
				// Append value to record
				text.append(EOL.getBytes(), 0, EOL.getLength());
				text.append(tmp.getBytes(), 0, tmp.getLength());
			}
		}
		return offset;
	}
}

Note the following points that differ from the default implementation:

  • In initialize(), the regular expression is retrieved from the Hadoop configuration
  • The expression is compiled only once per InputSplit rather than once per record

The actual logic is located in the private readNext method:
We simply loop (bounded by maxBytesToConsume, Integer.MAX_VALUE by default) and append every line found, together with an EOL character, into the final Text until the current line matches our regular expression delimiter. We finally return the number of bytes read.

In the default implementation we were reading lines by using

            newSize = in.readLine(value, maxLineLength,
                    Math.max((int) Math.min(
                            Integer.MAX_VALUE, end - pos),
                            maxLineLength));

it now becomes

            newSize = readNext(value, maxLineLength,
                    Math.max((int) Math.min(
                            Integer.MAX_VALUE, end - pos),
                            maxLineLength));
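
For completeness, here is a sketch of how readNext plugs into nextKeyValue(), mirroring the default LineRecordReader logic (the key, value, pos and end fields are assumed to be set up in initialize(), as described in the previous post):

	@Override
	public boolean nextKeyValue()
			throws IOException, InterruptedException {

		if (key == null) {
			key = new LongWritable();
		}
		if (value == null) {
			value = new Text();
		}
		key.set(pos);

		int newSize = 0;
		if (pos < end) {
			newSize = readNext(value, maxLengthRecord,
					Math.max((int) Math.min(
							Integer.MAX_VALUE, end - pos),
							maxLengthRecord));
			pos += newSize;
		}

		if (newSize == 0) {
			// Nothing left to read in this split
			key = null;
			value = null;
			return false;
		}
		return true;
	}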

PatternInputFormat

Next step is to create a custom InputFormat


public class PatternInputFormat
        extends FileInputFormat<LongWritable, Text> {

	@Override
	public RecordReader<LongWritable, Text> createRecordReader(
			InputSplit split,
			TaskAttemptContext context)
                           throws IOException,
			          InterruptedException {

		return new PatternRecordReader();
	}

}

Driver code

In your driver code, you need to provide the Hadoop framework with the regular expression you have chosen


// regex matching pattern "Sat, 25 May 2013"
String regex = "^[A-Za-z]{3},\\s\\d{2}\\s[A-Za-z]{3}.*";
Configuration conf = new Configuration(true);
conf.set("record.delimiter.regex", regex);

and to use this new InputFormat


Job job = new Job(conf);
job.setInputFormatClass(PatternInputFormat.class);

Mapper

Here I am running a simple Map-only job in order to make sure all my records have been correctly separated

	public static class RecordMapper extends
			Mapper<LongWritable, Text, Text, NullWritable> {

		private Text out = new Text();

		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			out.set(key + " -------------\n" + value);
			context.write(out, NullWritable.get());
		}
	}
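
For reference, here is a sketch of how the pieces above can be wired together in a map-only driver (PatternDriver is a hypothetical enclosing class; input and output paths come from the command-line arguments):

// regex matching pattern "Sat, 25 May 2013"
String regex = "^[A-Za-z]{3},\\s\\d{2}\\s[A-Za-z]{3}.*";
Configuration conf = new Configuration(true);
conf.set("record.delimiter.regex", regex);

Job job = new Job(conf);
job.setJobName("pattern-record-reader");
job.setJarByClass(PatternDriver.class);
job.setInputFormatClass(PatternInputFormat.class);
job.setMapperClass(RecordMapper.class);
job.setNumReduceTasks(0); // Map-only: mapper output is written as-is
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);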

Given the same data set as before, the Map-only job’s output is the following

10000 -------------
pleff lorem monaq morel plaff lerom baple merol pliff ipsum ponaq mipsu ploff pimsu 
caple supim pluff sumip qonaq issum daple ussum ronaq ossom fap25 abcde tonaq fghij 
merol pliff ipsum ponaq mipsu ploff pimsu caple supim pluff sumip qonaq issum daple 
ussum ronaq ossom faple abc75 tonaq fghij gaple klmno vonaq pqrst haple uvwxy nonaq 

13221 -------------
zzzzz laple pleff lorem monaq morel plaff sumip qonaq issum daple ussum ponaq gapl 
Klmno pm100 pleff lorem monaq morel plaff lerom baple merol pliff ipsum ponaq mipsu 
ploff pimsu caple supim pluff sumip qonaq issum daple ussum ronaq ossom fa125 abcde 

15224 -------------
lerom baple merol pliff ipsum ponaq mipsu ploff pimsu caple supim pluff sumih Qonaq

Conclusion

You are now able to process a “pseudo-unstructured” data set by reading it Record-by-Record instead of Line-by-Line. This implementation can be really helpful if you need to convert raw log files into a more readable format (e.g. CSV). Instead of relying on an external script (e.g. a Perl script) to pre-process your data before uploading it to HDFS, you can take full advantage of distributed computing and parse your data set with the MapReduce framework.

I hope this article was interesting. Don’t hesitate to let me know if you have any questions.
Cheers,

58 thoughts on “Custom RecordReader – Processing String / Pattern delimited records”

    • Hi,
      I really liked your post! But am unable to find a way around for this issue.
      I have written the below code in the driver class as suggested.

      JobConf conf = new JobConf(WordCount.class);
      conf.set("textinputformat.record.delimiter", "\n\n");

      But it’s still taking the usual “\n” delimiter. Should i create another class extending RecordReader or is there any other way??

      Please help me out here.

    • Hi,
      Thanks for the quick reply. I changed my code to use the new API. The driver code goes as below.

      public static void main(String[] args) throws Exception {

      Configuration conf = new Configuration(true);
      Job job = new Job(conf);
      job.setJobName("wordcount");
      conf.set("textinputformat.record.delimiter", "\n\n");
      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);

      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);

      job.setInputFormatClass(TextInputFormat.class);
      job.setOutputFormatClass(TextOutputFormat.class);

      FileInputFormat.setInputPaths(job, new Path("/home/abhishek_linuxhadoop/workspace/MR_WordCount/SampleDataSet.txt"));
      FileOutputFormat.setOutputPath(job, new Path("/home/abhishek_linuxhadoop/workspace/MR_WordCount/OUTPUT_Word_Count"));

      job.setJarByClass(WordCount.class);
      job.submit();

      }

      The sample data set is separated by new lines character hence the \n\n usage.

      I am relatively new to the map reduce module.

      Can you pls help me out here as where am going wrong?

      The value in my mapper code, while debugging is still showing the first line and not the bunch of lines as it should. 😦

      • Looks like you’re updating the Hadoop configuration after creating your job. Try updating the configuration as follows:

        Configuration conf = new Configuration(true);
        conf.set("textinputformat.record.delimiter", "\n\n");
        Job job = new Job(conf);
        job.setJobName("wordcount");

    • Tried that too..! No luck !

      Changed the code to look like this –

      public static void main(String[] args) throws Exception {

      Configuration conf = new Configuration(true);
      conf.set("textinputformat.record.delimiter", "\n\n");
      Job job = new Job(conf);
      job.setJobName("wordcount");
      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);

      job.setInputFormatClass(TextInputFormat.class);
      job.setOutputFormatClass(TextOutputFormat.class);

      FileInputFormat.setInputPaths(job, new Path("/home/abhishek_linuxhadoop/workspace/MR_WordCount/SampleDataSet.txt"));
      FileOutputFormat.setOutputPath(job, new Path("/home/abhishek_linuxhadoop/workspace/MR_WordCount/OUTPUT_Word_Count"));

      job.setJarByClass(WordCount.class);
      job.submit();

      }

    • Yes the data is in the format of \r\n delimited.

      So should I replace the delimiter provided in the driver class by \r\n\r\n. That will work right?
      Or is there some issue with \r\n ?

    • It’s getting really embarrassing for me here.. ! 😦

      I simply copy pasted your code and ran it on my data.

      My data is of the following format –

      Id: 0
      ASIN: 0771044445
      discontinued product

      Id: 1
      ASIN: 0827229534
      title: Patterns of Preaching: A Sermon Sampler
      group: Book
      salesrank: 396585
      similar: 5 0804215715 156101074X 0687023955 0687074231 082721619X
      categories: 2
      |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Christianity[12290]|Clergy[12360]|Preaching[12368]
      |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Christianity[12290]|Clergy[12360]|Sermons[12370]
      reviews: total: 2 downloaded: 2 avg rating: 5
      2000-7-28 cutomer: A2JW67OY8U6HHK rating: 5 votes: 10 helpful: 9
      2003-12-14 cutomer: A2VE83MZF98ITY rating: 5 votes: 6 helpful: 5

      Id: 2
      ASIN: 0738700797
      title: Candlemas: Feast of Flames
      group: Book
      salesrank: 168596
      similar: 5 0738700827 1567184960 1567182836 0738700525 0738700940
      categories: 2
      |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Earth-Based Religions[12472]|Wicca[12484]
      |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Earth-Based Religions[12472]|Witchcraft[12486]
      reviews: total: 12 downloaded: 12 avg rating: 4.5
      2001-12-16 cutomer: A11NCO6YTE4BTJ rating: 5 votes: 5 helpful: 4
      2002-1-7 cutomer: A9CQ3PLRNIR83 rating: 4 votes: 5 helpful: 5
      2002-1-24 cutomer: A13SG9ACZ9O5IM rating: 5 votes: 8 helpful: 8
      2002-1-28 cutomer: A1BDAI6VEYMAZA rating: 5 votes: 4 helpful: 4
      2002-2-6 cutomer: A2P6KAWXJ16234 rating: 4 votes: 16 helpful: 16
      2002-2-14 cutomer: AMACWC3M7PQFR rating: 4 votes: 5 helpful: 5
      2002-3-23 cutomer: A3GO7UV9XX14D8 rating: 4 votes: 6 helpful: 6
      2002-5-23 cutomer: A1GIL64QK68WKL rating: 5 votes: 8 helpful: 8
      2003-2-25 cutomer: AEOBOF2ONQJWV rating: 5 votes: 8 helpful: 5
      2003-11-25 cutomer: A3IGHTES8ME05L rating: 5 votes: 5 helpful: 5
      2004-2-11 cutomer: A1CP26N8RHYVVO rating: 1 votes: 13 helpful: 9
      2005-2-7 cutomer: ANEIANH0WAT9D rating: 5 votes: 1 helpful: 1

      Id: 3
      ASIN: 0486287785
      title: World War II Allied Fighter Planes Trading Cards
      group: Book
      salesrank: 1270652
      similar: 0
      categories: 1
      |Books[283155]|Subjects[1000]|Home & Garden[48]|Crafts & Hobbies[5126]|General[5144]
      reviews: total: 1 downloaded: 1 avg rating: 5
      2003-7-10 cutomer: A3IDGASRQAW8B2 rating: 5 votes: 2 helpful: 2

      And after running your code it should have splitted the data on the basis of \n\n or \r\n\r\n i.e. from one Id occurrence to other but here’s the output that i’m getting –

      0 ******************
      Id: 0
      8 ******************
      ASIN: 0771044445
      25 ******************
      discontinued product
      48 ******************

      49 ******************
      Id: 1
      57 ******************
      ASIN: 0827229534
      74 ******************
      title: Patterns of Preaching: A Sermon Sampler
      123 ******************
      group: Book
      137 ******************
      salesrank: 396585
      157 ******************
      similar: 5 0804215715 156101074X 0687023955 0687074231 082721619X
      230 ******************
      categories: 2
      246 ******************
      |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Christianity[12290]|Clergy[12360]|Preaching[12368]
      358 ******************
      |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Christianity[12290]|Clergy[12360]|Sermons[12370]
      468 ******************
      reviews: total: 2 downloaded: 2 avg rating: 5
      518 ******************
      2000-7-28 cutomer: A2JW67OY8U6HHK rating: 5 votes: 10 helpful: 9
      594 ******************
      2003-12-14 cutomer: A2VE83MZF98ITY rating: 5 votes: 6 helpful: 5
      671 ******************
      and so on..

      Really sorry for irritating you but am really stuck here.

      • Embarrassing for me as well as it still works on my side. Using CRLF format with delimiter=\r\n\r\n, and LF format with delimiter=\n\n
        Please send me a part of your actual input data

    • Yes this is exactly what i want!!! Damn, what the hell is this.. why am i not getting this output!

      I have mailed you the sample data set (part of actual only). Actual file is of 986 MB so am mailing you a very small part of it. Maybe there’s something with Hadoop jars version. I am going crazy now.

      I am using Hadoop-1.2.1. You??

      Please help me.

      • Contrary to what is stated on JIRA, the custom delimiter (provided by the "textinputformat.record.delimiter" parameter) is not supported in version 1.2.1 of Hadoop. However, you can still create your own record reader to handle that particular case. Have a look at my GitHub account (hadoop-recordreader): see Delimiter.java, which uses CustomFileInputFormat.java.

    • Hi Antoine,
      I have tried the example code for PatternRecordReader but the output comes out empty. Please help me get the output.
      DriveR code:

      public int run(String[] args) throws Exception {
      // TODO Auto-generated method stub
      Configuration conf = getConf();

      Job patternJob = new Job(conf);
      patternJob.setJobName("Pattern Job");
      patternJob.setJarByClass(PatternDriver.class);
      patternJob.setMapperClass(PatternRecordMapper.class);

      patternJob.setNumReduceTasks(0);
      patternJob.setMapOutputKeyClass(Text.class);
      patternJob.setMapOutputValueClass(NullWritable.class);

      patternJob.setInputFormatClass(PatternInputFormat.class);
      FileInputFormat.setInputPaths(patternJob, new Path(args[0]));
      FileOutputFormat.setOutputPath(patternJob, new Path(args[1]));
      return patternJob.waitForCompletion(true) == true ? 0 : -1;

      }
      public static void main(String[] args) throws Exception {
      String regex = "^[A-Za-z]{3},\\s\\d{2}\\s[A-Za-z]{3}.*";
      Configuration conf = new Configuration(true);
      conf.set("record.delimiter.regex", regex);
      ToolRunner.run(conf, new PatternDriver(), args);
      // regex matching pattern "Sat, 25 May 2013"

      }
      }

      OutPut:

      15/03/15 16:30:50 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
      15/03/15 16:30:50 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
      15/03/15 16:30:50 INFO input.FileInputFormat: Total input paths to process : 1
      15/03/15 16:30:50 INFO mapred.JobClient: Running job: job_local1825267119_0001
      15/03/15 16:30:50 INFO mapred.LocalJobRunner: Waiting for map tasks
      15/03/15 16:30:50 INFO mapred.LocalJobRunner: Starting task: attempt_local1825267119_0001_m_000000_0
      15/03/15 16:30:50 INFO util.ProcessTree: setsid exited with exit code 0
      15/03/15 16:30:50 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@d68713
      15/03/15 16:30:50 INFO mapred.MapTask: Processing split: file:/home/hadoop/work/input/patterninput.txt:0+758
      15/03/15 16:30:50 INFO mapred.Task: Task:attempt_local1825267119_0001_m_000000_0 is done. And is in the process of commiting
      15/03/15 16:30:50 INFO mapred.LocalJobRunner:
      15/03/15 16:30:50 INFO mapred.Task: Task attempt_local1825267119_0001_m_000000_0 is allowed to commit now
      15/03/15 16:30:50 INFO output.FileOutputCommitter: Saved output of task ‘attempt_local1825267119_0001_m_000000_0’ to /home/hadoop/work/outputpattern8
      15/03/15 16:30:50 INFO mapred.LocalJobRunner:
      15/03/15 16:30:50 INFO mapred.Task: Task ‘attempt_local1825267119_0001_m_000000_0’ done.
      15/03/15 16:30:50 INFO mapred.LocalJobRunner: Finishing task: attempt_local1825267119_0001_m_000000_0
      15/03/15 16:30:50 INFO mapred.LocalJobRunner: Map task executor complete.
      15/03/15 16:30:51 INFO mapred.JobClient: map 0% reduce 0%
      15/03/15 16:30:51 INFO mapred.JobClient: Job complete: job_local1825267119_0001
      15/03/15 16:30:51 INFO mapred.JobClient: Counters: 12
      15/03/15 16:30:51 INFO mapred.JobClient: File Output Format Counters
      15/03/15 16:30:51 INFO mapred.JobClient: Bytes Written=8
      15/03/15 16:30:51 INFO mapred.JobClient: File Input Format Counters
      15/03/15 16:30:51 INFO mapred.JobClient: Bytes Read=0
      15/03/15 16:30:51 INFO mapred.JobClient: FileSystemCounters
      15/03/15 16:30:51 INFO mapred.JobClient: FILE_BYTES_READ=166
      15/03/15 16:30:51 INFO mapred.JobClient: FILE_BYTES_WRITTEN=50969
      15/03/15 16:30:51 INFO mapred.JobClient: Map-Reduce Framework
      15/03/15 16:30:51 INFO mapred.JobClient: Map input records=0
      15/03/15 16:30:51 INFO mapred.JobClient: Physical memory (bytes) snapshot=0
      15/03/15 16:30:51 INFO mapred.JobClient: Spilled Records=0
      15/03/15 16:30:51 INFO mapred.JobClient: Total committed heap usage (bytes)=61079552
      15/03/15 16:30:51 INFO mapred.JobClient: CPU time spent (ms)=0
      15/03/15 16:30:51 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0
      15/03/15 16:30:51 INFO mapred.JobClient: SPLIT_RAW_BYTES=110
      15/03/15 16:30:51 INFO mapred.JobClient: Map output records=0

    • Hi,

      I too am having the same problem, the delimeter is not being accepted. I wanted the delimeter to be “CHAPTER” string. I have verified i am using the latest hadoop api as well. But it somehow does not work and i always get one line as input in mapper.

      Can you please help me?

  1. Hi,
    first of all: thank you very much for the helpful tutorial.
    But, due to the fact that the cluster of my university is set up with the old hadoop API, I am encountering the following problem:
    The (old interface) RecordReader has no initialize method to be overridden, so the custom splitting is not working.
    Do you have any ideas, how to initialize that method in the old API?
    I’ve been investing tons of time in that project – and now it’s just that ‘little issue’ preventing me from being successful!
    Thanks in advance

    • Whatever the hadoop installation (unless this is pretty old), nothing should prevent you from implementing your job based on the new API.
      Anyway, since the only initialization you need is to retrieve params from the Hadoop conf, why not create your own constructor as below?

      public PatternRecordReader(JobConf conf){ // Initialize }

      … And in PatternInputFormat, return new PatternRecordReader(conf)

      Let me know if it works!

  2. Hi. What if the delimiter contains important information that needs to be preserved in the map step? Do I need to do some file seek operations to make sure the delimiter is not skipped?

  3. Hi! Thanks for this nice blog! Your explanations are nice, such as in the RecordReader and InputFormat post.

    I’m running this example, but only getting blank lines for the output. Eclipse is also grumbling that maxLengthRecord is not used, but that’s not major.

    For the blank lines, the Mapper will be calling getCurrentValue(), which will return the contents of ‘value’. But, I’m not understanding how the contents of ‘value’ is set. I see that the readNext() method appends a range of bytes to the ‘text’ variable it owns. In the default examples on here and the bigcircus post, ‘value’ is the variable that is appended to, with the range being established on the ‘v’ and ‘endline’ delimiters.

    I may be missing something, but, it seems like the reason I’m getting blank lines might be that the ‘value’ getting returned by getCurrentValue() is not updated? I’ve pasted the regex and sample data in and tested with it, with no good results coming out…if you can assist, I’d really appreciate it!

  4. Hi,
    thank you very much for the tutorial. But something goes wrong with my Output..
    like:
    0 ————-
    ————-
    14 ————-
    abcabc
    21 ————-
    abcabc
    28 ————-
    ————-
    42 ————-
    ski aier
    48 ————-
    ski aier
    …..

    the raw data are:

    ————-
    abcabc
    abcabc
    ————-
    ski aier
    ski aier

    is there something wrong with the delimiter?

  5. .. thanks, i have fixed the bugs 😛 in your second example.. can I have the date in my output value.. I mean, I just want to add the number for each data set in front of the data.

  6. your posts are very helpful, thanks.

    One question, though: At line 21 PatternRecordReader#initialize(), usually it follows the logic for skipping the already processed record from previous split, am I right ?

    Furthermore, in that case it’s not enough to start-- and seek (just like your CustomLineRecordReader) because we need to match a pattern and not a newline. How does it work? Would you read back until you match that pattern to appropriately skip the record? Just brainstorming here…

    Thanks again.

    • start-- should indeed skip the previous record (although I did not see any official documentation).
      Dang! You might be right: what happens when we’re located somewhere inside a String delimiter of length L? We could read back L bytes, but what about a Pattern delimiter with an unknown length?? Nice catch!

  7. Good day. I’ve been trying to implement the PatternRecordReader. Long story short, I need to get whole sentences as records from text files and, because they can be delimited by one of a bunch of symbols (such as full-stops, semi-colons and question marks), I cannot use the String Delimiter.

    However, I have yet to make it work. Even when supplying a single full-stop symbol as delimiter (like this, “\\.”), the output is the whole text in one sentence. Example:

    “In the year 1878 I took my degree of Doctor of Medicine of the University of London, and proceeded to Netley to go through the course prescribed for surgeons in the army. Having completed my studies there, I was duly attached to the Fifth Northumberland Fusiliers as Assistant Surgeon.”

    Should return both sentences separated by the full stop, but it doesn’t. Instead, it returns the whole text, making it even better when there’s a end of paragraph involved… \s

    • You know what? I think I got around it…

      I just had to modify the m.matches() part to modify the offset and append the text that I’m interested in. Now I just need to fix why a sentence in two lines is missing part of the text…

      • Final post, I hope.

        Solving the problem where I had part of the next sentence missing was easy, as I just had to hold on the leftovers of the line until the next iteration.

        Another setback I found was that I cannot process the case where multiple delimiters exist on the same line. That forces me to process the leftovers of the line before entering the while loop of readNext(), which gives me wrong offsets… But at least it does what I wanted it to do…

  8. Hello, I am new to Hadoop and trying to create a custom RecordReader for my code which should read multiple lines (separated by \n) per mapper. I have tried running the Delimiter.java program from your github account. However it is using simple “\n” instead of “\n\n”. Can you please suggest? My data looks like:

    ———-
    pleff lorem monaq morel plaff lerom baple merol pliff ipsum ponaq mipsu ploff pimsu
    caple supim pluff sumip qonaq issum daple ussum ronaq ossom fap25 abcde tonaq fghij
    merol pliff ipsum ponaq mipsu ploff pimsu caple supim pluff sumip qonaq issum daple
    ussum ronaq ossom faple abc75 tonaq fghij gaple klmno vonaq pqrst haple uvwxy nonaq

    ———-
    zzzzz laple pleff lorem monaq morel plaff sumip qonaq issum daple ussum ponaq gapl
    Klmno pm100 pleff lorem monaq morel plaff lerom baple merol pliff ipsum ponaq mipsu
    ploff pimsu caple supim pluff sumip qonaq issum daple ussum ronaq ossom fa125 abcde

    ———-
    lerom baple merol pliff ipsum ponaq mipsu ploff pimsu caple supim pluff sumih Qonaq

    • “textinputformat.record.delimiter” was not supported in version 1.2.1 of Hadoop, hence the code published on my GitHub account. If you’re using a newer distribution (e.g. CDH4), simply set “textinputformat.record.delimiter” in the Hadoop conf. In any case, make sure your file is delimited accordingly (can be \n\n, \r\n\r\n, etc…).
      Last, given your data set, why not delimit with “———-”?

  9. Hello,

    Hello, I am using a paragraph of input text and would like to set it up in a way that a mapper gets one sentence at a time. I coded the problem as per your suggestion. However I am still not getting each sentence as an output from map task. Here is my code:

    package aamend;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    /**
    * Created by antoine on 05/06/14.
    */
    public class Delimeter {

    public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();
    conf.set("textinputformat.record.delimiter", "delimiter");
    Job job = new Job(conf, "Delimiter");
    job.setJarByClass(Delimeter.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(DelimiterMapper.class);
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public class TextInputFormat extends FileInputFormat {

    @Override
    public RecordReader
    createRecordReader(InputSplit split,
    TaskAttemptContext context) {

    // Hardcoding this value as "."
    // You can add any delimiter as your requirement

    String delimiter = ".";
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
    recordDelimiterBytes = delimiter.getBytes();
    return new LineRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
    CompressionCodec codec =
    new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    return codec == null;
    }
    }

    public static class DelimiterMapper extends Mapper {

    private final Text TEXT = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {

    // Should be a 2 lines key value
    TEXT.set("{" + value.toString() + "}");
    context.write(NullWritable.get(), TEXT);
    }
    }

    }

    And my output looks like:

    {Ice cream (derived from earlier iced cream or cream ice) is a frozen title=”Dessert” usually made from dairy products, such as milk and cream and often combined with fruits or other ingredients and flavours. Most varieties contain sugar, although some are made with other sweeteners. In some cases, artificial flavourings and colourings are used in addition to, or instead of, the natural ingredients. The mixture of chosen ingredients is stirred slowly while cooling, in order to incorporate air and to prevent large ice crystals from forming. The result is a smoothly textured semi-solid foam that is malleable and can be scooped.}

    • I’m afraid you’re using the default LineRecordReader in the snippet below. Furthermore, you’re not passing any recordDelimiterBytes as an argument…

      String delimiter = ".";
      byte[] recordDelimiterBytes = null;
      if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes();
      return new LineRecordReader();
      }
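
      The delimiter bytes need to be handed over to the reader, e.g. (assuming a Hadoop release whose LineRecordReader constructor accepts them):

      return new LineRecordReader(recordDelimiterBytes);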

  10. Hi Antoine, It seems a good approach. But unfortunately, writing java code like this is a tedious job for python programmers like me.. 😦 I am trying to use a custom record reader for my python implementation. I am executing my code through hadoop streaming, is there any way I can set this parameter, conf.set("textinputformat.record.delimiter", "————"), without writing code? If not, how can I compile the above code into a jar and use it? I don't think it will work if I just copy paste this. Please help.

    • Writing a python script would be a bit tedious for me 🙂
      Anyway, if you’re using a recent Hadoop version, textinputformat.record.delimiter is supported by default – no need to package / deploy any external jar. Just supply the delimiter from the Hadoop Configuration.
      Using streaming.jar, you can specify additional configuration variables with “-D <property>=<value>”, therefore “-D textinputformat.record.delimiter=——–”.

      Let me know how it works
      Cheers!

  11. Hi,

    Is it possible to have read cumulatively inside the mapper.

    For e.g.
    The input file contains:

    Line 1
    Line 2
    Line 3
    Line 4

    The mapper should get:
    Line 1, Line 2
    Line 2, Line 3
    Line 3, Line 4

    How shall I go about such a solution, any guidance will be helpful.

    Best,
    Anubhav

  12. Thanks a lot for posting these two articles.It really helped me a lot to understand how custom record reader works.

    I have requirement, where delimiter will present in the beginning of every line. (Not in a separate line as in the example mentioned above. So i need to consider that line as well.)

    Please find below examples
    1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog – ID47 [exampleSDID@32473iut=”3″ eventSource “Application” eventID=”1011″]
    1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog – ID48 [exampleSDID@334443iut=”6″ eventSource “app” eventID=”1012″]
    [timeQuality tzKnown=”0″ isSynced=”0″]
    1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog – ID47 [exampleSDID@32473iut=”3″ eventSource “Application” eventID=”1011″]

    ideally records should present in one line . But there some situations where records will present over multiple lines.

    So, Regular expression will consider 1 as the beginning of the line. If the line doesn’t began with 1 means it belongs to previous line and it should append to previous line.

    I used above code as reference to create custom record reader. If i used above implementation directly than first line of every record is skipping. i.e. if a record has one line , than it is not appearing in the result.
    to solve this, I put
    text.append(EOL.getBytes(), 0, EOL.getLength());
    text.append(tmp.getBytes(), 0, tmp.getLength());

    in side the if block then, only first line is appearing but if the record has more than one line than rest of the line is missing.

    If i put in both the places than second line is concatenating with next record.

    Any help is greatly appreciated.

    Regards,
    Vinayaka

  13. I have binary hdfs files where records have the following structure. The pattern “XYZ” begins every record followed by 4 bytes which (as an int) specify the size of the record. The file is therefore in the form XYZ[n1][n1 bytes]XYZ[n2][n2 bytes]…

    How would I go about handling this case with a custom record reader? One problem is I can’t make sure that the split doesn’t happen within the delimeter string “XYZ”.

    Any advice would be much appreciated.

  14. Hi Antoine

    Thanks for the detalied explaination.

    I have a few doubts –

    Could this function to set the customized delimeters be also used with the data which is not in text format i.e. for binary data, or for avro files etc.

    If not how to set the custom delimeter for non-textual data format.

    regards
    Aman

  15. Thanks a lot for your posting, I am new to Mapreduce, I am writing to ask could I use a regular expression delimiter when reading compressed files. could you give me a specific example, thanks a lot?
    Best Regards,
    Liuwu

  16. Hi,
    I really like your blogs and they have some very useful content for Hadoop developers.

    trying to build an inputformat which read 3 lines at a time
    i.e
    aaaaa
    bbbbb
    ccccc
    ddddd
    eeeee
    fffff

    it will send
    key value
    0 aaaaa bbbbb cccccc
    15 ddddd eeeee fffff

    on last record of first split it will merge some records from second split.
    In this scenarion how should I make sure that records which are read in first split from second split , will not be read by second i.e. no duplicacy

  17. This is such an awesome blog!!

    I am trying to implement a record reader comprising of 4 lines. Just had a doubt regarding the same. How will the record reader handle a scenario in which a record is split across blocks i.e 2 lines are present in one block and remaining lines of the same record are present in another block?

    Awaiting your reply.

  18. well, here you are using the date which is the start of new record to split records.
    But what if I want to process the date in each mapper.
    For example I want the output of the mapper be:
    date LastWordInTheRecord
