package hadoop.util.data;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
/**
 * Static helpers for picking a percentage-based subset of the entries listed
 * under an HDFS location, so that a job can be run over a sample of its input.
 *
 * <p>Entries whose name starts with {@code "_"} or {@code "."} are skipped
 * before sampling (see {@link #accept(Path)}).
 *
 * @author Prashant Kommireddi (Mar 9, 2011; filtering added Apr 25, 2011)
 */
public class DataSampler {

    /**
     * Returns a sample of the paths listed directly under {@code path}.
     *
     * <p>For example, if 450 accepted paths are listed and
     * {@code samplePercentage} is 10, the returned array holds 45 paths.
     *
     * @param fs               file system used to list {@code path}
     * @param path             location whose entries are sampled
     * @param samplePercentage percentage of accepted paths to return, 0-100
     * @return the sampled paths; empty when the percentage rounds down to zero
     * @throws IOException              if listing {@code path} fails
     * @throws IllegalArgumentException if {@code samplePercentage} is outside
     *                                  0-100
     */
    public static Path[] getSamplePaths(FileSystem fs, Path path,
            int samplePercentage) throws IOException {
        // Reject a bad percentage before doing any file system I/O.
        checkPercentage(samplePercentage);
        return getSamplePathsHelper(listPaths(fs, path), samplePercentage);
    }

    /**
     * Returns a sample of the paths listed directly under {@code uri}.
     *
     * @param uri              location whose entries are sampled
     * @param samplePercentage percentage of accepted paths to return, 0-100
     * @return the sampled paths; empty when the percentage rounds down to zero
     * @throws IOException              if listing {@code uri} fails
     * @throws IllegalArgumentException if {@code samplePercentage} is outside
     *                                  0-100
     */
    public static Path[] getSamplePaths(String uri, int samplePercentage)
            throws IOException {
        // Reject a bad percentage before doing any file system I/O.
        checkPercentage(samplePercentage);
        return getSamplePathsHelper(listPaths(uri), samplePercentage);
    }

    /** Throws if the percentage cannot be used to size and stride a sample. */
    private static void checkPercentage(int samplePercentage) {
        if (samplePercentage < 0 || samplePercentage > 100) {
            throw new IllegalArgumentException(
                    "samplePercentage must be between 0 and 100, got "
                            + samplePercentage);
        }
    }

    /**
     * Filters {@code paths} with {@link #accept(Path)} and picks
     * {@code samplePercentage} percent of what remains (rounded down).
     */
    private static Path[] getSamplePathsHelper(Path[] paths,
            int samplePercentage) {
        checkPercentage(samplePercentage);
        if (samplePercentage == 0) {
            // Nothing to select; also avoids dividing by zero below.
            return new Path[0];
        }

        final List<Path> pathList = filteredPaths(paths);
        final int totalPaths = pathList.size();
        // Multiply in long so a very large listing cannot overflow int.
        final int sampleNumPaths =
                (int) ((long) totalPaths * samplePercentage / 100);
        // Take every stride-th accepted path. Integer division means any
        // percentage above 50 yields a stride of 1, i.e. the sample is the
        // first sampleNumPaths accepted paths.
        final int stride = 100 / samplePercentage;

        final Path[] samplePaths = new Path[sampleNumPaths];
        int taken = 0;
        for (int i = 0; i < totalPaths && taken < sampleNumPaths; i += stride) {
            samplePaths[taken++] = pathList.get(i);
        }
        return samplePaths;
    }

    /**
     * Tells whether a path should be read.
     *
     * @param p path to test
     * @return {@code false} if the final name component of {@code p} starts
     *         with {@code "_"} or {@code "."}, {@code true} otherwise
     */
    public static boolean accept(Path p) {
        String name = p.getName();
        return !name.startsWith("_") && !name.startsWith(".");
    }

    /**
     * Returns the elements of {@code paths} that pass {@link #accept(Path)},
     * in their original order.
     *
     * @param paths paths to filter; {@code null} is treated as empty
     * @return a new mutable list of the accepted paths
     */
    public static List<Path> filteredPaths(Path[] paths) {
        List<Path> filteredList = new ArrayList<Path>();
        if (paths == null) {
            // FileUtil.stat2Paths hands back null for a null status array.
            return filteredList;
        }
        for (Path candidate : paths) {
            if (accept(candidate)) {
                filteredList.add(candidate);
            }
        }
        return filteredList;
    }

    /**
     * Returns the paths listed under the location named by {@code uRi}, using
     * the file system resolved from that URI and a default
     * {@link Configuration}.
     *
     * @param uRi location to list
     * @return the listed paths, as produced by {@link FileUtil#stat2Paths}
     * @throws IOException if the file system cannot be obtained or listed
     */
    public static Path[] listPaths(String uRi) throws IOException {
        // The FileSystem is not closed here: FileSystem.get may return a
        // cached instance that other code is still using.
        FileSystem fs = FileSystem.get(URI.create(uRi), new Configuration());
        return listPaths(fs, new Path(uRi));
    }

    /**
     * Returns the paths listed under {@code path} on the given file system.
     *
     * @param fs   file system used for the listing
     * @param path location to list
     * @return the listed paths, as produced by {@link FileUtil#stat2Paths}
     * @throws IOException if the listing fails
     */
    public static Path[] listPaths(FileSystem fs, Path path) throws IOException {
        FileStatus[] status = fs.listStatus(path);
        return FileUtil.stat2Paths(status);
    }
}
Making a call to DataSampler
Calling getSamplePaths(FileSystem fs, Path path, int samplePercentage)
FileInputFormat.setInputPaths(job, DataSampler.getSamplePaths(fs, inputPath, samplePercentage));
The function getSamplePaths takes 3 arguments:
- FileSystem object
- Input Path - this must be an HDFS location
- Sample percentage - an integer, e.g. 10, 1, or 20 for 10%, 1%, or 20%
Based on the above arguments, the function returns an array holding the corresponding number of paths, which can be passed as input to the MR job using:
FileInputFormat.setInputPaths(job, DataSampler.getSamplePaths(fs, inputPath, samplePercentage));
Of course, you could use glob patterns or regular expressions (GlobFilter etc.) to pick a subset of files too. I wrote this class to make it easier to return the subset without worrying about the internal structure or naming of the HDFS files.