Pig – startup script behavior

Pig ships with a bundled copy of hadoop. A user can either use that bundled hadoop or provide his/her own hadoop installation. So there are 2 ways to start up pig:

  1. bundled  (pig shipped with hadoop, pig.jar)
  2. non-bundled (hadoop provided by user, pig-withouthadoop.jar)

As the name suggests, pig-withouthadoop.jar does not contain any hadoop libs/classes whereas pig.jar contains Hadoop 1.0.0 (as of release 0.11.1).

How does pig know which hadoop to use?

For this, let’s understand how the startup script (bin/pig) works. Here’s a simple representation of what happens:

  1. Use non-bundled if the hadoop command is found on the PATH
  2. If not, use non-bundled if HADOOP_HOME is set
  3. If not, fallback on bundled pig jar
  4. Note that the locations found in steps 1 and 2 must actually contain hadoop; if not, the pig startup script will fall back on the bundled pig jar.
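
The decision order above can be written down as a small function. This is only a sketch of the logic as described in this post, not the actual bin/pig script; the class, enum and parameter names are made up for illustration.

```java
/** Hypothetical model of the startup decision described above; not the real bin/pig script. */
final class PigStartupSketch {
    enum Mode { NON_BUNDLED, BUNDLED }

    /**
     * @param hadoopCommandFound step 1: a usable hadoop launcher was found
     * @param hadoopHome         step 2: value of HADOOP_HOME, or null if unset
     * @param homeContainsHadoop whether HADOOP_HOME really holds a hadoop install
     */
    static Mode chooseMode(boolean hadoopCommandFound, String hadoopHome,
                           boolean homeContainsHadoop) {
        if (hadoopCommandFound) {
            return Mode.NON_BUNDLED;                 // step 1
        }
        if (hadoopHome != null && !hadoopHome.isEmpty() && homeContainsHadoop) {
            return Mode.NON_BUNDLED;                 // step 2
        }
        return Mode.BUNDLED;                         // steps 3 and 4: fall back on pig.jar
    }

    public static void main(String[] args) {
        // HADOOP_HOME is set but does not contain hadoop: the bundled jar is used.
        System.out.println(chooseMode(false, "/opt/hadoop", false));
    }
}
```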

If you were to use pig with a Hadoop cluster running 0.20.2, it could be done either by having hadoop on the PATH or by specifying it via HADOOP_HOME.

export HADOOP_HOME=<path_to_hadoop>

On my machine, I have hadoop on the PATH

$ which hadoop
/home/pkommireddi/dev/tools/Linux/hadoop/hadoop-0.20.2/bin/hadoop

Specifying additional Java runtime options

Additional runtime options can be specified using PIG_OPTS

export PIG_OPTS="-Dmapred.job.queue.name=my-queue"

Pro-tip: Make sure you are not inadvertently overriding PIG_OPTS that might have been set elsewhere. Instead, add to PIG_OPTS

export PIG_OPTS="$PIG_OPTS -Dmapred.job.queue.name=my-queue"

Interesting behavior when using user-specified hadoop (non-bundled):
In non-bundled mode, the pig script uses the hadoop startup script ($HADOOP_HOME/bin/hadoop). It adds PIG_OPTS to HADOOP_OPTS and invokes the hadoop startup script (let’s call it HSS). HSS picks up environment variables from “hadoop-env.sh”, so it is entirely possible that the HADOOP_OPTS passed to HSS is never used. This happens if hadoop-env.sh overrides it. For example,

export HADOOP_OPTS="-server -Dlog4j.configuration=log4j.properties"

In this case, HADOOP_OPTS in hadoop-env.sh does not use user passed options. Instead, similar to PIG_OPTS tip above, HADOOP_OPTS should add to existing options in hadoop-env.sh

export HADOOP_OPTS="$HADOOP_OPTS -server -Dlog4j.configuration=log4j.properties"
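
To make the difference between the two export lines concrete, here is a tiny model of what each one does to the options that pig passed in. The class and method names are hypothetical, and the only liberty taken is that the sketch does not keep the leading space a shell would leave when the inherited value is empty.

```java
/** Hypothetical model of "override" versus "append" for an options variable. */
final class OptsSketch {
    /** Mimics export VAR="-a -b": whatever was passed in is discarded. */
    static String override(String inherited, String fromEnvFile) {
        return fromEnvFile;
    }

    /** Mimics export VAR="$VAR -a -b": the inherited options are kept. */
    static String append(String inherited, String fromEnvFile) {
        if (inherited == null || inherited.isEmpty()) {
            return fromEnvFile;
        }
        return inherited + " " + fromEnvFile;
    }

    public static void main(String[] args) {
        String fromPig = "-Dmapred.job.queue.name=my-queue";
        String envFile = "-server -Dlog4j.configuration=log4j.properties";
        System.out.println(override(fromPig, envFile)); // queue name is lost
        System.out.println(append(fromPig, envFile));   // queue name survives
    }
}
```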

Specifying additional classpath entries:

Use PIG_CLASSPATH to specify additional classpath entries. For example, to add hadoop configuration files (hadoop-site.xml, core-site.xml) to the classpath

export PIG_CLASSPATH=<path_to_hadoop_conf_dir>

Starting version 0.12, you should be able to override default classpath entries by setting PIG_USER_CLASSPATH_FIRST

export PIG_USER_CLASSPATH_FIRST=true

Specifying additional jars:

At times you might need jars from libraries not included in Pig distribution. These could be your custom UDFs, or 3rd party libs. Pig lets you add these jars to classpath using “pig.additional.jars”

 pig -Dpig.additional.jars=myjar.jar script.pig

Alternately, you could use REGISTER within your script.

Debugging:

Pig provides a command for users to debug any pig startup issues related to classpath. It provides information on

  1. Hadoop version,
  2. Pig version
  3. classpath
  4. java runtime options
  5. bundled vs unbundled

This is the “-secretDebugCmd”  – this really should not be such a secret :)

$ pig -x local -secretDebugCmd

Cannot find local hadoop installation, using bundled Hadoop 1.0.0
dry run:
/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home/bin/java -Xmx1000m -Dpig.log.dir=/Users/pkommireddi/work/pig/pig-latest/bin/../logs -Dpig.log.file=pig.log -Dpig.home.dir=/Users/pkommireddi/work/pig/pig-latest/bin/.. -classpath /Users/pkommireddi/work/pig/pig-latest/bin/../conf:/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home/lib/tools.jar:/Users/pkommireddi/work/pig/pig-latest/bin/../build/ivy/lib/Pig/jython-standalone-2.5.2.jar /Users/pkommireddi/work/pig/pig-latest/bin/../build/ivy/lib/Pig/jython-standalone-2.5.3.jar:/Users/pkommireddi/work/pig/pig-latest/bin/../build/ivy/lib/Pig/jruby-complete-1.6.7.jar:/Users/pkommireddi/work/pig/pig-latest/bin/../pig.jar org.apache.pig.Main -x local

Conclusion:

If it’s under your control, always try to add to existing Hadoop or Pig options instead of resetting them. Of course, at times you might not have control over hadoop-env.sh, in which case you can pass pig options when running a script

pig -Dmapred.job.queue.name=my-queue script.pig

Pig – specify a default script .pigbootup

From version 0.11 onwards, Pig can load a default set of statements every time it starts. During ad hoc analysis it is common to reuse the same DEFINE and REGISTER statements to declare the additional jars and UDFs a script requires. You can now create a file (.pigbootup) containing the statements that should run each time you start pig.

Let’s see what it looks like. Here is my grunt shell when I start pig in local mode

localhost:pig-trunk pkommireddi$ bin/pig -x local
2013-02-05 22:32:16,171 [main] INFO org.apache.pig.Main - Apache Pig version 0.12.0-SNAPSHOT (r1442025) compiled Feb 03 2013, 21:36:06
2013-02-05 22:32:16,172 [main] INFO org.apache.pig.Main - Logging error messages to: /Users/pkommireddi/work/pig/pig-trunk/pig_1360132336169.log
2013-02-05 22:32:16,192 [main] INFO org.apache.pig.impl.util.Utils - Default bootup file /Users/pkommireddi/.pigbootup not found
2013-02-05 22:32:16,345 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///
grunt>

Now I create .pigbootup under my HOME directory with a few entries

localhost:pig-trunk pkommireddi$ cat ~/.pigbootup 
REGISTER 'foo.jar';
DEFINE MY_UDF com.sfdc.BAR();
SET default_parallel 10;

Let’s start pig again

localhost:pig-trunk pkommireddi$ bin/pig -x local
2013-02-05 22:35:48,148 [main] INFO  org.apache.pig.Main - Apache Pig version 0.12.0-SNAPSHOT (r1442025) compiled Feb 03 2013, 21:36:06
2013-02-05 22:35:48,149 [main] INFO  org.apache.pig.Main - Logging error messages to: /Users/pkommireddi/work/pig/pig-trunk/pig_1360132548146.log
2013-02-05 22:35:48,324 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///
grunt> REGISTER 'foo.jar';
grunt> DEFINE MY_UDF com.sfdc.BAR();
grunt> SET default_parallel 10;
grunt>

You can see that pig has loaded up the 3 statements from .pigbootup automagically.

Location of the file .pigbootup is configurable

The location of .pigbootup is configurable via the property “pig.load.default.statements”. Here I have added an entry to pig.properties that points to an alternate location

pkommireddi$ grep "pig.load.default.statements" pig.properties 
pig.load.default.statements=/Users/pkommireddi/work/.pigbootup
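
Put together, the lookup behaves roughly like the sketch below: use the property if it is set, otherwise fall back to .pigbootup under the home directory (the default seen in the first log above). This is an illustration of the behavior described here, not Pig's own code; the class and method names are hypothetical.

```java
import java.util.Properties;

/** Hypothetical sketch of how the bootup file location is chosen. */
final class BootupFileSketch {
    static final String PROP = "pig.load.default.statements";

    static String resolve(Properties pigProperties, String userHome) {
        String configured = pigProperties.getProperty(PROP);
        if (configured != null && !configured.trim().isEmpty()) {
            return configured.trim();          // explicit entry in pig.properties wins
        }
        return userHome + "/.pigbootup";       // default location under HOME
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(resolve(props, System.getProperty("user.home")));
        props.setProperty(PROP, "/Users/pkommireddi/work/.pigbootup");
        System.out.println(resolve(props, System.getProperty("user.home")));
    }
}
```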

Time to upgrade to 0.11; there are a few other cool features to check out.

PigStorage options – Schema and Source tagging

PigStorage is probably the most frequently used Load/Store utility. It parses input records based on a delimiter and the fields thereafter can be positionally referenced or referenced via alias. Starting version 0.10, Pig has a couple of options that could be very useful and I will explain what those are here.

Options

  1. Schema: Reads/Stores the schema of the relation using a hidden JSON file (.pig_schema). Consider the following example:
    grunt> cat example;
    1 pig apache
    2 hadoop apache
    grunt> A = LOAD 'example' using PigStorage('\t') as (id:int, project:chararray, org:chararray);
    grunt> B = foreach A generate project, org;
    grunt> describe B;
    B: {project: chararray,org: chararray}
    grunt> store B into 'output';
    grunt> cat output;
    pig apache
    hadoop apache

    Schema for alias B is {project: chararray,org: chararray}.

    Now you might want to load the output file and perform further processing on it. Typically this is achieved by loading the dataset ‘output’ using PigStorage and redefining the schema. But this is redundant, and possibly error-prone.

    grunt> ExplicitSchema = LOAD 'output' using PigStorage('\t') as (project:chararray, org:chararray);

    In the above line, we have to explicitly define the schema for the dataset ‘output’.

    With Pig 0.10, we now have an option to pass PigStorage the argument ‘-schema’ while storing data. This will create a ‘.pig_schema’ file in the output directory which is a JSON file containing the schema.

    store B into 'output' using PigStorage('\t', '-schema');

    So the next time you load ‘output’, you only need to specify the location of output to LOAD.

    grunt> WithSchema = LOAD 'output';
    grunt> describe WithSchema;
    WithSchema: {project: chararray,org: chararray}

    If you do not want the schema to be loaded, you can disable it with ‘-noschema’

    grunt> WithSchemaDisabled = LOAD 'output' using PigStorage('\t', '-noschema');
    grunt> describe WithSchemaDisabled;
    Schema for WithSchemaDisabled unknown.

    Another useful property of this option is that it creates a header file containing the column names.

    From Pig Storage java docs: This file “.pig_headers” is dropped in the output directory. This file simply lists the delimited aliases. This is intended to make export to tools that can read files with header lines easier (just cat the header to your data).

    To summarize (thanks to Dmitriy for suggesting adding this section):

      • PigStorage always tries to load the .pig_schema file, unless you explicitly say -noschema.
      • If you don’t specify anything at all, PigStorage will try to load a schema, and silently fail (behave as before) if it’s not present or unreadable.
      • If you specify -schema during loading, PigStorage will fail if a schema is not present.
      • If you specify -noschema during loading, PigStorage will ignore the .pig_schema file.
      • PigStorage will only *store* the schema if you specify -schema.
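
The summary rules can also be read as a small decision table. The sketch below encodes exactly the five bullets above and nothing more; it is not PigStorage's implementation, and the enum and method names are invented for the example.

```java
/** Hypothetical encoding of the -schema / -noschema rules summarized above. */
final class PigSchemaRulesSketch {
    enum Option { NONE, SCHEMA, NOSCHEMA }
    enum LoadResult { SCHEMA_LOADED, NO_SCHEMA, FAILURE }

    /** What happens on LOAD, given the option and whether .pig_schema is present and readable. */
    static LoadResult onLoad(Option option, boolean schemaFileReadable) {
        switch (option) {
            case NOSCHEMA:
                return LoadResult.NO_SCHEMA;   // .pig_schema is ignored
            case SCHEMA:
                return schemaFileReadable ? LoadResult.SCHEMA_LOADED : LoadResult.FAILURE;
            default:
                // No option: try to load, silently fall back if it is missing or unreadable.
                return schemaFileReadable ? LoadResult.SCHEMA_LOADED : LoadResult.NO_SCHEMA;
        }
    }

    /** On STORE, the schema file is written only when -schema is given. */
    static boolean writesSchemaOnStore(Option option) {
        return option == Option.SCHEMA;
    }

    public static void main(String[] args) {
        System.out.println(onLoad(Option.NONE, false));
        System.out.println(onLoad(Option.SCHEMA, false));
    }
}
```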

     

  2. Source tagging: Adds source filename as first field of each tuple (Please refer to UPDATE below before reading this)

    You may sometimes want to know the exact file that a record came from. For example, let’s say we have a log dataset that is partitioned by application server id. All log events from <app server 1> are contained in a file logs_app_server_1.gz, all events from <app server 2> are contained in logs_app_server_2.gz, and so on. When you read all these app log files at once, you may want to bring the app server id into your analysis. PigStorage can now (Pig release 0.10) be used to accomplish this.

    If -tagsource is specified, PigStorage will prepend the input split path to each Tuple/row. The user needs to ensure pig.splitCombination is set to false. This is because Pig by default can combine small files (based on the property pig.splitCombination) and pass them to a single mapper, in which case it would be difficult to determine the exact input filename from the combined records.

    set pig.splitCombination false;

    Usage:

    A = LOAD 'input' using PigStorage(',','-tagsource'); 
    B = foreach A generate $0 as input_filename;

    The first field in each Tuple will contain input filename.

    For example, let’s say we have 2 files ‘data’ and ‘data2’.

    grunt> cat data
    [open#apache,1#2,11#2]
    [apache#hadoop,3#4,12#hadoop]
    
    grunt> cat data2
    1	10
    4	11
    5	10
    
    grunt> set pig.splitCombination false;
    
    grunt> A = load 'data*' using PigStorage('\t', '-tagsource');
    
    grunt> dump A;
    (data,[open#apache,1#2,11#2])
    (data,[apache#hadoop,3#4,12#hadoop])
    (data2,1,10)
    (data2,4,11)
    (data2,5,10)

    CAVEAT:
    Disabling “pig.splitCombination” can have a negative effect on the performance of Pig jobs. Please note this property is turned on by default. It combines multiple small input files and passes them to a single mapper. Files are combined up to the size specified by the property “pig.maxCombinedSplitSize”. For example, if you have 10 files of 25 MB each and “pig.maxCombinedSplitSize” set to 256 MB, Pig combines all 10 files and passes them to a single mapper.

    Please do consider this performance hit while using ‘-tagsource’.
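
The arithmetic in the caveat can be checked with a simple greedy packing model: keep adding files to the current group until the next one would exceed the limit. This is a deliberately simplified sketch; Pig's real combiner may take other factors into account, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of combining small files up to a size limit. */
final class SplitCombinationSketch {
    /** Packs file sizes (in MB) in input order; each inner list would feed one mapper. */
    static List<List<Integer>> combine(int[] fileSizesMb, int maxCombinedMb) {
        List<List<Integer>> groups = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int currentSize = 0;
        for (int size : fileSizesMb) {
            // Start a new group when this file would push the group past the limit.
            if (!current.isEmpty() && currentSize + size > maxCombinedMb) {
                groups.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(size);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }

    public static void main(String[] args) {
        int[] files = {25, 25, 25, 25, 25, 25, 25, 25, 25, 25};
        // 10 x 25 MB = 250 MB, which fits under 256 MB: one group, one mapper.
        System.out.println(combine(files, 256).size());
    }
}
```

With combination disabled, the same 10 files would go to 10 separate mappers, which is where the performance hit comes from.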

Many more features and improvements are coming with Pig 0.10; these PigStorage options are just two of them!

UPDATE 05/28/2012

A fix has since been made (https://issues.apache.org/jira/browse/PIG-2462), so we no longer need to disable “pig.splitCombination” in order to use ‘-tagsource’. Please DO NOT disable split combination in your pig scripts.

Data Export from Hadoop MapReduce to Database

Hadoop has become a huge part of the data warehouse at most companies. It is used for a variety of use-cases: Search and Web Indexing, Machine learning, Analytics and Reporting, and so on. Most organizations are building Hadoop clusters in addition to maintaining traditional Data Warehousing technologies such as Teradata, Vertica and other OLAP systems. A very common use-case is processing huge amounts of data (by huge I mean terabytes, even petabytes!) with Hadoop and loading aggregated summary data into the OLAP engines.

However, we need to keep in mind that Hadoop is a system designed for fault tolerance, among other things. To keep job processing moving forward in case of machine failures, Hadoop spawns a Task (Map or Reduce) on more than 1 node if it thinks the originally spawned task is slow compared to the rest. This is known as speculative execution. A task could be slow due to bad memory, disk or other hardware issues (or simply because it handles more data than other Map or Reduce tasks). Hadoop then tries to have a free node perform the work of the slow task.

Here is a good description of Speculative Execution from Yahoo Developer Network Blog

One problem with the Hadoop system is that by dividing the tasks across many nodes, it is possible for a few slow nodes to rate-limit the rest of the program. For example if one node has a slow disk controller, then it may be reading its input at only 10% the speed of all the other nodes. So when 99 map tasks are already complete, the system is still waiting for the final map task to check in, which takes much longer than all the other nodes.

By forcing tasks to run in isolation from one another, individual tasks do not know where their inputs come from. Tasks trust the Hadoop platform to just deliver the appropriate input. Therefore, the same input can be processed multiple times in parallel, to exploit differences in machine capabilities. As most of the tasks in a job are coming to a close, the Hadoop platform will schedule redundant copies of the remaining tasks across several nodes which do not have other work to perform. This process is known as speculative execution. When tasks complete, they announce this fact to the JobTracker. Whichever copy of a task finishes first becomes the definitive copy. If other copies were executing speculatively, Hadoop tells the TaskTrackers to abandon the tasks and discard their outputs. The Reducers then receive their inputs from whichever Mapper completed successfully, first.

Data export from MapReduce to Database:

There can be a problem when using Speculative Execution with MapReduce jobs that write directly to a database. If the same input is processed multiple times, each of those parallel tasks might write the exact same records to the database before the redundant tasks are killed.

Disable Speculative execution to avoid this:

Speculative execution is enabled by default. You can disable speculative execution for the mappers and reducers by setting the mapred.map.tasks.speculative.execution and mapred.reduce.tasks.speculative.execution Configuration options to false, respectively.
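
To see why duplicate rows show up, here is a toy simulation in which a plain list stands in for the database table. It models the worst case described above, where every attempt of a task finishes writing before the redundant one is killed. Nothing here uses Hadoop; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model: several attempts of the same task writing straight to a "table". */
final class SpeculativeWriteSketch {
    static List<String> runAttempts(List<String> taskRecords, int attempts) {
        List<String> table = new ArrayList<>();   // stands in for the database
        for (int a = 0; a < attempts; a++) {
            // Rows are committed as they are written; killing an attempt
            // afterwards does not remove what it already inserted.
            table.addAll(taskRecords);
        }
        return table;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("row-1", "row-2", "row-3");
        System.out.println(runAttempts(records, 1).size()); // speculation off
        System.out.println(runAttempts(records, 2).size()); // one speculative copy
    }
}
```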

Snappy compression with Pig and native MapReduce

This assumes you have already installed Snappy for Hadoop on your cluster; if not, please follow http://code.google.com/p/hadoop-snappy/

This is the machine config of my cluster nodes, though the steps below should apply to your installation/machine configs as well

pkommireddi@pkommireddi-wsl:/tools/hadoop/pig-0.9.1/lib$ uname -a
Linux pkommireddi-wsl 2.6.32-37-generic #81-Ubuntu SMP Fri Dec 2 20:32:42 UTC 2011 x86_64 GNU/Linux

Pig requires that the snappy jar be on its classpath and the snappy native library be on its library path when a script is run.

The pig client here is installed at /tools/hadoop and the jar needs to be placed within $PIG_HOME/lib.

/tools/hadoop/pig-0.9.1/lib/hadoop-snappy-0.0.1-SNAPSHOT.jar

Also, you need to point Pig to the snappy native library

export PIG_OPTS="$PIG_OPTS -Djava.library.path=$HADOOP_HOME/lib/native/Linux-amd64-64"

Now you have 2 ways to use map output compression in the Pig scripts:

  1. Follow instructions on http://code.google.com/p/hadoop-snappy/ to set map output compression at a cluster level
  2. Use Pig’s “set” keyword for per job level configuration
    set mapred.compress.map.output true;
    set mapred.map.output.compression.codec org.apache.hadoop.io.compress.SnappyCodec;

This should get you going with using Snappy for Map output compression with Pig. You can read and write Snappy compressed files as well, though I would not recommend doing that as it’s not very efficient space-wise compared to other compression algorithms. There is work being done to be able to use Snappy for creating intermediate/temporary files between multiple MR jobs. You can watch the work item here: https://issues.apache.org/jira/browse/PIG-2319

Using Snappy for Native Java MapReduce:

Set Configuration parameters for Map output compression

JobConf conf = new JobConf(); // JobConf (old mapred API) extends Configuration and provides setOutputFormat, used further below
conf.setBoolean("mapred.compress.map.output", true);
conf.set("mapred.map.output.compression.codec","org.apache.hadoop.io.compress.SnappyCodec");

Set Configuration parameters for Snappy compressed intermediate Sequence Files

conf.setOutputFormat(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK); //Block level is better than Record level, in most cases
SequenceFileOutputFormat.setCompressOutput(conf, true);
conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.SnappyCodec");

Benefits:

  1. Map tasks begin transferring data sooner compared to Gzip or Bzip (though more data needs to be transferred to Reduce tasks)
  2. Reduce tasks run faster with better decompression speeds
  3. Snappy is not CPU intensive – which means MR tasks have more CPU for user operations

What you SHOULD use Snappy for

Map output: Snappy works great if you have large amounts of data flowing from Mappers to the Reducers (you might not see a significant difference if data volume between Map and Reduce is low)

Temporary Intermediate files (not available currently as of Pig 0.9.2, applicable only to native Map Reduce) : If you have a series of MR jobs chained together, Snappy compression is a good way to store the intermediate files. Please do make sure these intermediate files are cleaned up soon enough so we don’t have disk space issues on the cluster.

What you should NOT use Snappy for

Permanent Storage: Snappy compression is not efficient space-wise and it is expensive to store data on HDFS (3-way replication)

Plain text files: Like Gzip, Snappy is not splittable. Do not store plain text files in Snappy compressed form, instead use a container like SequenceFile.

PIG – AGGREGATE WARNINGS

With Pig 0.8 onwards there is a configuration parameter “aggregate.warning” that is by default set to true. This means when your Pig script (UDF) encounters an exception, Pig aggregates these exception messages and prints out the summary at the end of script. Something like,

11/12/09 13:35:21 WARN mapReduceLayer.MapReduceLauncher: Encountered Warning DIVIDE_BY_ZERO 14 time(s).
11/12/09 13:35:21 WARN mapReduceLayer.MapReduceLauncher: Encountered Warning UDF_WARNING_3 19113867 time(s).
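
The idea behind aggregation is simple: count each kind of warning instead of logging every occurrence, then print one line per kind at the end. Here is a minimal sketch of that idea. It is not Pig's implementation; the class and method names are hypothetical and the message format just imitates the log lines above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of aggregating warnings into per-type counters. */
final class WarningAggregatorSketch {
    private final Map<String, Long> counts = new LinkedHashMap<>();

    /** Records one occurrence; nothing is written to the log here. */
    void warn(String warningType) {
        counts.merge(warningType, 1L, Long::sum);
    }

    /** One summary line per warning type, in first-seen order. */
    String summary() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            sb.append("Encountered Warning ").append(e.getKey())
              .append(' ').append(e.getValue()).append(" time(s).\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        WarningAggregatorSketch agg = new WarningAggregatorSketch();
        for (int i = 0; i < 14; i++) {
            agg.warn("DIVIDE_BY_ZERO");
        }
        System.out.print(agg.summary());
    }
}
```

Millions of bad records then cost a handful of summary lines rather than millions of log entries.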

What happens if aggregate.warning is set to false?

Disaster! You should expect a huge number of exceptions to be thrown when you are processing web logs. You cannot really predict the correctness of log lines: they could be incorrectly formatted, or missing data or fields. To err is log!

What you can do is handle these exceptions in your code: catch ArrayIndexOutOfBounds, NullPointer, ClassCast etc. But web logs are huge, maybe hundreds of gigs per day, and generating detailed exceptions in your Hadoop logs is not a good idea. You could potentially have all your datanodes run out of disk space.

Always make sure “aggregate.warning” is set to true.

EMBEDDED PIG IN JAVA – AGGREGATE WARNINGS

Although from 0.8 onwards this is set to true for Pig scripts, Pig embedded in Java behaves differently. To run a Pig script from within a Java program, you first need to create a PigServer object.

PigServer pigServer = new PigServer(ExecType.MAPREDUCE);
pigServer.registerQuery("A = LOAD '/logs' using PigStorage();");

However, when you look at the output logs you will find that Pig generates detailed exception messages. This is not what you would expect. I have made a fix for this, but it has been applied only to version 0.10. http://issues.apache.org/jira/browse/PIG-2425

For version 0.9.1 (and lower) the workaround for this would be:

Properties properties = PropertiesUtil.loadDefaultProperties();
properties.setProperty("aggregate.warning", "true");
PigServer pigServer = new PigServer(ExecType.MAPREDUCE, properties);

Merge HDFS file to Local filesystem

MapReduce jobs often require more than 1 reducer when the data volumes are huge and the data processing needs to be distributed across reduce tasks/nodes. However, at the end you might need to merge these output files to a single file; for instance to be used by a data warehouse process. Let’s say the MR job used 10 reducers to generate the output. In that case, the output HDFS file would look something like this:

[prkommireddi@xyz001 ~]$ hadoop fs -ls /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30
drwxr-xr-x   - prkommireddi hdmi-technology          0 2011-08-04 11:52 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/_logs
-rw-r--r--   3 prkommireddi hdmi-technology     150417 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00000
-rw-r--r--   3 prkommireddi hdmi-technology     151142 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00001
-rw-r--r--   3 prkommireddi hdmi-technology     151521 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00002
-rw-r--r--   3 prkommireddi hdmi-technology     152603 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00003
-rw-r--r--   3 prkommireddi hdmi-technology     156200 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00004
-rw-r--r--   3 prkommireddi hdmi-technology     150540 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00005
-rw-r--r--   3 prkommireddi hdmi-technology     153042 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00006
-rw-r--r--   3 prkommireddi hdmi-technology     150738 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00007
-rw-r--r--   3 prkommireddi hdmi-technology     159762 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00008
-rw-r--r--   3 prkommireddi hdmi-technology     149084 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00009

Files whose names begin with “_” or “.” should be ignored here; most InputFormats skip them when reading. The “_logs” directory, for example, will not be read by TextInputFormat or any regular InputFormat.

Now, coming back to the example with 10 reducers. The MR job generates 10 partitions, part-r-00000 through part-r-00009. In a word count job, for example, each reducer generates a partition containing unique words. That is, a word that shows up in part-r-00000 will not show up in part-r-00001 through part-r-00009. Now, you might need to merge these partitions into a single file.

You could do this on the command line with the getmerge command:

hadoop fs -getmerge /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30 ${HOME}/merge.txt

With the above command, partitions part-r-00000 through part-r-00009 are merged into a single local file merge.txt. Make sure “merge.txt” is not already present or the command fails.
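
If it helps to picture what the merge does, here is a local-filesystem analogue written with java.nio only. It is a sketch of the idea, not how getmerge is implemented: it concatenates the regular files of a directory into one new file and refuses to overwrite an existing destination. Sorting by name is an assumption made here to keep the output deterministic, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical local analogue of merging part files into a single file. */
final class LocalMergeSketch {
    static void merge(Path srcDir, Path dstFile) throws IOException {
        List<Path> parts = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(srcDir)) {
            for (Path p : stream) {
                if (Files.isRegularFile(p)) {   // sub-directories such as _logs are skipped
                    parts.add(p);
                }
            }
        }
        Collections.sort(parts);                // part-r-00000, part-r-00001, ...
        // CREATE_NEW makes this fail if dstFile already exists.
        try (OutputStream out = Files.newOutputStream(dstFile,
                StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            for (Path p : parts) {
                Files.copy(p, out);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 2) {
            merge(Paths.get(args[0]), Paths.get(args[1]));
        }
    }
}
```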

Another way to merge an HDFS file to the local FS is by using FileUtil. The static function “copyMerge” needs to be invoked; here is the definition from the docs

copyMerge

public static boolean copyMerge(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource, Configuration conf,String addString) throws IOException
Copy all files in a directory to one output file (merge).
Throws:
IOException
The 1st argument needs to be an instance of the distributed filesystem, whereas the 3rd argument needs to be an instance of LocalFileSystem.
FileSystem fs = FileSystem.get(conf);   //Returns the configured filesystem implementation, HDFS in this case.
FileSystem localFS = FileSystem.getLocal(conf); // Returns the local file system
Here is the wrapper function I wrote to do this:
/**
 * @author : prkommireddi
 * @date   : Aug 9, 2011
 * @desc   : The method merges the partitions of a HDFS directory
 *           to a single file on local FS.
 *           deleteSrc : boolean indicating whether source needs to be
 *                       deleted after merge.
 *           deleteDst : boolean indicating whether destination needs to
 *                       be deleted if exists before merge.
 * @return : boolean, true if the merge succeeded
 */
public static boolean mergeToLocal(FileSystem fs, Path src, FileSystem localFS, Path dst,
        Configuration conf, boolean deleteSrc, boolean deleteDst) {
    try {
        if (deleteDst && localFS.exists(dst)) {
            localFS.delete(dst, false); // non-recursive: dst is a single file
        }
        // copyMerge returns false when src is not a directory
        return FileUtil.copyMerge(fs, src, localFS, dst, deleteSrc, conf, "");
    } catch (IOException e) {
        e.printStackTrace();
    }
    return false;
}
Hope that helps.

HDFS – Data Sampler



This class allows you to sample files based on a sampling percentage. This is useful while developing/testing your hadoop apps if you want your Map-Reduce job to use only a subset of entire dataset. This is NOT a “true” random sample. The goal for writing this code was to allow MR jobs to be able to run with fewer HDFS data partitions as input, helping jobs finish sooner and making debugging easier. This eliminates the need to read the entire dataset. The idea is not to generate a truly random sample, but to read ONLY a few partitions.


package hadoop.util.data;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class DataSampler {

    /**
     * @author Prashant Kommireddi
     * @date Mar 9, 2011
     * @desc This function returns Path array based on the sampling percentage
     *       desired. For eg, for 450-way HDFS file if a 10% sample is required,
     *       this function returns a subset containing 45 paths.
     * @return Path[]
     */
    public static Path[] getSamplePaths(FileSystem fs, Path path,
            int samplePercentage) throws IOException {
        Path[] paths = listPaths(fs, path);
        return getSamplePathsHelper(paths, samplePercentage);
    }

    public static Path[] getSamplePaths(String uri, int samplePercentage)
            throws IOException {
        Path[] paths = listPaths(uri);
        return getSamplePathsHelper(paths, samplePercentage);
    }

    private static Path[] getSamplePathsHelper(Path[] paths,
            int samplePercentage) {
        // Guard against division by zero below (100 / samplePercentage).
        if (samplePercentage < 1 || samplePercentage > 100) {
            throw new IllegalArgumentException(
                    "samplePercentage must be between 1 and 100");
        }
        List<Path> pathList = filteredPaths(paths);
        final int totalPaths = pathList.size();
        final int sampleNumPaths = (totalPaths * samplePercentage) / 100;
        final int divisionFactor = 100 / samplePercentage;
        Path[] samplePaths = new Path[sampleNumPaths];
        int j = 0;
        // Pick every divisionFactor-th path until the sample is full.
        for (int i = 0; i < totalPaths && j < sampleNumPaths; i++) {
            if (i % divisionFactor == 0) {
                samplePaths[j++] = pathList.get(i);
            }
        }
        return samplePaths;
    }

    /**
     * @author : Prashant Kommireddi
     * @date : Apr 25, 2011
     * @return : boolean
     * @desc : This function returns false if the HDFS filename specified by
     *         argument Path p, starts with a "_" or "." This is useful to skip
     *         the files that must be ignored/NOT read.
     */
    public static boolean accept(Path p) {
        String name = p.getName();
        return !name.startsWith("_") && !name.startsWith(".");
    }

    public static List<Path> filteredPaths(Path[] paths) {
        List<Path> filteredList = new ArrayList<Path>();
        for (Path p : paths) {
            if (accept(p)) {
                filteredList.add(p);
            }
        }
        return filteredList;
    }

    /**
     * @author Prashant Kommireddi
     * @date Mar 9, 2011
     * @desc Returns an array of Path(s) corresponding to an input URI.
     * @return Path[]
     */
    public static Path[] listPaths(String uri) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        return listPaths(fs, new Path(uri));
    }

    /**
     * @author Prashant Kommireddi
     * @date Mar 9, 2011
     * @desc Function returns an array of Path(s) contained in the given HDFS
     *       path
     * @return Path[]
     */
    public static Path[] listPaths(FileSystem fs, Path path) throws IOException {
        FileStatus[] status = fs.listStatus(path);
        return FileUtil.stat2Paths(status);
    }
}



Making a call to DataSampler


The function getSamplePaths takes 3 arguments:
  1. FileSystem object
  2. Input Path - this must be a HDFS location
  3. Sample percentage - eg 10%, 1%, 20%
Based on the above arguments, the function returns an array of paths which can be passed as input to the MR job using:
FileInputFormat.setInputPaths(job, DataSampler.getSamplePaths(fs, inputPath, samplePercentage));
Of course, you could use glob or regular-expression filters (GlobFilter etc.) to pick a subset of files too. I wrote this class to make it easier to return the subset without worrying about the internal structure/naming of the HDFS file.
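
To see which partitions end up in the sample, here is the same stride arithmetic applied to plain indices, with no Hadoop classes involved. The class and method names are hypothetical; the two formulas are the ones used in getSamplePathsHelper.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone illustration of the index selection used by the sampler. */
final class SampleStrideSketch {
    static List<Integer> sampleIndices(int totalPaths, int samplePercentage) {
        if (samplePercentage < 1 || samplePercentage > 100) {
            throw new IllegalArgumentException("samplePercentage must be between 1 and 100");
        }
        int sampleNumPaths = (totalPaths * samplePercentage) / 100; // how many to pick
        int divisionFactor = 100 / samplePercentage;                // stride between picks
        List<Integer> picked = new ArrayList<>();
        for (int i = 0; i < totalPaths && picked.size() < sampleNumPaths; i++) {
            if (i % divisionFactor == 0) {
                picked.add(i);
            }
        }
        return picked;
    }

    public static void main(String[] args) {
        // 450 partitions at 10%: every 10th index, 45 in total.
        System.out.println(sampleIndices(450, 10));
    }
}
```

Note that the selection is evenly spaced rather than random, which matches the goal stated at the top: read only a few partitions, not draw a true random sample.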

Hadoop java.lang.OutOfMemoryError: Java heap space

Ever run into an out-of-memory error with your MR tasks? This can happen if you are using large lookups (DistributedCache) or data structures holding a huge number of entries, and the heap allocated to MapReduce tasks is not sufficient to handle them.

Stepping back, each map and reduce task is launched in a separate JVM. The heap allocated to it is based on the configuration parameter “mapred.child.java.opts”. If this is too small to handle your data, the hadoop job will complain that it does not have sufficient heap to continue. Try increasing the heap size through Configuration, but keep in mind the amount of RAM, the number of cores and the number of MR tasks configured on the nodes while adjusting the parameter.

<property>
<name>mapred.child.java.opts</name>
<value>-Xmx2048m</value>
</property>

Here I have increased the heap size to 2 GB. Of course, the nodes in my cluster had plenty of RAM to allow this. Adding this as a resource to Configuration is one way; another is to simply inject it into Configuration by invoking the set method.

set

public void set(String name, String value)

Set the value of the name property.

Parameters:
name – property name.
value – property value.
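
Before raising the heap, it is worth doing the arithmetic on how much memory the tasks on one node could claim together. The sketch below is a back-of-the-envelope check only: the slot counts, the node RAM and the reserved amount are hypothetical inputs you would take from your own cluster, and a real JVM uses somewhat more memory than its -Xmx value.

```java
/** Hypothetical back-of-the-envelope check for per-node task heap. */
final class HeapBudgetSketch {
    /** Worst case: every map and reduce slot on the node is busy at full heap. */
    static long worstCaseTaskHeapMb(int mapSlots, int reduceSlots, int childHeapMb) {
        return (long) (mapSlots + reduceSlots) * childHeapMb;
    }

    /** True if the worst case plus memory reserved for daemons and the OS fits in RAM. */
    static boolean fits(int mapSlots, int reduceSlots, int childHeapMb,
                        long nodeRamMb, long reservedMb) {
        return worstCaseTaskHeapMb(mapSlots, reduceSlots, childHeapMb) + reservedMb <= nodeRamMb;
    }

    public static void main(String[] args) {
        // Example: 8 map slots + 4 reduce slots at -Xmx2048m each.
        System.out.println(worstCaseTaskHeapMb(8, 4, 2048) + " MB");
        System.out.println(fits(8, 4, 2048, 32768, 4096));
    }
}
```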

Unicode characters/Ctrl G or Ctrl A as TextOutputFormat (Hadoop) delimiter


Ever tried specifying Ctrl-G or Ctrl-A as a delimiter using the TextOutputFormat? Well, you would not be able to with the current version of Hadoop.


11/04/11 18:39:43 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 3ebdc922a897735c130a12bb44fc8c0819077f9d]
Exception in thread "main" org.apache.hadoop.ipc.RemoteException: java.io.IOException: java.lang.RuntimeException:
org.xml.sax.SAXParseException: Character reference "&#7" is an invalid XML character.
at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:1317)
at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:1186)
at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:1115)
at org.apache.hadoop.conf.Configuration.get(Configuration.java:425)
at org.apache.hadoop.mapred.JobConf.checkAndWarnDeprecation(JobConf.java:1709)
at org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:214)
at org.apache.hadoop.mapred.JobInProgress.<init>(JobInProgress.java:264)
at org.apache.hadoop.mapred.JobInProgress.<init>(JobInProgress.java:240)
at org.apache.hadoop.mapred.JobTracker.submitJob(JobTracker.java:3026)
at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:512)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:968)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:964)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:962)

at org.apache.hadoop.ipc.Client.call(Client.java:818)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:221)
at org.apache.hadoop.mapred.$Proxy1.submitJob(Unknown Source)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:841)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:443)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:467)
at com.ebay.ice.hadoop.mobius.srp.mapred.SRPImpressionCounter.run(SRPImpressionCounter.java:182)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at com.ebay.ice.hadoop.mobius.srp.mapred.SRPImpressionCounter.main(SRPImpressionCounter.java:110)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)

I faced the error shown above when I tried to use Ctrl-G (\u0007) as the delimiter between key-value pairs in the output. The issue here is that the client serializes the Configuration as XML, which is later unmarshalled by the JobTracker. This step on the JobTracker fails because the XML parser rejects “\u0007”, or for that matter any control character that XML 1.0 does not allow.

Workaround (a dirty one at that!): Create a custom TextOutputFormat that hard-codes the Unicode character as the default separator. This is a really dirty hack, and I am working on making the code generic so that it accepts the special Unicode delimiter as an argument.

Here is the code from TextOutputFormat:

public RecordWriter<K, V> getRecordWriter(FileSystem ignored,
                                          JobConf job,
                                          String name,
                                          Progressable progress)
    throws IOException {
  boolean isCompressed = getCompressOutput(job);
  String keyValueSeparator = job.get("mapred.textoutputformat.separator",
                                     "\t");
  if (!isCompressed) {
    Path file = FileOutputFormat.getTaskOutputPath(job, name);
    FileSystem fs = file.getFileSystem(job);
    FSDataOutputStream fileOut = fs.create(file, progress);
    return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
  } else {
    Class<? extends CompressionCodec> codecClass =
        getOutputCompressorClass(job, GzipCodec.class);
    // create the named codec
    CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
    // build the filename including the extension
    Path file =
        FileOutputFormat.getTaskOutputPath(job,
                                           name + codec.getDefaultExtension());
    FileSystem fs = file.getFileSystem(job);
    FSDataOutputStream fileOut = fs.create(file, progress);
    return new LineRecordWriter<K, V>(new DataOutputStream
                                      (codec.createOutputStream(fileOut)),
                                      keyValueSeparator);
  }
}

The getRecordWriter implementation above uses “\t” as the default separator. Changing this default in your custom output format should work. But again, this is a “really dirty” hack! I will post later once I have a better implementation.
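
One possible way to make the hack generic is sketched below. It is an assumption for illustration, not something Hadoop provides. The idea is to pass the delimiter through the Configuration as plain decimal character codes, which XML 1.0 can carry without trouble, and to decode it inside the custom output format before handing it to the record writer. The class DelimiterCodec and the property name in the comment are hypothetical.

```java
final class DelimiterCodec {

    /** Encodes each char as its decimal code, comma-separated, e.g. Ctrl-G becomes "7". */
    static String encode(String delimiter) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < delimiter.length(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append((int) delimiter.charAt(i));
        }
        return sb.toString();
    }

    /** Reverses encode(): "7" becomes the single character Ctrl-G. */
    static String decode(String encoded) {
        if (encoded.isEmpty()) {
            return "";
        }
        StringBuilder sb = new StringBuilder();
        for (String code : encoded.split(",")) {
            sb.append((char) Integer.parseInt(code.trim()));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String ctrlG = String.valueOf((char) 7);
        // Client side (hypothetical property name):
        //   conf.set("my.textoutputformat.separator.codes", encode(ctrlG));
        String wireValue = encode(ctrlG);
        System.out.println("Value stored in the job configuration: " + wireValue);
        // Task side, inside the custom output format: decode before building the writer.
        System.out.println("Round trip intact: " + decode(wireValue).equals(ctrlG));
    }
}
```

The encoded value contains only digits and commas, so it survives the XML serialization of the Configuration regardless of the XML version.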

Another hack would be to provide the delimiter through an XML resource file. The XML version needs to be marked 1.1, since 1.0 does not accept these control characters. The XML 1.0 spec explicitly excludes most of the non-printing characters in the range 0x00 to 0x1F.

Name: mapred.textoutputformat.separator
Value: \u0007

<?xml version="1.1"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hadoop.user</name>
    <value>${user.name}</value>
  </property>
  <property>
    <name>mapred.textoutputformat.separator</name>
    <value>\u0007</value>
  </property>
</configuration>
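
The difference between XML 1.0 and XML 1.1 described above can be reproduced with the JDK's built-in parser alone, without Hadoop. The sketch below parses the same tiny document twice, changing only the declared version. As an assumption of the sketch, the control character is written as the character reference &#7;, the same form that appears in the SAXParseException message; the class name XmlControlCharDemo is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

final class XmlControlCharDemo {

    /**
     * Parses a one-property document declared with the given XML version.
     * Returns the text of the value element, or null if the parser rejects the document.
     */
    static String tryParse(String xmlVersion) {
        String xml = "<?xml version=\"" + xmlVersion + "\"?>"
                + "<property><value>&#7;</value></property>";
        try {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            // DefaultHandler rethrows fatal errors instead of printing them to stderr.
            builder.setErrorHandler(new DefaultHandler());
            return builder
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
                    .getDocumentElement()
                    .getTextContent();
        } catch (SAXException e) {
            return null; // the document is not well-formed for the declared version
        } catch (ParserConfigurationException | IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        for (String version : new String[] {"1.0", "1.1"}) {
            String value = tryParse(version);
            System.out.println("XML " + version + ": "
                    + (value == null ? "rejected" : "accepted, char code " + (int) value.charAt(0)));
        }
    }
}
```

Only the version in the declaration differs between the two runs, which is why marking the resource file as 1.1 matters.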
