How do you filter the input files processed by a MapReduce job, and how do you make the code versatile enough that end users can easily select files based on a common pattern?
Assuming your file names on HDFS contain a date-time pattern, what if you could execute your code on "2013-07-XX" files only? I am sure there are many ways to do so, but because of my regex addiction (thanks to my Perl legacy), I have created a MapReduce job that takes a regular expression as an argument in order to filter the input files of your data set.
Filtering on file name
Command line interface
As described in a previous article, when you implement the Tool interface you can supply Hadoop with configuration parameters directly from the command line (prefixed with the -D option). By doing so, you no longer need to handle these properties in the static main(String[] args) method of your driver code, since they become accessible system-wide through the Hadoop configuration.
hadoop jar com.wordpress.hadoopi.Filter -D file.pattern=.*regex.*
Implementing PathFilter interface
PathFilter allows you to accept or reject any of the files included in your input path, based on custom restrictions (in our case a regular expression). This is achieved by implementing the accept() method. Note that you will need to extend the Configured class if you want to access the "file.pattern" property supplied on the command line.
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class RegexFilter extends Configured implements PathFilter {

    Pattern pattern;
    Configuration conf;

    @Override
    public boolean accept(Path path) {
        Matcher m = pattern.matcher(path.toString());
        System.out.println("Is path : " + path.toString() + " matching "
                + conf.get("file.pattern") + " ? , " + m.matches());
        return m.matches();
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
        pattern = Pattern.compile(conf.get("file.pattern"));
    }
}
In the setConf() method we retrieve the Hadoop configuration and compile our regular expression. In the accept() method we return true if the file matches our pattern, false otherwise. Pretty obvious, isn't it?
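One detail worth stressing: java.util.regex's matches() anchors the pattern against the entire path string (scheme, host and all), which is why the patterns in this post are wrapped in .* on both sides. A quick stdlib-only sketch, with a made-up path for illustration:

```java
import java.util.regex.Pattern;

public class MatchDemo {
    public static void main(String[] args) {
        // A full HDFS URI, as PathFilter.accept() would see it (hypothetical path)
        String path = "hdfs://hadub1:8020/user/hadoopi/data/hadoopi_part-r-00000";

        // matches() must cover the WHOLE string, hence the leading/trailing .*
        System.out.println(Pattern.matches("part", path));      // false: not the entire string
        System.out.println(Pattern.matches(".*part.*", path));  // true
    }
}
```

This is also why a bare pattern like "part" silently rejects everything: it would only accept a file literally named "part".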
Driver code
Using your RegexFilter in the MapReduce driver code is pretty straightforward: call the static setInputPathFilter() method with your custom PathFilter implementation. The filter will be applied to each file in the inputPath directory (supplied via the addInputPath() method).
// Input
FileInputFormat.setInputPathFilter(myJob, RegexFilter.class);
FileInputFormat.addInputPath(myJob, inputPath);
myJob.setInputFormatClass(TextInputFormat.class);

// Output
FileOutputFormat.setOutputPath(myJob, outputPath);
myJob.setOutputFormatClass(TextOutputFormat.class);
Testing
First test done with a “.*” regular expression (i.e. all files)
Our implementation works
Is path : hdfs://hadub1:8020/user/hadoopi/data matching .* ? , true
Is path : hdfs://hadub1:8020/user/hadoopi/data/hadoopi_part-r-00000 matches .* ? , true
Is path : hdfs://hadub1:8020/user/hadoopi/data/hadoopi_part-r-00001 matches .* ? , true
13/07/29 10:52:30 INFO input.FileInputFormat: Total input paths to process : 2
13/07/29 10:52:35 INFO mapred.JobClient: Running job: job_201307291018_0008
Second test done with a more restrictive pattern (“.*part.*”)
Our implementation does not work here !
Is path : hdfs://hadub1:8020/user/hadoopi/data matches .*part.* ? , false
13/07/29 10:45:09 INFO mapred.JobClient: Cleaning up the staging area hdfs://hadub1:8020/user/hadoopi/.staging/job_201307291018_0006
13/07/29 10:45:09 ERROR security.UserGroupInformation: PriviledgedActionException as:hadoopi (auth:SIMPLE) cause:org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://hadub1:8020/user/hadoopi/data
Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://hadub1:8020/user/hadoopi/data
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:231)
The root cause is the following: the first path handed to the PathFilter is always the input directory itself (/user/hadoopi/data in our case). Since the directory does not match our pattern, it is rejected and the exception above is thrown.
PathFilter with directory support
In order to avoid this issue, the following modification must be made to our PathFilter implementation:
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class RegexFilter extends Configured implements PathFilter {

    Pattern pattern;
    Configuration conf;
    FileSystem fs;

    @Override
    public boolean accept(Path path) {
        try {
            if (fs.isDirectory(path)) {
                return true;
            } else {
                Matcher m = pattern.matcher(path.toString());
                System.out.println("Is path : " + path.toString() + " matches "
                        + conf.get("file.pattern") + " ? , " + m.matches());
                return m.matches();
            }
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
        if (conf != null) {
            try {
                fs = FileSystem.get(conf);
                pattern = Pattern.compile(conf.get("file.pattern"));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
In the setConf() method we get a handle on HDFS. In the accept() method we first test whether the supplied path is a directory: if so, the directory is accepted (return true); otherwise the file name is tested against our regular expression.
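The accept-directories-then-match logic can be reproduced outside Hadoop with plain java.nio.file, which makes it easy to unit-test the filter's behavior before deploying it. A minimal sketch (class name and file names are invented for the example):

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.regex.Pattern;

public class DirAwareFilter {
    private final Pattern pattern;

    public DirAwareFilter(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    // Mirrors RegexFilter.accept(): directories always pass,
    // regular files must match the pattern against their full path
    public boolean accept(Path path) {
        if (Files.isDirectory(path)) {
            return true;
        }
        return pattern.matcher(path.toString()).matches();
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("data");
        Path keep = Files.createFile(dir.resolve("hadoopi_part-r-00000"));
        Path skip = Files.createFile(dir.resolve("README"));

        DirAwareFilter filter = new DirAwareFilter(".*part.*");
        System.out.println(filter.accept(dir));   // true: directory always accepted
        System.out.println(filter.accept(keep));  // true: matches .*part.*
        System.out.println(filter.accept(skip));  // false
    }
}
```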
Let’s restart our process
Is path : hdfs://hadub1:8020/user/hadoopi/data/hadoopi_part-r-00000 matches .*part.* ? , true
Is path : hdfs://hadub1:8020/user/hadoopi/data/hadoopi_part-r-00001 matches .*part.* ? , true
13/07/29 10:52:30 INFO input.FileInputFormat: Total input paths to process : 2
13/07/29 10:52:35 INFO mapred.JobClient: Running job: job_201307291018_0008
Well, it works now. You should be able to filter your input files with any regular expression (do not forget to escape backslashes on the command line).
hadoop jar com.wordpress.hadoopi.Filter -D file.pattern=.*regex.*
hadoop jar com.wordpress.hadoopi.Filter -D file.pattern=.*2013-07-\\d{2}.*
hadoop jar com.wordpress.hadoopi.Filter -D file.pattern=.*part-r-0000[0-1].*
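About the double backslash in the second command: assuming an unquoted argument in a Bourne-style shell, the shell consumes one level of escaping, so `\\d{2}` reaches Hadoop as `\d{2}`. In Java source the backslash must be escaped again for a different reason (string literal escaping), hence `"\\d"`. A stdlib-only check of the resulting pattern, with invented file names:

```java
import java.util.regex.Pattern;

public class DateFilterDemo {
    public static void main(String[] args) {
        // The pattern as Hadoop receives it is .*2013-07-\d{2}.*
        // In a Java string literal, \d is written \\d
        String pattern = ".*2013-07-\\d{2}.*";

        // Hypothetical paths, for illustration only
        System.out.println(Pattern.matches(pattern,
                "hdfs://nn:8020/data/logs-2013-07-15.txt")); // true: day 15 of July 2013
        System.out.println(Pattern.matches(pattern,
                "hdfs://nn:8020/data/logs-2013-08-02.txt")); // false: wrong month
    }
}
```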
Great, our MapReduce code is now able to filter its input files based on a regular expression.
Bonus track : Filtering on file properties
Let's now filter not on the file's name anymore, but rather on its properties (e.g. modification time). I want to process every file whose last modification time is greater / lower than a supplied value, in the spirit of the Unix command below.
find . -mtime +20 | xargs process
For that purpose I will use the same logic as in the RegexFilter explained above: if the path is a directory, return true; otherwise test the file's last modification date.
The filter should accept both negative and positive values, set as a configuration value ("file.mtime") from the CLI:
- mtime+20 : file(s) modified less than 20 days ago
- mtime-20 : file(s) modified more than 20 days ago

(Note that this sign convention ends up being the opposite of Unix find's -mtime, as the job output below shows.)
The file's modification time will be retrieved from fs.getFileStatus(path) and compared to the supplied value and to System.currentTimeMillis().
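The day arithmetic boils down to integer division of epoch milliseconds. Here is a stdlib-only sketch of that arithmetic and of the sign handling (class name, constants and file ages are invented; the '-'/'+' behavior follows the job output shown later in the post):

```java
public class MtimeDemo {
    // Convert epoch milliseconds to whole days, as the filter does
    static long toDays(long millis) {
        return millis / (1000L * 3600 * 24);
    }

    // "-N" keeps files modified more than N days ago,
    // "+N" keeps files modified less than N days ago
    static boolean accept(String mtimeOpt, long nowMillis, long modMillis) {
        int n = Integer.parseInt(mtimeOpt.substring(1));
        long ageInDays = toDays(nowMillis) - toDays(modMillis);
        return mtimeOpt.charAt(0) == '-' ? n < ageInDays : n > ageInDays;
    }

    public static void main(String[] args) {
        long day = 1000L * 3600 * 24;
        long now = 40 * day; // arbitrary "current" instant for the example

        System.out.println(accept("-10", now, now - 31 * day)); // true: 31 days old, 10 < 31
        System.out.println(accept("-10", now, now - 3 * day));  // false: 3 days old, 10 > 3
        System.out.println(accept("+10", now, now - 3 * day));  // true: 3 days old, 10 > 3
    }
}
```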
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class FileFilter extends Configured implements PathFilter {

    Configuration conf;
    FileSystem fs;

    @Override
    public boolean accept(Path path) {

        try {
            if (fs.isDirectory(path)) {
                return true;
            }
        } catch (IOException e1) {
            e1.printStackTrace();
            return false;
        }

        if (conf.get("file.mtime") != null) {
            String strMtime = conf.get("file.mtime");
            int mTime = Integer.valueOf(strMtime.substring(1, strMtime.length()));
            try {
                FileStatus file = fs.getFileStatus(path);
                long now = System.currentTimeMillis() / (1000 * 3600 * 24);
                long time = file.getModificationTime() / (1000 * 3600 * 24);
                long lastModifTime = now - time;
                boolean accept;
                if (strMtime.charAt(0) == '-') {
                    // "-N" : keep files modified more than N days ago
                    accept = mTime < lastModifTime;
                    System.out.println("File " + path.toString() + " modified "
                            + lastModifTime + " days ago, is " + mTime
                            + " lower ? " + accept);
                } else {
                    // "+N" : keep files modified less than N days ago
                    accept = mTime > lastModifTime;
                    System.out.println("File " + path.toString() + " modified "
                            + lastModifTime + " days ago, is " + mTime
                            + " greater ? " + accept);
                }
                return accept;
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            }
        } else {
            return true;
        }
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
        if (conf != null) {
            try {
                fs = FileSystem.get(conf);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
When executing the following job
hadoop jar com.wordpress.hadoopi.Filter -D file.mtime=-10
I get the following output
File hdfs://hadub1:8020/user/antoine/data/hadoopi_part-r-00000 modified 31 days ago, is 10 lower ? true
File hdfs://hadub1:8020/user/antoine/data/hadoopi_part-r-00001 modified 20 days ago, is 10 lower ? true
File hdfs://hadub1:8020/user/antoine/data/hadoopi_part-r-00002 modified 28 days ago, is 10 lower ? true
File hdfs://hadub1:8020/user/antoine/data/hadoopi_part-r-00003 modified 3 days ago, is 10 lower ? false
13/07/29 14:59:26 INFO input.FileInputFormat: Total input paths to process : 3
By mixing both filtering logics and/or adding your own, you should be able to process any data based on file name, file properties, and so on, as you wish!
hadoop jar com.wordpress.hadoopi.Filter -D file.mtime=-10 -D file.pattern=.*HADOOP.*
This will process every file matching .*HADOOP.* and modified more than 10 days ago.
This implementation is really helpful for me as I do not need to manually pre-filter my data set each time I want to execute my MapReduce code on a small subset.
Cheers!
Antoine
For information, I set up the "file.pattern" parameter as optional. For that purpose, the simple line below in the driver code does the trick:
if (conf.get("file.pattern") == null)
    conf.set("file.pattern", ".*"); // Every file by default
Very informative. I want to do the exact same thing in a UDF for Pig, so that I would be able to load filtered files from a directory.
For example :
A = LOAD myUdf("path_of_directory", "*.xml");
Thanks in advance
Well explained…. so well taken 🙂
A simpler solution would be, in the CLI:

-D file.pattern=/user/$USER/$myPath/$myRegex

and in your main:

TextInputFormat.setInputPaths(job, new Path(conf.get("file.pattern")));
Hi there
Can we please use your image (of the filter) on one of our library guides. It would only be used for educational purposes.
Thank You