MapReduce jobs often require more than one reducer when data volumes are huge and processing needs to be distributed across reduce tasks/nodes. At the end, however, you might need to merge these output files into a single file, for instance to be consumed by a data warehouse process. Say the MR job used 10 reducers to generate the output. In that case, the output directory on HDFS would look something like this:

[prkommireddi@xyz001 ~]$ hadoop fs -ls /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30
drwxr-xr-x   - prkommireddi hdmi-technology          0 2011-08-04 11:52 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/_logs
-rw-r--r--   3 prkommireddi hdmi-technology     150417 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00000
-rw-r--r--   3 prkommireddi hdmi-technology     151142 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00001
-rw-r--r--   3 prkommireddi hdmi-technology     151521 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00002
-rw-r--r--   3 prkommireddi hdmi-technology     152603 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00003
-rw-r--r--   3 prkommireddi hdmi-technology     156200 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00004
-rw-r--r--   3 prkommireddi hdmi-technology     150540 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00005
-rw-r--r--   3 prkommireddi hdmi-technology     153042 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00006
-rw-r--r--   3 prkommireddi hdmi-technology     150738 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00007
-rw-r--r--   3 prkommireddi hdmi-technology     159762 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00008
-rw-r--r--   3 prkommireddi hdmi-technology     149084 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00009
You should ignore files whose names begin with "_" or ".", which most InputFormats skip when reading. The "_logs" directory, for example, will not be read by TextInputFormat or any regular InputFormat.
Now, coming back to the example with 10 reducers. The MR job generates 10 partitions, part-r-00000 through part-r-00009. In the word count example, each reducer generates a partition containing a disjoint set of words: a word that shows up in part-r-00000 will not show up in part-r-00001 through part-r-00009. You might now need to merge these partitions into a single file.
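Why can a word never appear in two partitions? Hadoop's default HashPartitioner assigns each key to exactly one reducer by hashing it. Here is a minimal, self-contained sketch of that assignment; `PartitionSketch` and `partitionFor` are illustrative names, but the formula matches Hadoop's HashPartitioner:

```java
// Sketch of how Hadoop's default HashPartitioner maps a key to a reducer.
// Because the mapping is deterministic, every occurrence of a given word
// goes to the same reducer, so each word lands in exactly one part file.
public class PartitionSketch {

    // Same formula as org.apache.hadoop.mapreduce.lib.partition.HashPartitioner:
    // mask off the sign bit, then take the modulus over the reducer count.
    public static int partitionFor(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    public static void main(String[] args) {
        for (String word : new String[] {"hadoop", "merge", "reducer", "hadoop"}) {
            System.out.printf("%s -> part-r-%05d%n", word, partitionFor(word, 10));
        }
    }
}
```

Running this shows "hadoop" printed twice with the same partition number, which is exactly why merging the part files afterwards cannot produce duplicate words.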
You can merge these partitions on the command line with the getmerge command:
hadoop fs -getmerge /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30 ${HOME}/merge.txt
With the above command, all ten partitions (part-r-00000 through part-r-00009) are merged into a single local file, merge.txt. Make sure merge.txt is not already present, or the command fails.
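Outside a cluster, the merge step itself is easy to reason about in plain Java. The sketch below (no Hadoop required; `LocalMerge` and `merge` are hypothetical names) concatenates every part file in a directory into one output file, and also skips names beginning with "_" or "." the way InputFormats do; the real getmerge behavior may differ slightly by Hadoop version:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java sketch of merging part files: list the directory, drop
// hidden entries ("_" or "." prefix), sort so part-r-00000 comes first,
// then append each file's bytes to a single destination file.
public class LocalMerge {

    public static void merge(Path srcDir, Path dstFile) throws IOException {
        List<Path> parts;
        try (Stream<Path> entries = Files.list(srcDir)) {
            parts = entries
                    .filter(p -> {
                        String name = p.getFileName().toString();
                        return !name.startsWith("_") && !name.startsWith(".");
                    })
                    .sorted() // part-r-00000, part-r-00001, ... in order
                    .collect(Collectors.toList());
        }
        try (OutputStream out = Files.newOutputStream(dstFile)) {
            for (Path part : parts) {
                Files.copy(part, out); // append this partition's bytes
            }
        }
    }
}
```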
Another way to merge HDFS files to the local FS is by using FileUtil. Its static function copyMerge needs to be invoked; here is the definition from the docs:

copyMerge

public static boolean copyMerge(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource, Configuration conf, String addString) throws IOException
Copy all files in a directory to one output file (merge).
Throws:
IOException
The first argument needs to be an instance of the distributed filesystem, whereas the third argument needs to be an instance of LocalFileSystem.
FileSystem fs = FileSystem.get(conf);   //Returns the configured filesystem implementation, HDFS in this case.
FileSystem localFS = FileSystem.getLocal(conf); // Returns the local file system
Here is the wrapper function I wrote to do this:
/**
 * Merges the partitions in an HDFS directory into a single
 * file on the local FS.
 *
 * @author prkommireddi
 * @date Aug 9, 2011
 * @param deleteSrc whether the source needs to be deleted after the merge
 * @param deleteDst whether the destination needs to be deleted, if it
 *                  exists, before the merge
 * @return true if the merge succeeded, false otherwise
 */
public static boolean mergeToLocal(FileSystem fs, Path src, FileSystem localFS, Path dst,
        Configuration conf, boolean deleteSrc, boolean deleteDst) {
    try {
        if (deleteDst && localFS.exists(dst)) {
            localFS.delete(dst, true); // delete(Path) without the recursive flag is deprecated
        }
        return FileUtil.copyMerge(fs, src, localFS, dst, deleteSrc, conf, "");
    } catch (IOException e) {
        e.printStackTrace();
    }
    return false;
}
Hope that helps.