Filter input files used for MapReduce

How do you filter the input files processed by a MapReduce job, and how do you make the code versatile enough that the end user can easily select files based on a common pattern ?
Assuming your files stored on HDFS contain a date-time pattern in their names, what if you could execute your code on “2013-07-XX” files only ? I am sure there are many ways to do so, but because of my regexp addiction (thanks to my Perl legacy), I have written a MapReduce job that takes a regular expression as an argument and uses it to filter the input files of your data set.


Filtering on file name

Command line interface

As reported in a previous article, when implementing the Tool interface you can provide Hadoop with configuration parameters directly from the command line (prefixed by the -D option). By doing so, there is no need to handle these properties in the static main(String[] args) method of your driver code anymore, since they become accessible system-wide through the Hadoop configuration.

hadoop jar com.wordpress.hadoopi.Filter -D file.pattern=.*regex.*
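
For reference, a minimal driver skeleton implementing the Tool interface could look like the sketch below (my own illustration, not the original project's code); ToolRunner parses the generic -D options and makes them available through getConf() :

package com.wordpress.hadoopi;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Filter extends Configured implements Tool {

	public static void main(String[] args) throws Exception {
		// ToolRunner extracts the generic options (-D ...) into the
		// configuration before calling run()
		System.exit(ToolRunner.run(new Filter(), args));
	}

	@Override
	public int run(String[] args) throws Exception {
		// The "file.pattern" property supplied with -D is now available system-wide
		System.out.println("Filtering with pattern : " + getConf().get("file.pattern"));
		// Job setup goes here (see the "Driver code" section below)
		return 0;
	}
}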

Implementing PathFilter interface

PathFilter allows you to accept or reject any of the files included in your input path, based on custom restrictions (in our case a regular expression). This is achieved by implementing the accept() method. Note that you need to extend the Configured class if you want to access the “file.pattern” property supplied on the command line.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class RegexFilter extends Configured implements PathFilter {

	Pattern pattern;
	Configuration conf;

	@Override
	public boolean accept(Path path) {
		// Accept the path only if it matches the supplied regular expression
		Matcher m = pattern.matcher(path.toString());
		System.out.println("Is path : " + path.toString() + " matching "
			+ conf.get("file.pattern") + " ? , " + m.matches());
		return m.matches();
	}

	@Override
	public void setConf(Configuration conf) {
		// Called by Hadoop with the job configuration; compile the regex once
		this.conf = conf;
		pattern = Pattern.compile(conf.get("file.pattern"));
	}
}

In the setConf() method, we retrieve the Hadoop configuration and compile our regular expression. In the accept() method, we return true if the file matches our pattern, false otherwise. Pretty obvious, isn’t it ?

Driver code

Using your RegexFilter in the MapReduce driver code is pretty straightforward. You need to call the setInputPathFilter() static method and pass your custom PathFilter implementation. The filter will be applied to each file found in the inputPath directory (supplied via the addInputPath() method).

	// Input
	FileInputFormat.setInputPathFilter(myJob, RegexFilter.class);
	FileInputFormat.addInputPath(myJob, inputPath);
	myJob.setInputFormatClass(TextInputFormat.class);

	// Output
	FileOutputFormat.setOutputPath(myJob, outputPath);
	myJob.setOutputFormatClass(TextOutputFormat.class);
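
To put this snippet in context, here is a possible run() method for the Tool-based driver sketched earlier. This is a sketch only: the job name, the map-only setup and the way input / output paths are read from args are my own assumptions, and the imports (org.apache.hadoop.fs.Path, plus Job, FileInputFormat, TextInputFormat, FileOutputFormat and TextOutputFormat from the org.apache.hadoop.mapreduce packages) are omitted for brevity.

	@Override
	public int run(String[] args) throws Exception {

		Job myJob = new Job(getConf(), "hadoopi-filter");
		myJob.setJarByClass(Filter.class);

		// Hypothetical convention: input and output directories passed as arguments
		Path inputPath = new Path(args[0]);
		Path outputPath = new Path(args[1]);

		// Input : the custom PathFilter is applied to every file under inputPath
		FileInputFormat.setInputPathFilter(myJob, RegexFilter.class);
		FileInputFormat.addInputPath(myJob, inputPath);
		myJob.setInputFormatClass(TextInputFormat.class);

		// Output
		FileOutputFormat.setOutputPath(myJob, outputPath);
		myJob.setOutputFormatClass(TextOutputFormat.class);

		// Map-only identity job, just to illustrate the filtering
		myJob.setNumReduceTasks(0);

		return myJob.waitForCompletion(true) ? 0 : 1;
	}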

Testing

First test, done with a “.*” regular expression (i.e. all files).
Our implementation works :

Is path : hdfs://hadub1:8020/user/hadoopi/data matching .* ? , true
Is path : hdfs://hadub1:8020/user/hadoopi/data/hadoopi_part-r-00000 matches .* ? , true
Is path : hdfs://hadub1:8020/user/hadoopi/data/hadoopi_part-r-00001 matches .* ? , true
13/07/29 10:52:30 INFO input.FileInputFormat: Total input paths to process : 2
13/07/29 10:52:35 INFO mapred.JobClient: Running job: job_201307291018_0008

Second test, done with a more restrictive pattern (“.*part.*”).
Our implementation does not work here !

Is path : hdfs://hadub1:8020/user/hadoopi/data matches .*part.* ? , false
13/07/29 10:45:09 INFO mapred.JobClient: Cleaning up the staging area hdfs://hadub1:8020/user/hadoopi/.staging/job_201307291018_0006
13/07/29 10:45:09 ERROR security.UserGroupInformation: PriviledgedActionException as:hadoopi (auth:SIMPLE) cause:org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://hadub1:8020/user/hadoopi/data
Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://hadub1:8020/user/hadoopi/data
        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:231)

The root cause is the following: the first path handed to the PathFilter is actually always the input directory itself (/user/hadoopi/data in our case). Since the directory does not match our pattern, it is rejected and the exception above is thrown.

PathFilter with directory support

In order to avoid this issue, the following modification must be made to our PathFilter implementation:

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class RegexFilter extends Configured implements PathFilter {

	Pattern pattern;
	Configuration conf;
	FileSystem fs;

	@Override
	public boolean accept(Path path) {

		try {
			// Always accept directories so that their content can be listed
			if (fs.isDirectory(path)) {
				return true;
			} else {
				// Test the file name against the supplied regular expression
				Matcher m = pattern.matcher(path.toString());
				System.out.println("Is path : " + path.toString() + " matches "
						+ conf.get("file.pattern") + " ? , " + m.matches());
				return m.matches();
			}
		} catch (IOException e) {
			e.printStackTrace();
			return false;
		}
	}

	@Override
	public void setConf(Configuration conf) {
		this.conf = conf;
		if (conf != null) {
			try {
				// Get a handle to the file system and compile the regex once
				fs = FileSystem.get(conf);
				pattern = Pattern.compile(conf.get("file.pattern"));
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}

In the setConf() method, we get a handle to the file system (HDFS). In the accept() method, we first test whether the supplied path is a directory. If so, the directory is accepted (return true); otherwise the file name is tested against our regular expression.
Let’s restart our process :

Is path : hdfs://hadub1:8020/user/hadoopi/data/hadoopi_part-r-00000 matches .*part.* ? , true
Is path : hdfs://hadub1:8020/user/hadoopi/data/hadoopi_part-r-00001 matches .*part.* ? , true
13/07/29 10:52:30 INFO input.FileInputFormat: Total input paths to process : 2
13/07/29 10:52:35 INFO mapred.JobClient: Running job: job_201307291018_0008

Well, it works now. You should be able to filter any of your input files based on regular expressions (do not forget to escape backslashes) :

hadoop jar com.wordpress.hadoopi.Filter -D file.pattern=.*regex.*
hadoop jar com.wordpress.hadoopi.Filter -D file.pattern=.*2013-07-\\d{2}.*
hadoop jar com.wordpress.hadoopi.Filter -D file.pattern=.*part-r-0000[0-1].*

Great, our MapReduce code is now able to filter its input files based on a regular expression.

Bonus track : Filtering on file properties

Let’s now filter files not on the file name anymore, but rather on file properties (e.g. modification time). I want to process every file whose last modification time is greater / lower than a supplied value, similarly to the Unix command below :

find . -mtime +20 | xargs process

For that purpose I will use the same logic as in the RegexFilter explained above: if the path is a directory, return true; otherwise, test the file’s last modification date.
The filter accepts both positive and negative values, supplied as a configuration property (“file.mtime”) from the CLI :

  • mtime+20 : file(s) last modified less than 20 days ago
  • mtime-20 : file(s) last modified more than 20 days ago

The file’s modification time is retrieved from fs.getFileStatus(path), converted into an age in days, and compared to the supplied value using System.currentTimeMillis().


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class FileFilter extends Configured implements PathFilter {

	Configuration conf;
	FileSystem fs;

	@Override
	public boolean accept(Path path) {
		try {
			// Always accept directories so that their content can be listed
			if (fs.isDirectory(path)) {
				return true;
			}
		} catch (IOException e1) {
			e1.printStackTrace();
			return false;
		}

		if (conf.get("file.mtime") != null) {
			// Expected format is +N or -N (number of days)
			String strMtime = conf.get("file.mtime");
			int mTime = Integer.valueOf(strMtime.substring(1, strMtime.length()));
			try {
				FileStatus file = fs.getFileStatus(path);
				// Convert current time and modification time into days
				long now = System.currentTimeMillis() / (1000 * 3600 * 24);
				long time = file.getModificationTime() / (1000 * 3600 * 24);
				long lastModifTime = now - time;
				boolean accept;
				if (strMtime.charAt(0) == '-') {
					// -N : accept files modified more than N days ago
					accept = mTime < lastModifTime ? true : false;
					System.out.println("File " + path.toString() + " modified "
							+ lastModifTime + " days ago, is " + mTime
							+ " lower ? " + accept);
				} else {
					// +N : accept files modified less than N days ago
					accept = mTime > lastModifTime ? true : false;
					System.out.println("File " + path.toString() + " modified "
							+ lastModifTime + " days ago, is " + mTime
							+ " greater ? " + accept);
				}
				return accept;
			} catch (IOException e) {
				e.printStackTrace();
				return false;
			}
		} else {
			// No file.mtime property supplied: accept everything
			return true;
		}
	}

	@Override
	public void setConf(Configuration conf) {
		this.conf = conf;
		if (conf != null) {
			try {
				fs = FileSystem.get(conf);
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}

When executing the following job

hadoop jar com.wordpress.hadoopi.Filter -D file.mtime=-10

I get the following output

File hdfs://hadub1:8020/user/antoine/data/hadoopi_part-r-00000 modified 31 days ago, is 10 lower ? true
File hdfs://hadub1:8020/user/antoine/data/hadoopi_part-r-00001 modified 20 days ago, is 10 lower ? true
File hdfs://hadub1:8020/user/antoine/data/hadoopi_part-r-00002 modified 28 days ago, is 10 lower ? true
File hdfs://hadub1:8020/user/antoine/data/hadoopi_part-r-00003 modified 3 days ago, is 10 lower ? false
13/07/29 14:59:26 INFO input.FileInputFormat: Total input paths to process : 3

By mixing both filtering logics, and / or adding your own, you should be able to process data based on file name, file properties, etc., as you wish !

hadoop jar com.wordpress.hadoopi.Filter -D file.mtime=-10 -D file.pattern=.*HADOOP.*

This will process every file whose name matches .*HADOOP.* and that was modified more than 10 days ago.
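
Since setInputPathFilter() only accepts a single filter class, one way to apply both checks at once is to chain the two filters inside a combined PathFilter. The sketch below is my own illustration (it assumes both file.pattern and file.mtime are supplied, or made optional as described in the comments at the end of this post):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class CombinedFilter extends Configured implements PathFilter {

	// Delegate to the two filters described above
	private final RegexFilter regexFilter = new RegexFilter();
	private final FileFilter fileFilter = new FileFilter();

	@Override
	public void setConf(Configuration conf) {
		super.setConf(conf);
		if (conf != null) {
			// Propagate the job configuration to both delegates
			regexFilter.setConf(conf);
			fileFilter.setConf(conf);
		}
	}

	@Override
	public boolean accept(Path path) {
		// A path must pass both the name check and the mtime check
		return regexFilter.accept(path) && fileFilter.accept(path);
	}
}

You would then register it in the driver with FileInputFormat.setInputPathFilter(myJob, CombinedFilter.class).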
This implementation is really helpful for me, as I no longer need to manually pre-filter my data set each time I want to execute my MapReduce code on a small subset of it.

Cheers!
Antoine

6 thoughts on “Filter input files used for MapReduce”

  1. For information, I set up the “file.pattern” parameter as optional. For that purpose, in the driver code, the simple line below does the trick:

    if (conf.get("file.pattern") == null)
        conf.set("file.pattern", ".*"); // every file by default

  2. Very informative. I want to do the exact same thing in a Pig UDF, so that I would be able to load filtered files from a directory.

    For example:
    A = LOAD myUdf("path_of_directory", "*.xml");

    Appreciation in advance

  3. A simpler solution would be,
    in the CLI:
    -D file.pattern=/user/$USER/$myPath/$myRegex
    and in your main:
    TextInputFormat.setInputPaths(job, new Path(conf.get("file.pattern")));
