Parquet – columnar storage for Hadoop

Parquet is a columnar storage format for Hadoop that uses the concept of repetition/definition levels borrowed from Google Dremel. It provides efficient encoding and compression schemes, applied on a per-column basis: compression works better because all values in a column are of the same type, and encoding works better because values within a column are often repeated. Here is a nice blog post from Julien describing Parquet internals.

Parquet can be used by any project in the Hadoop ecosystem; integrations are provided for M/R, Pig, Hive, Cascading and Impala.

I am by no means an expert at this, and a lot of what I write here is based on my conversations with a couple of key contributors on the project (@J_ and @aniket486). Also, most of the content in this post is based on the Pig+Parquet integration. We at Salesforce.com have started using Parquet for application log processing with Pig and are encouraged by the preliminary performance results.

Writing a Parquet file

There is parquet.hadoop.ParquetWriter. You need to decide which object model you want to use: Thrift, Avro, Pig, or the example model. Here is a function for writing a file using the Pig model (TupleWriteSupport).

private void write(String pigSchemaString, String writePath) throws Exception {
    Schema pigSchema = Utils.getSchemaFromString(pigSchemaString);
    TupleWriteSupport writeSupport = new TupleWriteSupport(pigSchema);
    FileSystem fs = FileSystem.get(new Configuration());
    Path path = new Path(writePath);
    if (fs.exists(path)) {
        fs.delete(path, true);
    }
    ParquetWriter<Tuple> writer = new ParquetWriter<Tuple>(path, writeSupport,
            CompressionCodecName.UNCOMPRESSED, ParquetWriter.DEFAULT_BLOCK_SIZE,
            ParquetWriter.DEFAULT_PAGE_SIZE, false, false);

    TupleFactory tf = TupleFactory.getInstance();
    for (int i = 0; i < NUM_RECORDS; i++) {
        Tuple t = tf.newTuple();
        for (int j = 0; j < pigSchema.size(); j++) {
            t.append(i + j);
        }
        writer.write(t);
    }
    writer.close();
}

“pigSchemaString” is the schema for the Parquet file. This could be any valid Pig schema, such as “a:int, b:int, c:int”. Note that I insert integer values into the tuple, hence the schema fields are defined as int.

So what exactly happened during the write? I use TupleWriteSupport, a WriteSupport implementation that helps us write Parquet files compatible with Pig. I then use ParquetWriter, passing in a few arguments:

  1. path – file path to write to

  2. writeSupport – TupleWriteSupport in this case

  3. compressionCodecName – could be UNCOMPRESSED, GZIP, SNAPPY, LZO

  4. blockSize – the block size, 128MB by default; this is the total size used by a block

  5. pageSize –  from the parquet docs: “pages should be considered indivisible so smaller data pages allow for more fine grained reading (e.g. single row lookup). Larger page sizes incur less space overhead (less page headers) and potentially less parsing overhead (processing headers). Note: for sequential scans, it is not expected to read a page at a time; this is not the IO chunk.  We recommend 8KB for page sizes.” Default page size is 1MB

  6. enableDictionary – turn on/off dictionary encoding

At the end, I create tuples with a few elements and write them to the Parquet file.

Reading a Parquet file

There is a bug due to which trying to read Parquet files written using the Pig model (TupleWriteSupport) directly with ParquetReader fails. See https://github.com/Parquet/parquet-mr/issues/195

However, you can use Pig to read the file.

A = load 'parquet_file' USING parquet.pig.ParquetLoader();

describe A;

A: {a: int, b: int, c: int}

B = foreach A generate a;

dump B;

Initial Findings

I ran the PerfTest and noticed that reading 1 column took longer than reading more columns, the opposite of what you would expect.

Parquet file read took : 1790(ms) for 1 columns
Parquet file read took : 565(ms) for 2 columns
Parquet file read took : 560(ms) for 3 columns
Parquet file read took : 544(ms) for 4 columns
Parquet file read took : 554(ms) for 5 columns
Parquet file read took : 550(ms) for 10 columns
Parquet file read took : 545(ms) for 20 columns
Parquet file read took : 563(ms) for 30 columns
Parquet file read took : 1653(ms) for 40 columns
Parquet file read took : 1049(ms) for 50 columns

That’s the JVM warmup phase!

The JIT compiler inlines methods based on how often they are called, so it waits before doing so to see what gets called often. Julien suggested I run the test twice in a row (in the same process) and compare the times. This is generic JVM behavior and nothing particular to Parquet, but I wanted to highlight it in case you run into similar perf numbers.

Voila! 2nd iteration did provide better results.

Parquet file read took : 1809(ms) for 1 columns, iteration 0
Parquet file read took : 563(ms) for 2 columns, iteration 0
Parquet file read took : 562(ms) for 3 columns, iteration 0
Parquet file read took : 548(ms) for 4 columns, iteration 0
Parquet file read took : 548(ms) for 5 columns, iteration 0
Parquet file read took : 554(ms) for 10 columns, iteration 0
Parquet file read took : 548(ms) for 20 columns, iteration 0
Parquet file read took : 550(ms) for 30 columns, iteration 0
Parquet file read took : 1603(ms) for 40 columns, iteration 0
Parquet file read took : 1054(ms) for 50 columns, iteration 0
Parquet file read took : 536(ms) for 1 columns, iteration 1
Parquet file read took : 531(ms) for 2 columns, iteration 1
Parquet file read took : 527(ms) for 3 columns, iteration 1
Parquet file read took : 532(ms) for 4 columns, iteration 1
Parquet file read took : 527(ms) for 5 columns, iteration 1
Parquet file read took : 533(ms) for 10 columns, iteration 1
Parquet file read took : 537(ms) for 20 columns, iteration 1
Parquet file read took : 534(ms) for 30 columns, iteration 1
Parquet file read took : 538(ms) for 40 columns, iteration 1
Parquet file read took : 1966(ms) for 50 columns, iteration 1
Parquet file read took : 523(ms) for 1 columns, iteration 2
Parquet file read took : 525(ms) for 2 columns, iteration 2
Parquet file read took : 691(ms) for 3 columns, iteration 2
Parquet file read took : 529(ms) for 4 columns, iteration 2
Parquet file read took : 530(ms) for 5 columns, iteration 2
Parquet file read took : 531(ms) for 10 columns, iteration 2
Parquet file read took : 532(ms) for 20 columns, iteration 2
Parquet file read took : 532(ms) for 30 columns, iteration 2
Parquet file read took : 1032(ms) for 40 columns, iteration 2
Parquet file read took : 1044(ms) for 50 columns, iteration 2

However, it's interesting to note there isn't a huge difference in read time for 1 column vs. more. This can be attributed to the fact that this test ran on a dataset with only 100k rows (a few MBs). As explained later, the advantages of the Parquet format are more apparent with large files.

I ran another test – reading a regular text file vs parquet file. The file contained 20M rows and 6 columns. The results didn’t seem right, both storage and processing-wise. Was I missing something?

Text file size : 513M
Parquet file size : 895M

Time taken to read, Text : 13784ms
Time taken to read, Parquet: 19551ms

By default compression is not enabled, which is why the Parquet file is larger (footers, headers and summary files take up additional space; note that Parquet stores information about each page, column chunk and file so it can determine the exact pages that need to be loaded by a query. You can find additional info here).
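If you do want compression, one option (a minimal sketch based on the same constructor shown earlier, just swapping the codec) is to pass one of the other codecs to ParquetWriter:

ParquetWriter<Tuple> writer = new ParquetWriter<Tuple>(path, writeSupport,
        CompressionCodecName.SNAPPY, ParquetWriter.DEFAULT_BLOCK_SIZE,
        ParquetWriter.DEFAULT_PAGE_SIZE, false, false);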

Also, if you are reading all the columns, the columnar format is expected to be slower, as row storage is more efficient when you read all the columns. Project fewer columns and you should see a difference, along with a projection pushdown message in the logs.

Yes, you need bigger files to get the benefits of columnar storage.

At this point, I wanted to try out encoding and see how that plays out on the overall storage. My next question: do different encodings (RLE, dictionary) have to be specified by the client, or does Parquet figure out the right one to use based on the data? It turns out Parquet will use dictionary encoding if it can, but right now you need to turn that on. http://parquet.io/parquet-mr/site/1.0.0-SNAPSHOT/apidocs/index.html

parquet.enable.dictionary=true
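For example, one way to turn it on for a Pig run, assuming the property is simply passed through to the job configuration like any other Hadoop property, is on the command line (myscript.pig is just a placeholder for your own script):

pig -Dparquet.enable.dictionary=true myscript.pig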

I finally did get some nice results after enabling dictionary encoding and filtering on a single column. Storage was a lot better too once dictionary encoding was enabled. (The following was run on a larger dataset.)

1st iteration:

Text file read took : 42231ms
Parquet file read took : 27853ms

Here are the numbers from 2nd iteration - just to negate the effects of JVM warmup, and to be fair to text-row format :)

2nd iteration:

Text file read took : 36555ms
Parquet file read took : 27669ms

Schema Management

Parquet can handle multiple schemas. This is important for our log-processing use-case at SFDC. We have several different types of logs, each with its own schema, a few hundred of them in total, and most Pig queries run against only a few log types. Parquet merges schemas and provides the ability to parse out columns from different files.

LogType A : organizationId, userId, timestamp, recordId, cpuTime

LogType V : userId, organizationId, timestamp, foo, bar

A query that tries to parse organizationId and userId from the 2 log types should be able to do so correctly, even though the columns are positioned differently in the two schemas. With Parquet, that's not a problem: it merges the ‘A’ and ‘V’ schemas and projects columns accordingly. It does so by maintaining a file schema in addition to the merged schema, and parsing the columns by referencing the two.
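As a rough sketch (the path and the column types here are hypothetical), a script reading a directory containing files of both log types and projecting the shared columns could look like this:

-- '/logs/mixed' is a hypothetical directory holding Parquet files of both log types;
-- the projection works regardless of where the columns sit in each file's schema
logs = LOAD '/logs/mixed' USING parquet.pig.ParquetLoader('organizationId:chararray, userId:chararray');
ids  = FOREACH logs GENERATE organizationId, userId;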

Projection Pushdown

One of the advantages of a columnar format is the fact that it can read only those parts from a file that are necessary. The columns not required are never read, avoiding unnecessary and expensive I/O.

To do this in Pig, just pass the required schema into the constructor of ParquetLoader.

A = LOAD '/parquet/file' USING parquet.pig.ParquetLoader('a:int, b:int');

The above query loads columns ‘a’ and ‘b’ only. When you do so, you should find a message similar to the following in the logs:

Assembled and processed 4194181 records from 2 columns in 2124 ms: 1974.6615 rec/ms, 1974.6615 cell/ms

If you hadn't done that, all columns of a file containing 16 columns would be loaded:

Assembled and processed 4194181 records from 16 columns in 6313 ms: 664.3721 rec/ms, 10629.953 cell/ms

Summary Files

Parquet generates a summary file for all part files generated under a directory (job output). The summary file reduces the number of calls to the namenode and to individual slaves while producing the splits, which significantly reduces the latency to start a job. Without it, Parquet has to open the footer of every part file, which is occasionally slowed down by the namenode or by a bad slave we happen to hit. Reading one summary file reduces both the risk of hitting a slow slave and the load on the namenode.

For example, say the output directory to which Parquet files are written by a Pig script is '/user/username/foo':

STORE someAlias INTO '/user/username/foo' USING parquet.pig.ParquetStorer();

This will create part files under ‘foo’; the number of part files depends on the number of reducers.

/user/username/foo/part-r-00000.parquet

/user/username/foo/part-r-00001.parquet

/user/username/foo/part-r-00002.parquet

/user/username/foo/part-r-00003.parquet

The summary file is generated when the Hadoop job writing the files finishes, since it is created in the output committer of the output format (ParquetOutputCommitter.commitJob). It reads all footers in parallel and creates the summary file so that all subsequent “LOAD”s or reads on the directory ‘foo’ can be more efficient.

There is one summary file for all the part files output by the same job, i.e. one per directory containing multiple part files.

Hadoop Compatibility

Anyone who has been part of a major Hadoop upgrade should be familiar with how painful the process can be. At SFDC, we moved from a really old version, 0.20.2, to 2.x (recently declared GA). This involved upgrading a ton of dependencies, making client-side changes to use the newer APIs, a bunch of new configurations, and eliminating a whole lot of deprecated stuff. Though this was a major upgrade and most upgrades from here on should be smooth(er), it always helps if dependent and 3rd-party libraries don't need to be recompiled.

With Parquet, you should not need to recompile for Hadoop 2. It hides all the Hadoop 2 incompatibilities behind reflective calls, so the same jars work on both.

And finally ..

We at Salesforce.com have been early adopters of several big data open source technologies. Hadoop, Pig, HBase, Kafka, Zookeeper and Oozie, to name a few, either have been or are in the process of making it to production. Phoenix, a SQL layer on top of HBase, is a project that was homegrown and is now open-sourced. Parquet is the latest addition, and we are looking forward to using it for more datasets (Oracle exports, for example) in the near future, not just application logs. The Parquet community is helpful and open to new ideas and contributions, which is great for any open source project.

@pRaShAnT1784

Pig – startup script behavior

Pig ships with a bundled Hadoop. A user can either use the Hadoop bundled with Pig, or provide his/her own Hadoop version. So the 2 ways to start up Pig are, using:

  1. bundled  (pig shipped with hadoop, pig.jar)
  2. non-bundled (hadoop provided by user, pig-withouthadoop.jar)

As the name suggests, pig-withouthadoop.jar does not contain any hadoop libs/classes whereas pig.jar contains Hadoop 1.0.0 (as of release 0.11.1).

How does pig know which hadoop to use?

For this, let’s understand how the startup script (bin/pig) works. Here’s a simple representation of what happens:

  1. Use non-bundled if hadoop is set on classpath
  2. If not, use non-bundled if HADOOP_HOME is set
  3. If not, fallback on bundled pig jar
  4. Note that in steps 1 and 2 the classpath/HADOOP_HOME should actually contain Hadoop; if not, the Pig startup script will fall back on bundled Pig (see the sketch below).
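A simplified sketch of that decision, as an illustration only (this is not the actual bin/pig script):

# illustration of the fallback order, not the real startup script
if which hadoop > /dev/null 2>&1; then
    HADOOP_BIN=`which hadoop`                 # 1. hadoop found on the path: non-bundled
elif [ -n "$HADOOP_HOME" ] && [ -x "$HADOOP_HOME/bin/hadoop" ]; then
    HADOOP_BIN="$HADOOP_HOME/bin/hadoop"      # 2. HADOOP_HOME set and valid: non-bundled
else
    HADOOP_BIN=""                             # 3. nothing found: fall back to bundled pig.jar
fi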

If you were to use Pig with a Hadoop cluster running 0.20.2, it could be done either by setting hadoop on the classpath or by specifying it via HADOOP_HOME.

export HADOOP_HOME=<path_to_hadoop>

On my machine, I have hadoop on the classpath

$ which hadoop
/home/pkommireddi/dev/tools/Linux/hadoop/hadoop-0.20.2/bin/hadoop

Specifying additional Java runtime options

Additional runtime options can be specified using PIG_OPTS

export PIG_OPTS="-Dmapred.job.queue.name=my-queue"

Pro-tip: make sure you are not inadvertently overriding a PIG_OPTS that might have been set elsewhere. Instead, add to PIG_OPTS:

export PIG_OPTS="$PIG_OPTS -Dmapred.job.queue.name=my-queue"

Interesting behavior when using user-specified hadoop (non-bundled):
Pig script uses hadoop startup script ($HADOOP_HOME/bin/hadoop) in non-bundled mode. It adds PIG_OPTS to HADOOP_OPTS and invokes the hadoop startup script (let’s call it HSS). HSS picks up environment variables from “hadoop-env.sh”. It could be entirely possible that HADOOP_OPTS passed to the HSS is not used at all. This would happen if hadoop-env.sh is overriding it. For eg,

export HADOOP_OPTS="-server -Dlog4j.configuration=log4j.properties"

In this case, the HADOOP_OPTS in hadoop-env.sh discards the user-passed options. Instead, similar to the PIG_OPTS tip above, HADOOP_OPTS should add to the existing options in hadoop-env.sh:

export HADOOP_OPTS="$HADOOP_OPTS -server -Dlog4j.configuration=log4j.properties"

Specifying additional classpath entries:

Use PIG_CLASSPATH to specify additional classpath entries. For example, to add the Hadoop configuration files (hadoop-site.xml, core-site.xml) to the classpath:

export PIG_CLASSPATH=<path_to_hadoop_conf_dir>

Starting with version 0.12, you should be able to override default classpath entries by setting PIG_USER_CLASSPATH_FIRST:

export PIG_USER_CLASSPATH_FIRST=true

Specifying additional jars:

At times you might need jars from libraries not included in the Pig distribution. These could be your custom UDFs or 3rd-party libs. Pig lets you add these jars to the classpath using “pig.additional.jars”:

 pig -Dpig.additional.jars=myjar.jar script.pig

Alternately, you could use REGISTER within your script.
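For instance (myjar.jar is just a placeholder for your own jar):

REGISTER 'myjar.jar';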

Debugging:

Pig provides a command for users to debug any Pig startup issues related to the classpath. It provides information on:

  1. Hadoop version,
  2. Pig version
  3. classpath
  4. java runtime options
  5. bundled vs unbundled

This is “-secretDebugCmd” – it really should not be such a secret :)

$ pig -x local -secretDebugCmd

Cannot find local hadoop installation, using bundled Hadoop 1.0.0
dry run:
/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home/bin/java -Xmx1000m -Dpig.log.dir=/Users/pkommireddi/work/pig/pig-latest/bin/../logs -Dpig.log.file=pig.log -Dpig.home.dir=/Users/pkommireddi/work/pig/pig-latest/bin/.. -classpath /Users/pkommireddi/work/pig/pig-latest/bin/../conf:/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home/lib/tools.jar:/Users/pkommireddi/work/pig/pig-latest/bin/../build/ivy/lib/Pig/jython-standalone-2.5.2.jar /Users/pkommireddi/work/pig/pig-latest/bin/../build/ivy/lib/Pig/jython-standalone-2.5.3.jar:/Users/pkommireddi/work/pig/pig-latest/bin/../build/ivy/lib/Pig/jruby-complete-1.6.7.jar:/Users/pkommireddi/work/pig/pig-latest/bin/../pig.jar org.apache.pig.Main -x local

Conclusion:

If it's under your control, always try to add to existing Hadoop or Pig options instead of resetting them. Of course, at times you might not have control over hadoop-env.sh, in which case you can pass Pig options when running a script:

"pig -Dmapred.job.queue.name=my-queue script.pig"

Pig – specify a default script .pigbootup

From version 0.11 onwards, Pig provides the ability to define a default set of statements that is loaded every time you start Pig. During ad-hoc analysis it is common to use the same set of DEFINE and REGISTER statements to declare the additional jars and UDFs a script requires. You can now create a file (.pigbootup) containing the statements that should be run each time you start Pig.

Let’s see what it looks like. Here is my grunt shell when I start pig in local mode

localhost:pig-trunk pkommireddi$ bin/pig -x local
2013-02-05 22:32:16,171 [main] INFO org.apache.pig.Main - Apache Pig version 0.12.0-SNAPSHOT (r1442025) compiled Feb 03 2013, 21:36:06
2013-02-05 22:32:16,172 [main] INFO org.apache.pig.Main - Logging error messages to: /Users/pkommireddi/work/pig/pig-trunk/pig_1360132336169.log
2013-02-05 22:32:16,192 [main] INFO org.apache.pig.impl.util.Utils - Default bootup file /Users/pkommireddi/.pigbootup not found
2013-02-05 22:32:16,345 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///
grunt>

Now I create .pigbootup under my HOME directory with a few entries

localhost:pig-trunk pkommireddi$ cat ~/.pigbootup 
REGISTER 'foo.jar';
DEFINE MY_UDF com.sfdc.BAR();
SET default_parallel 10;

Let’s start pig again

localhost:pig-trunk pkommireddi$ bin/pig -x local
2013-02-05 22:35:48,148 [main] INFO  org.apache.pig.Main - Apache Pig version 0.12.0-SNAPSHOT (r1442025) compiled Feb 03 2013, 21:36:06
2013-02-05 22:35:48,149 [main] INFO  org.apache.pig.Main - Logging error messages to: /Users/pkommireddi/work/pig/pig-trunk/pig_1360132548146.log
2013-02-05 22:35:48,324 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///
grunt> REGISTER 'foo.jar';
grunt> DEFINE MY_UDF com.sfdc.BAR();
grunt> SET default_parallel 10;
grunt>

You can see that pig has loaded up the 3 statements from .pigbootup automagically.

Location of .pigbootup is configurable

The location of .pigbootup is configurable via the property “pig.load.default.statements”. Here I have added an entry to pig.properties pointing it to an alternate location:

pkommireddi$ grep "pig.load.default.statements" pig.properties 
pig.load.default.statements=/Users/pkommireddi/work/.pigbootup

Time to upgrade to 0.11; there are a few other cool features to check out.

PigStorage options – Schema and Source tagging

PigStorage is probably the most frequently used Load/Store utility. It parses input records based on a delimiter, and the fields thereafter can be referenced positionally or via alias. Starting with version 0.10, Pig has a couple of options that can be very useful, and I will explain what those are here.

Options

  1. Schema: Reads/Stores the schema of the relation using a hidden JSON file (.pig_schema). Consider the following example:
    grunt> cat example;
    1 pig apache
    2 hadoop apache
    grunt> A = LOAD 'example' using PigStorage('\t') as (id:int, project:chararray, org:chararray);
    grunt> B = foreach A generate project, org;
    grunt> describe B;
    B: {project: chararray,org: chararray}
    grunt> store B into 'output';
    grunt> cat output;
    pig apache
    hadoop apache

    Schema for alias B is {project: chararray,org: chararray}.

    Now you might want to load the output file and perform further processing on it. Typically this is achieved by loading the dataset ‘output’ using PigStorage and redefining the schema. But this is redundant, and possibly error-prone.

    grunt> ExplicitSchema = LOAD 'output' using PigStorage('\t') as (project:chararray, org:chararray);

    In the above line, we have to explicitly define the schema for the dataset ‘output’.

    With Pig 0.10, we now have an option to pass PigStorage the argument ‘-schema’ while storing data. This will create a ‘.pig_schema’ file in the output directory which is a JSON file containing the schema.

    store B into 'output' using PigStorage('\t', '-schema');

    So the next time you load ‘output’, you only need to specify the location of output to LOAD.

    grunt> WithSchema = LOAD 'output';
    grunt> describe WithSchema;
    WithSchema: {project: chararray,org: chararray}

    If you do not want the schema to be loaded, you can disable it with ‘-noschema’

    grunt> WithSchemaDisabled = LOAD 'output' using PigStorage('\t', '-noschema');
    grunt> describe WithSchemaDisabled;
    Schema for WithSchemaDisabled unknown.

    Another useful property of this option is that it creates a header file containing the column names.

    From Pig Storage java docs: This file “.pig_headers” is dropped in the output directory. This file simply lists the delimited aliases. This is intended to make export to tools that can read files with header lines easier (just cat the header to your data).

    To summarize (thanks to Dmitriy for suggesting adding this section):

      • PigStorage always tries to load the .pig_schema file, unless you explicitly say -noschema.
      • If you don’t specify anything at all, PigStorage will try to load a schema, and silently fail (behave as before) if it’s not present or unreadable.
      • If you specify -schema during loading, PigStorage will fail if a schema is not present.
      • If you specify -noschema during loading, PigStorage will ignore the .pig_schema file.
      • PigStorage will only *store* the schema if you specify -schema.

     

  2. Source tagging: Adds the source filename as the first field of each tuple. (Please refer to the UPDATE below before reading this.) You may sometimes want to know the exact file a record came from. For example, let's say we have a log dataset that is partitioned by application server id: all log events from <app server 1> are contained in logs_app_server_1.gz, all events from <app server 2> are contained in logs_app_server_2.gz, and so on. When you read all these app log files at once, you may want to include the app server id in your analysis. PigStorage can now (as of Pig release 0.10) be used to accomplish this. If -tagsource is specified, PigStorage will prepend the input split path to each Tuple/row. The user needs to ensure pig.splitCombination is set to false. This is because Pig by default combines small files (based on the property pig.splitCombination) and passes them to a single mapper, in which case it would be difficult to determine the exact input filename for the combined records.
    set pig.splitCombination false;

    Usage:

    A = LOAD 'input' using PigStorage(',','-tagsource'); 
    B = foreach A generate $0 as input_filename;

    The first field in each Tuple will contain input filename.

    For example, let’s say we have 2 files ‘data’ and ‘data2′.

    grunt> cat data
    [open#apache,1#2,11#2]
    [apache#hadoop,3#4,12#hadoop]
    
    grunt> cat data2
    1	10
    4	11
    5	10
    
    grunt> set pig.splitCombination false;
    
    grunt> A = load 'data*' using PigStorage('\t', '-tagsource');
    
    grunt> dump A;
    (data,[open#apache,1#2,11#2])
    (data,[apache#hadoop,3#4,12#hadoop])
    (data2,1,10)
    (data2,4,11)
    (data2,5,10)

    CAVEAT:
    Disabling “pig.splitCombination” can have a negative effect on the performance of Pig jobs. Please note this property is turned on by default. It combines multiple small input files and passes them to a single mapper. Files are combined up to the size specified by the property “pig.maxCombinedSplitSize”. For example, if you have 10 files of 25 MB each and “pig.maxCombinedSplitSize” set to 256 MB, PigStorage combines all 10 files and passes them to a single mapper.

    Please do consider this performance hit while using ‘-tagsource’.

Many more features and improvements to look forward to in Pig 0.10; these PigStorage options are just one of them!

UPDATE 05/28/2012

A fix was made due to which we no longer need to disable “pig.splitCombination”: https://issues.apache.org/jira/browse/PIG-2462. Please DO NOT disable this feature in your pig scripts.

Data Export from Hadoop MapReduce to Database

Hadoop has become a huge part of Data Warehouse in most companies. It is used for a variety of use-cases: Search and Web Indexing, Machine learning, Analytics and Reporting, and so on. Most organizations are building Hadoop clusters in addition to maintaining traditional Data Warehousing technologies such as Teradata, Vertica and other OLAP systems. A very common use-case is processing huge amounts of data (by huge I mean Tera, Peta!) with Hadoop and loading aggregated summary data into the OLAP engines.

However, with Hadoop we need to keep in mind that it is a system designed for fault tolerance, among other things. In order to keep job processing moving forward in case of machine failures, Hadoop spawns a task (map or reduce) on more than one node if it thinks the originally spawned task is slower compared to the rest. This is known as speculative execution. A task could be slow due to bad memory, a bad disk or other hardware issues (or simply because it handles more data than the other map or reduce tasks), and Hadoop tries to have a free node perform the work of a task that is slow compared to the rest.

Here is a good description of Speculative Execution from Yahoo Developer Network Blog

One problem with the Hadoop system is that by dividing the tasks across many nodes, it is possible for a few slow nodes to rate-limit the rest of the program. For example if one node has a slow disk controller, then it may be reading its input at only 10% the speed of all the other nodes. So when 99 map tasks are already complete, the system is still waiting for the final map task to check in, which takes much longer than all the other nodes.

By forcing tasks to run in isolation from one another, individual tasks do not know where their inputs come from. Tasks trust the Hadoop platform to just deliver the appropriate input. Therefore, the same input can be processed multiple times in parallel, to exploit differences in machine capabilities. As most of the tasks in a job are coming to a close, the Hadoop platform will schedule redundant copies of the remaining tasks across several nodes which do not have other work to perform. This process is known as speculative execution. When tasks complete, they announce this fact to the JobTracker. Whichever copy of a task finishes first becomes the definitive copy. If other copies were executing speculatively, Hadoop tells the TaskTrackers to abandon the tasks and discard their outputs. The Reducers then receive their inputs from whichever Mapper completed successfully, first.

Data export from MapReduce to Database:

There can be a problem when using speculative execution with MapReduce jobs that write directly to a database: if the same input is processed multiple times, each of those parallel tasks might write the exact same records to the database before the redundant tasks are killed.

Disable Speculative execution to avoid this:

Speculative execution is enabled by default. You can disable it for the mappers and reducers by setting the mapred.map.tasks.speculative.execution and mapred.reduce.tasks.speculative.execution configuration options to false, respectively.
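A minimal sketch of doing this per job from the driver code, using the property names above:

Configuration conf = new Configuration();
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);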

Snappy compression with Pig and native MapReduce


This assumes you have Snappy for Hadoop installed on your cluster; if not, please follow http://code.google.com/p/hadoop-snappy/

This is the machine config of my cluster nodes, though the steps that follow should work with your own installation/machine config:

pkommireddi@pkommireddi-wsl:/tools/hadoop/pig-0.9.1/lib$ uname -a
Linux pkommireddi-wsl 2.6.32-37-generic #81-Ubuntu SMP Fri Dec 2 20:32:42 UTC 2011 x86_64 GNU/Linux

Pig requires that the Snappy jar and native library be available on its classpath when a script is run.

The pig client here is installed at /tools/hadoop and the jar needs to be placed within $PIG_HOME/lib.

/tools/hadoop/pig-0.9.1/lib/hadoop-snappy-0.0.1-SNAPSHOT.jar

Also, you need to point Pig to the Snappy native library:

export PIG_OPTS="$PIG_OPTS -Djava.library.path=$HADOOP_HOME/lib/native/Linux-amd64-64"

Now you have 2 ways to enable map output compression in your Pig scripts:

  1. Follow instructions on http://code.google.com/p/hadoop-snappy/ to set map output compression at a cluster level
  2. Use Pig’s “set” keyword for per job level configuration
    set mapred.compress.map.output true;
    set mapred.map.output.compression.codec org.apache.hadoop.io.compress.SnappyCodec;

This should get you going with Snappy for map output compression with Pig. You can read and write Snappy-compressed files as well, though I would not recommend doing that as it's not very efficient space-wise compared to other compression algorithms. There is work being done to be able to use Snappy for creating intermediate/temporary files between multiple MR jobs. You can watch the work item here: https://issues.apache.org/jira/browse/PIG-2319

Using Snappy for Native Java MapReduce:

Set Configuration parameters for Map output compression

Configuration conf = new Configuration();
conf.setBoolean("mapred.compress.map.output", true);
conf.set("mapred.map.output.compression.codec","org.apache.hadoop.io.compress.SnappyCodec");

Set configuration parameters for Snappy-compressed intermediate SequenceFiles (note: setOutputFormat and the SequenceFileOutputFormat helpers below belong to the old mapred API, so the conf here needs to be a JobConf rather than a plain Configuration)

JobConf conf = new JobConf();
conf.setOutputFormat(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setCompressOutput(conf, true);
SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK); // block level is better than record level, in most cases
conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec");

Benefits:

  1. Map tasks begin transferring data sooner compared to Gzip or Bzip (though more data needs to be transferred to Reduce tasks)
  2. Reduce tasks run faster with better decompression speeds
  3. Snappy is not CPU intensive – which means MR tasks have more CPU for user operations

What you SHOULD use Snappy for

Map output: Snappy works great if you have large amounts of data flowing from Mappers to the Reducers (you might not see a significant difference if data volume between Map and Reduce is low)

Temporary intermediate files (not available as of Pig 0.9.2, applicable only to native MapReduce): if you have a series of MR jobs chained together, Snappy compression is a good way to store the intermediate files. Please do make sure these intermediate files are cleaned up soon enough so we don't have disk space issues on the cluster.

What you should NOT use Snappy for

Permanent Storage: Snappy compression is not efficient space-wise and it is expensive to store data on HDFS (3-way replication)

Plain text files: Like Gzip, Snappy is not splittable. Do not store plain text files in Snappy compressed form, instead use a container like SequenceFile.

PIG – AGGREGATE WARNINGS

From Pig 0.8 onwards there is a configuration parameter “aggregate.warning” that is set to true by default. This means that when your Pig script (UDF) encounters an exception, Pig aggregates these exception messages and prints out a summary at the end of the script. Something like:

11/12/09 13:35:21 WARN mapReduceLayer.MapReduceLauncher: Encountered Warning DIVIDE_BY_ZERO 14 time(s).
11/12/09 13:35:21 WARN mapReduceLayer.MapReduceLauncher: Encountered Warning UDF_WARNING_3 19113867 time(s).

What happens if aggregate.warning is set to false?

Disaster! You should expect a huge number of exceptions to be thrown when you are processing web logs. You can't really predict the correctness of log lines: they could be incorrectly formatted, missing data, missing fields… To err is log!

What you can do is handle these exceptions in your code: catch ArrayIndexOutOfBounds, NullPointer, ClassCast etc. But web logs are huge, maybe hundreds of gigs per day, and generating detailed exceptions in your Hadoop logs is not a good idea. You could potentially have all your datanodes run out of disk space.

Always make sure “aggregate.warning” is set to true.
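If you want to be explicit about it, you can pass the property on the command line like any other (myscript.pig is just a placeholder):

pig -Daggregate.warning=true myscript.pig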

EMBEDDED PIG IN JAVA – AGGREGATE WARNINGS

Though from 0.8 onwards this is set to true within Pig scripts, embedding Pig in Java behaves differently. To run a Pig script from within a Java program, you first need to create a PigServer object.

PigServer pigServer = new PigServer(ExecType.MAPREDUCE);
pigServer.registerQuery("A = LOAD '/logs' using PigStorage();");

However, when you look at the output logs you will find that Pig generates detailed exception messages. This is not what you would expect. I have made a fix for this, but it has been patched only in version 0.10: http://issues.apache.org/jira/browse/PIG-2425

For version 0.9.1 (and lower) the workaround is:

Properties properties = PropertiesUtil.loadDefaultProperties();
properties.setProperty("aggregate.warning", "true");
PigServer pigServer = new PigServer(ExecType.MAPREDUCE, properties);

Merge HDFS file to Local filesystem

MapReduce jobs often require more than 1 reducer when data volumes are huge and the processing needs to be distributed across reduce tasks/nodes. However, at the end you might need to merge these output files into a single file, for instance to be consumed by a data warehouse process. Let's say the MR job used 10 reducers to generate the output. In that case, the output HDFS directory would look something like this:

[prkommireddi@xyz001 ~]$ hadoop fs -ls /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30
drwxr-xr-x   - prkommireddi hdmi-technology          0 2011-08-04 11:52 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/_logs
-rw-r--r--   3 prkommireddi hdmi-technology     150417 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00000
-rw-r--r--   3 prkommireddi hdmi-technology     151142 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00001
-rw-r--r--   3 prkommireddi hdmi-technology     151521 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00002
-rw-r--r--   3 prkommireddi hdmi-technology     152603 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00003
-rw-r--r--   3 prkommireddi hdmi-technology     156200 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00004
-rw-r--r--   3 prkommireddi hdmi-technology     150540 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00005
-rw-r--r--   3 prkommireddi hdmi-technology     153042 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00006
-rw-r--r--   3 prkommireddi hdmi-technology     150738 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00007
-rw-r--r--   3 prkommireddi hdmi-technology     159762 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00008
-rw-r--r--   3 prkommireddi hdmi-technology     149084 2011-08-04 12:01 /apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30/part-r-00009
Files whose names begin with “_” or “.” can be ignored; most InputFormats skip them while reading. The “_logs” directory, for example, will not be read by TextInputFormat or any regular InputFormat.

Now, coming back to the example with 10 reducers. The MR job generates 10 partitions, part-r-00000 through part-r-00009. With the word count example, each reducer generates a partition containing unique words; that is, a word that shows up in part-r-00000 will not show up in part-r-00001 through part-r-00009. Now, you might need to merge these partitions into a single file.

You could do this on the command line with the getmerge command:

hadoop fs -getmerge apps/hdmi-technology/apd/ice/prkommireddi/SRP/2011/07/30 ${HOME}/merge.txt

With the above command, the 10 partitions (part-r-00000 through part-r-00009) are merged into a single local file, merge.txt. Make sure “merge.txt” is not already present or the command fails.

Another way to merge an HDFS file to the local FS is by using FileUtil. The static function copyMerge needs to be invoked; here is the definition from the docs:

copyMerge

public static boolean copyMerge(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource, Configuration conf, String addString) throws IOException

Copy all files in a directory to one output file (merge). Throws IOException.

The 1st argument needs to be an instance of the distributed filesystem, whereas the 3rd argument needs to be an instance of LocalFileSystem.

FileSystem fs = FileSystem.get(conf);            // returns the configured filesystem implementation, HDFS in this case
FileSystem localFS = FileSystem.getLocal(conf);  // returns the local file system
Here is the wrapper function I wrote to do this:

/**
 * @author prkommireddi
 * @date   Aug 9, 2011
 * @desc   Merges the partitions of an HDFS directory into a single file on the local FS.
 *         deleteSrc - whether the source should be deleted after the merge.
 *         deleteDst - whether the destination should be deleted, if it exists, before the merge.
 * @return boolean
 */
public static boolean mergeToLocal(FileSystem fs, Path src, FileSystem localFS, Path dst,
        Configuration conf, boolean deleteSrc, boolean deleteDst) {
    try {
        if (deleteDst && localFS.exists(dst)) {
            localFS.delete(dst, true);
        }
        FileUtil.copyMerge(fs, src, localFS, dst, deleteSrc, conf, "");
        return true;
    } catch (IOException e) {
        e.printStackTrace();
    }
    return false;
}
Hope that helps.

HDFS – Data Sampler




This class allows you to sample files based on a sampling percentage. This is useful while developing/testing your Hadoop apps, when you want your MapReduce job to use only a subset of the entire dataset. This is NOT a “true” random sample. The goal of this code is to let MR jobs run with fewer HDFS data partitions as input, helping jobs finish sooner and making debugging easier by eliminating the need to read the entire dataset. The idea is not to generate a truly random sample, but to read ONLY a few partitions.


package hadoop.util.data;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class DataSampler {

    /**
     * @author Prashant Kommireddi
     * @date Mar 9, 2011
     * @desc Returns a Path array based on the desired sampling percentage.
     *       For example, for a 450-way HDFS file, a 10% sample returns a
     *       subset containing 45 paths.
     * @return Path[]
     */
    public static Path[] getSamplePaths(FileSystem fs, Path path, int samplePercentage)
            throws IOException {
        Path[] paths = listPaths(fs, path);
        return getSamplePathsHelper(paths, samplePercentage);
    }

    public static Path[] getSamplePaths(String uri, int samplePercentage) throws IOException {
        Path[] paths = listPaths(uri);
        return getSamplePathsHelper(paths, samplePercentage);
    }

    private static Path[] getSamplePathsHelper(Path[] paths, int samplePercentage) {
        List<Path> pathList = filteredPaths(paths);
        final int TOTAL_SOJ_PATHS = pathList.size();
        final int sampleNumPaths = (TOTAL_SOJ_PATHS * samplePercentage) / 100;
        final int divisionFactor = 100 / samplePercentage;
        Path[] samplePaths = new Path[sampleNumPaths];
        int j = 0;
        for (int i = 0; i < TOTAL_SOJ_PATHS; i++) {
            if (j >= sampleNumPaths) {
                break;
            }
            if (i % divisionFactor == 0) {
                samplePaths[j++] = pathList.get(i);
            }
        }
        return samplePaths;
    }

    /**
     * @author Prashant Kommireddi
     * @date Apr 25, 2011
     * @return boolean
     * @desc Returns false if the HDFS filename specified by the argument Path p
     *       starts with a "_" or ".". This is useful to skip the files that
     *       must be ignored/NOT read.
     */
    public static boolean accept(Path p) {
        String name = p.getName();
        return !name.startsWith("_") && !name.startsWith(".");
    }

    public static List<Path> filteredPaths(Path[] paths) {
        List<Path> filteredList = new ArrayList<Path>();
        for (int i = 0; i < paths.length; i++) {
            if (accept(paths[i])) {
                filteredList.add(paths[i]);
            }
        }
        return filteredList;
    }

    /**
     * @author Prashant Kommireddi
     * @date Mar 9, 2011
     * @desc Returns an array of Path(s) corresponding to an input URI.
     * @return Path[]
     */
    public static Path[] listPaths(String uri) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        FileStatus[] status = fs.listStatus(path);
        return FileUtil.stat2Paths(status);
    }

    /**
     * @author Prashant Kommireddi
     * @date Mar 9, 2011
     * @desc Returns an array of Path(s) contained in the given HDFS path.
     * @return Path[]
     */
    public static Path[] listPaths(FileSystem fs, Path path) throws IOException {
        FileStatus[] status = fs.listStatus(path);
        return FileUtil.stat2Paths(status);
    }
}



Making a call to DataSampler

The function getSamplePaths takes 3 arguments:

  1. FileSystem object
  2. Input Path – this must be an HDFS location
  3. Sample percentage – e.g. 10%, 1%, 20%

Based on these arguments, the function returns a subset of the paths, which can be passed as input to the MR job:

FileInputFormat.setInputPaths(job, DataSampler.getSamplePaths(fs, inputPath, samplePercentage));
Of course, you could use glob/regex expressions (GlobFilter etc.) to pick a subset of files too. I wrote this class to make it easier to return the subset without worrying about the internal structure/naming of the HDFS files.
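For instance, a minimal sketch that picks a handful of part files with a glob (the path pattern here is made up):

FileStatus[] matches = fs.globStatus(new Path("/data/output/part-r-0000[0-4]*"));
Path[] subset = FileUtil.stat2Paths(matches);
FileInputFormat.setInputPaths(job, subset);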

Hadoop java.lang.OutOfMemoryError: Java heap space

Ever run into an out of memory error with your MR tasks? This can happen if you are using large lookups (DistributedCache) or data structures holding a huge number of entries, and the heap allocated to the MapReduce tasks is not sufficient to handle them.

Stepping back: each map and reduce task is launched in a separate JVM. The heap allocated to it is based on the configuration parameter “mapred.child.java.opts”. If this is too small to handle your data, the Hadoop job will complain that it does not have sufficient heap to continue. Try increasing the heap size through Configuration, but keep in mind the amount of RAM, the number of cores and the number of MR tasks configured on the nodes while adjusting the parameter.

<property>
<name>mapred.child.java.opts</name>
<value>-Xmx2048m</value>
</property>

Here I have increased the heap size to 2 GB. Of course, I had plenty of RAM on the nodes in the cluster to be able to do so. Adding this as a resource to Configuration is one way; another is to simply inject it into the Configuration by invoking the set method.

set

public void set(String name, String value)

Set the value of the name property.

Parameters:
name – property name.
value – property value.
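For example, setting the same heap size programmatically (using the value from the XML snippet above):

Configuration conf = new Configuration();
conf.set("mapred.child.java.opts", "-Xmx2048m");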