Tuesday, May 27, 2014

Pig

11: Pig

- Pig raises the level of abstraction for processing large datasets.
- Compared with MapReduce, Pig's data structures are much richer, and the set of transformations you can apply to the data is much more powerful; it includes joins, for example.

Pig is made up of two pieces:

- The language used to express data flows, called Pig Latin.
- The execution environment to run Pig Latin programs. There are currently two environments: local execution in a single JVM and distributed execution on a Hadoop cluster.

- A Pig Latin program is made up of a series of operations, or transformations, that are applied to the input data to produce output.
- Taken as a whole, the operations describe a data flow, which the Pig execution environment translates into an executable representation and then runs.
- Under the covers, Pig turns the transformations into a series of MapReduce jobs.
- Pig is a scripting language for exploring large datasets.
- Pig's sweet spot is its ability to process terabytes of data simply by issuing a half-dozen lines of Pig Latin from the console.
- It was created at Yahoo! to mine the huge datasets there.
- Pig was designed to be extensible. Virtually all parts of the processing path are customizable: loading, storing, filtering, grouping, and joining can all be altered by user-defined functions (UDFs).
- UDFs tend to be more reusable than the libraries developed for writing MapReduce programs.
- Pig is designed for batch processing of data.
- If you want to perform a query that touches only a small amount of data in a large dataset, Pig will not perform well.
- It is set up to scan the whole dataset, or at least large portions of it.

Installing and Running Pig:

- Pig runs as a client-side application.
- Even if you want to run Pig on a Hadoop cluster, there is nothing extra to install on the cluster.
- Pig launches jobs and interacts with HDFS from your workstation.
- Download a stable release from http://pig.apache.org/releases.html.
- Unpack the tarball in a suitable place on your workstation:
% tar xzf pig-x.y.z.tar.gz

It’s convenient to add Pig’s binary directory to your command-line path. For example:
% export PIG_INSTALL=/home/tom/pig-x.y.z
% export PATH=$PATH:$PIG_INSTALL/bin

You also need to set the JAVA_HOME environment variable to point to a suitable Java
installation.
Try typing pig -help to get usage instructions.

Execution types:

- Pig has two execution types or modes: local mode and MapReduce mode.

Local mode:
- Pig runs in a single JVM and accesses the local filesystem.
- This mode is suitable only for small datasets and when trying out Pig.
- The execution type is set using the -x or -exectype option. To run in local mode, set the option to local:
% pig -x local
grunt>

This starts Grunt, the Pig interactive shell, which is discussed in more detail shortly.

MapReduce mode:

- In MapReduce mode, Pig translates queries into MapReduce jobs and runs them on a Hadoop cluster.
- The cluster may be a pseudo- or fully distributed cluster.
- MapReduce mode is what you use when you want to run Pig on large datasets.

- To use MapReduce mode, first check that the version of Pig you downloaded is compatible with the version of Hadoop you are using.
- Pig releases only work against particular versions of Hadoop.
- Pig honors the HADOOP_HOME environment variable for finding which Hadoop client to
run. However if it is not set, Pig will use a bundled copy of the Hadoop libraries. Note
that these may not match the version of Hadoop running on your cluster, so it is best
to explicitly set HADOOP_HOME.
- Next, you need to point Pig at the cluster’s namenode and jobtracker. If the installation
of Hadoop at HADOOP_HOME is already configured for this, then there is nothing more to
do. Otherwise, you can set HADOOP_CONF_DIR to a directory containing the Hadoop site
file (or files) that define fs.default.name and mapred.job.tracker.

Alternatively, you can set these two properties in the pig.properties file in Pig’s conf
directory (or the directory specified by PIG_CONF_DIR). Here’s an example for a pseudodistributed
setup:
fs.default.name=hdfs://localhost/
mapred.job.tracker=localhost:8021

- Once you have configured Pig to connect to a Hadoop cluster, you can launch Pig, setting the -x option to mapreduce or omitting it entirely, as MapReduce mode is the default:

% pig
2012-01-18 20:23:05,764 [main] INFO  org.apache.pig.Main - Logging error messages to: /private/tmp/pig_1326946985762.log
2012-01-18 20:23:06,009 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost/
2012-01-18 20:23:06,274 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: localhost:8021
grunt> 

As you can see from the output, Pig reports the filesystem and jobtracker that it has
connected to.

Running Pig Programs:

- There are three ways of executing Pig programs, all of which work in both local and MapReduce mode:

Script:
- Pig can run a script file that contains Pig commands.
- For example, pig script.pig runs the commands in the local file script.pig. Alternatively, for very short scripts, you can use the -e option to run a script specified as a string on the command line.

Grunt:
- Grunt is an interactive shell for running Pig commands.
- Grunt is started when no file is specified for Pig to run and the -e option is not used.
- It is also possible to run Pig scripts from within Grunt using run and exec.

Embedded:
- You can run Pig programs from Java using the PigServer class, much like you can use JDBC to run SQL programs from Java.
- For programmatic access to Grunt, use PigRunner.

Grunt:
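- Grunt remembers command history, and you can recall lines in the history buffer using Ctrl-P or Ctrl-N (for previous and next).
- Grunt also completes Pig Latin keywords and functions when you press the Tab key.

For example, consider the following incomplete line: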
grunt> a = foreach b ge
If you press the Tab key at this point, ge will expand to generate, a Pig Latin keyword:
grunt> a = foreach b generate

Pig Latin Editors:

- PigPen is an Eclipse plug-in that provides an environment for developing Pig programs.
- It includes a Pig script text editor, an example generator, and a button for running the script on a Hadoop cluster.

Example
- Calculate the maximum recorded temperature by year for the weather dataset in Pig Latin:
-- max_temp.pig: Finds the maximum temperature by year
records = LOAD 'input/ncdc/micro-tab/sample.txt'
  AS (year:chararray, temperature:int, quality:int);
filtered_records = FILTER records BY temperature != 9999 AND
  (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
grouped_records = GROUP filtered_records BY year;
max_temp = FOREACH grouped_records GENERATE group,
  MAX(filtered_records.temperature);
DUMP max_temp;

- The result of the LOAD operator is a relation, which is just a set of tuples.
- A tuple is just like a row of data in a database table, with multiple fields in a particular order.
- In this example, the LOAD function produces a set of tuples from the rows present in the input file.
- We write a relation with one tuple per line, where tuples are represented as comma-separated items in parentheses:
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)

- Relations are given names, or aliases, so they can be referred to.
- This relation is given the alias records.
- We can examine the contents of an alias using the DUMP operator:

grunt> DUMP records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)

- We can also see the structure of a relation (the relation's schema) using the DESCRIBE operator on the relation's alias:
grunt> DESCRIBE records;
records: {year: chararray,temperature: int,quality: int}

- The second statement removes records that have a missing temperature (indicated by a value of 9999) or an unsatisfactory quality reading.
- For this small dataset, no records are filtered out:

grunt> filtered_records = FILTER records BY temperature != 9999 AND
>>   (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
grunt> DUMP filtered_records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)

- The third statement uses the GROUP function to group the filtered_records relation by the year field.
Let's use DUMP to see what it produces:
grunt> grouped_records = GROUP filtered_records BY year;
grunt> DUMP grouped_records;
(1949,{(1949,111,1),(1949,78,1)})
(1950,{(1950,0,1),(1950,22,1),(1950,-11,1)})

- We now have two rows, or tuples, one for each year in the input data.
- The first field in each tuple is the field being grouped by (the year), and the second field is a bag of tuples for that year.
- A bag is just an unordered collection of tuples, which in Pig Latin is represented using curly braces.

By grouping the data in this way, we have created a row per year, so now all that remains is to find the maximum temperature for the tuples in each bag.
grunt> DESCRIBE grouped_records;
grouped_records: {group: chararray,filtered_records: {year: chararray,
temperature: int,quality: int}}
- This tells us that the grouping field is given the alias group by Pig, and that the second field has the same structure as the filtered_records relation that was being grouped.
- With this information, we can try the fourth transformation:
grunt> max_temp = FOREACH grouped_records GENERATE group,
>>   MAX(filtered_records.temperature);

FOREACH processes every row to generate a derived set of rows, using a GENERATE
clause to define the fields in each derived row. In this example, the first field is
group, which is just the year. The second field is a little more complex.
The filtered_records.temperature reference is to the temperature field of the
filtered_records bag in the grouped_records relation. MAX is a built-in function for
calculating the maximum value of fields in a bag. In this case, it calculates the maximum
temperature for the fields in each filtered_records bag. Let’s check the result:
grunt> DUMP max_temp;
(1949,111)
(1950,22)
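
For readers who think in Java, the four steps above (LOAD, FILTER, GROUP, and FOREACH...GENERATE with MAX) can be sketched as an in-memory computation. This is only a rough analogue of what the script computes, not how Pig executes it. The class and method names (MaxTempSketch, maxByYear) are invented for illustration, and year is held as an int here purely for brevity.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory analogue of max_temp.pig; not Pig's implementation.
final class MaxTempSketch {

  // Each row is {year, temperature, quality}, mirroring the records relation.
  static Map<Integer, Integer> maxByYear(List<int[]> records) {
    Map<Integer, Integer> maxTemp = new TreeMap<>();
    for (int[] row : records) {
      int year = row[0];
      int temperature = row[1];
      int quality = row[2];
      // FILTER: drop missing temperatures and unsatisfactory quality codes.
      boolean goodQuality = quality == 0 || quality == 1 || quality == 4
          || quality == 5 || quality == 9;
      if (temperature == 9999 || !goodQuality) {
        continue;
      }
      // GROUP BY year, then MAX over each group's temperatures.
      maxTemp.merge(year, temperature, Integer::max);
    }
    return maxTemp;
  }

  public static void main(String[] args) {
    List<int[]> records = Arrays.asList(
        new int[] {1950, 0, 1}, new int[] {1950, 22, 1}, new int[] {1950, -11, 1},
        new int[] {1949, 111, 1}, new int[] {1949, 78, 1});
    System.out.println(maxByYear(records)); // prints {1949=111, 1950=22}
  }
}
```

The sketch folds the grouping and the aggregation into a single pass, whereas the Pig script keeps them as separate, inspectable relations.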

Generating Examples:

- With the ILLUSTRATE operator, Pig provides a tool for generating a reasonably complete and concise sample dataset.
- Here is the output from running ILLUSTRATE:

grunt> ILLUSTRATE max_temp;
-------------------------------------------------------------------------------
| records     | year:chararray      | temperature:int      | quality:int      |
-------------------------------------------------------------------------------
|             | 1949                | 78                   | 1                |
|             | 1949                | 111                  | 1                |
|             | 1949                | 9999                 | 1                |
-------------------------------------------------------------------------------
----------------------------------------------------------------------------------------
| filtered_records     | year:chararray      | temperature:int      | quality:int      |
----------------------------------------------------------------------------------------
|                      | 1949                | 78                   | 1                |
|                      | 1949                | 111                  | 1                |
----------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------
| grouped_records     | group:chararray      | filtered_records:bag{:tuple(year:chararray, |
|                     |                      | temperature:int,quality:int)}               |
--------------------------------------------------------------------------------------------
|                     | 1949                 | {(1949, 78, 1), (1949, 111, 1)}             |
--------------------------------------------------------------------------------------------
---------------------------------------------------
| max_temp     | group:chararray      | :int      |
---------------------------------------------------
|              | 1949                 | 111       |
---------------------------------------------------

- Notice that Pig used some of the original data as well as creating some new data.
- It noticed the special value 9999 in the query and created a tuple containing this value to exercise the FILTER statement.
- The output of ILLUSTRATE is easy to follow and can help you understand what your query is doing.

Comparison with Databases:

- Pig Latin is a data flow programming language, whereas SQL is a declarative programming language.
- A Pig Latin program is a step-by-step set of operations on an input relation, in which each step is a single transformation.
- In many ways, programming in Pig Latin is like working at the level of an RDBMS query planner, which figures out how to turn a declarative statement into a system of steps.
- Pig is more relaxed about the data that it processes: you can define a schema at runtime, but it is optional.
- Pig does not support random reads or queries in the order of tens of milliseconds.
- Nor does it support random writes to update small portions of data; all writes are bulk, streaming writes, just like MapReduce.

- Hive sits between Pig and conventional RDBMSs.
- Like Pig, Hive is designed to use HDFS for storage.
- Its query language, HiveQL, is based on SQL.

Pig Latin:

Structure:

- A Pig Latin program consists of a collection of statements.
- A statement can be thought of as an operation or a command.
- For example, a GROUP operation is a type of statement:
grouped_records = GROUP records BY year;

The command to list the files in a Hadoop filesystem is another example of a statement:
ls /

- Statements or commands for interactive use in Grunt (such as ls) do not need a terminating semicolon, whereas statements like the GROUP operation above do.
- It is never an error to add a terminating semicolon.

- Statements that have to be terminated with a semicolon can be split across multiple lines for readability:
records = LOAD 'input/ncdc/micro-tab/sample.txt'
  AS (year:chararray, temperature:int, quality:int);

- Pig has two forms of comments: double hyphens for single-line comments, and C-style /* */ comments, which can span multiple lines.
- With double hyphens, everything from the first hyphen to the end of the line is ignored by the Pig Latin interpreter:

-- My program
DUMP A; -- What's in A?

- Pig Latin has mixed rules for case sensitivity.
- Operators and commands are not case sensitive, but aliases and function names are.

Statements:

- Consider again the Pig Latin program from the first example:
-- max_temp.pig: Finds the maximum temperature by year
records = LOAD 'input/ncdc/micro-tab/sample.txt'
  AS (year:chararray, temperature:int, quality:int);
filtered_records = FILTER records BY temperature != 9999 AND
  (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
grouped_records = GROUP filtered_records BY year;
max_temp = FOREACH grouped_records GENERATE group,
  MAX(filtered_records.temperature);
DUMP max_temp;

- When the Pig Latin interpreter sees the first line containing the LOAD statement, it confirms that it is syntactically and semantically correct and adds it to the logical plan, but it does not load the data from the file.
- Similarly, Pig validates the GROUP and FOREACH...GENERATE statements and adds them to the logical plan without executing them.
- The trigger for Pig to start execution is the DUMP statement.
- At that point, the logical plan is compiled into a physical plan and executed.
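
The deferred execution described above can be sketched in plain Java. This is a toy model, assuming a plan is nothing more than a list of recorded steps; LazyPlanSketch and its methods are hypothetical names, and Pig's real logical and physical plans are considerably richer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

// Hypothetical sketch of deferred execution; not Pig's planner.
final class LazyPlanSketch {
  private final List<UnaryOperator<List<Integer>>> steps = new ArrayList<>();
  private int executedSteps = 0;

  // Adding a step only records it in the plan; no data is touched yet.
  LazyPlanSketch add(UnaryOperator<List<Integer>> step) {
    steps.add(step);
    return this;
  }

  int executedSteps() {
    return executedSteps;
  }

  // Analogue of DUMP: the trigger that actually runs the recorded steps.
  List<Integer> dump(List<Integer> input) {
    List<Integer> current = input;
    for (UnaryOperator<List<Integer>> step : steps) {
      current = step.apply(current);
      executedSteps++;
    }
    return current;
  }

  public static void main(String[] args) {
    LazyPlanSketch plan = new LazyPlanSketch()
        // Analogue of FILTER: recorded, not run.
        .add(rows -> rows.stream().filter(t -> t != 9999).collect(Collectors.toList()))
        // Analogue of MAX: recorded, not run.
        .add(rows -> Collections.singletonList(Collections.max(rows)));
    System.out.println("Steps run before dump: " + plan.executedSteps());
    System.out.println("Result: " + plan.dump(Arrays.asList(0, 22, 9999, -11)));
    System.out.println("Steps run after dump: " + plan.executedSteps());
  }
}
```

Because nothing runs until dump is called, the whole plan is known before execution starts, which is what gives a planner the chance to validate and combine steps first.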

- You can see the logical and physical plans created by Pig using the EXPLAIN command on a relation (EXPLAIN max_temp; for example).
- EXPLAIN will also show the MapReduce plan, which shows how the physical operators are grouped into MapReduce jobs. This is a good way to find out how many MapReduce jobs Pig will run for your query.

- DUMP is a sort of diagnostic operator, since it is used only to allow interactive debugging of small result sets or in combination with LIMIT to retrieve a few rows from a larger relation.
- The STORE statement should be used when the size of the output is more than a few lines, as it writes to a file rather than to the console.

- The debug option is used to turn debug logging on or off from within a script:
grunt> set debug on

- There are two commands for running a Pig script: exec and run.
- The difference is that exec runs the script in batch mode in a new Grunt shell, so any aliases defined in the script are not accessible to the shell after the script has completed.
- When running a script with run, it is as if the contents of the script had been entered manually, so the command history of the invoking shell contains all the statements from the script.
- Multiquery execution, where Pig executes a batch of statements in one go, is used only by exec, not run.

Expressions:

- An expression is something that is evaluated to yield a value.
- Expressions can be used in Pig as part of a statement containing a relational operator.
- Pig has a rich variety of expressions.

Types:

- Pig Latin's types include the numeric types int, long, float, and double; the text and binary types chararray and bytearray; and the complex types tuple, bag, and map.

- Pig provides the built-in functions TOTUPLE, TOBAG, and TOMAP, which are used for turning expressions into tuples, bags, and maps.

- Although relations and bags are conceptually the same (an unordered collection of tuples), in practice Pig treats them slightly differently.
- A relation is a top-level construct, whereas a bag has to be contained in a relation.
- It is not possible to create a relation from a bag literal.
 So the following statement fails:
A = {(1,2),(3,4)}; -- Error
The simplest workaround in this case is to load the data from a file using the LOAD
statement.
As another example, you can’t treat a relation like a bag and project a field into a new
relation ($0 refers to the first field of A, using the positional notation):
B = A.$0;
Instead, you have to use a relational operator to turn the relation A into relation B:
B = FOREACH A GENERATE $0;

Schemas:

- A relation in Pig may have an associated schema, which gives the fields in the relation names and types:
grunt> records = LOAD 'input/ncdc/micro-tab/sample.txt'
>>   AS (year:int, temperature:int, quality:int);
grunt> DESCRIBE records;
records: {year: int,temperature: int,quality: int}

It’s possible to omit type declarations completely, too:
grunt> records = LOAD 'input/ncdc/micro-tab/sample.txt'
>>   AS (year, temperature, quality);
grunt> DESCRIBE records;
records: {year: bytearray,temperature: bytearray,quality: bytearray}

- In this case, we have specified only the names of the fields in the schema: year, temperature, and quality.
- The types default to bytearray, the most general type, representing a binary string.
- The schema is entirely optional and can be omitted by not specifying an AS clause:

grunt> records = LOAD 'input/ncdc/micro-tab/sample.txt';
grunt> DESCRIBE records;
Schema for records unknown.

Fields in a relation with no schema can be referenced only using positional notation: $0 refers to the first field in a relation, $1 to the second, and so on. Their types default to bytearray:

grunt> projected_records = FOREACH records GENERATE $0, $1, $2;
grunt> DUMP projected_records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)
grunt> DESCRIBE projected_records;
projected_records: {bytearray,bytearray,bytearray}

Validation and Nulls:

- In Pig, if a value cannot be cast to the type declared in the schema, Pig substitutes a null value.
- Let's see how this works if we have the following input for the weather data, which has an "e" character in place of an integer:

1950    0   1
1950    22  1
1950    e   1
1949    111 1
1949    78  1

- Pig handles the corrupt line by producing a null for the offending value, which is displayed as the absence of a value when dumped to screen:
grunt> records = LOAD 'input/ncdc/micro-tab/sample_corrupt.txt'
>>   AS (year:chararray, temperature:int, quality:int);
grunt> DUMP records;
(1950,0,1)
(1950,22,1)
(1950,,1)
(1949,111,1)
(1949,78,1)

- Pig produces a warning for the invalid field but does not halt its processing.
- For large datasets, it is very common to have corrupt, invalid, or merely unexpected data, and it is generally infeasible to incrementally fix every unparsable record.
- Instead, we can pull out all of the invalid records in one go, so we can take action on them, perhaps by fixing our program or by filtering them out:

grunt> corrupt_records = FILTER records BY temperature is null;
grunt> DUMP corrupt_records;
(1950,,1)

We can find the number of corrupt records using the following idiom for counting the number of rows in a relation:

grunt> grouped = GROUP corrupt_records ALL;
grunt> all_grouped = FOREACH grouped GENERATE group, COUNT(corrupt_records);
grunt> DUMP all_grouped;
(all,1)

- Another useful technique is to use the SPLIT operator to partition the data into "good" and "bad" relations, which can then be analyzed separately:

grunt> SPLIT records INTO good_records IF temperature is not null,
>>   bad_records IF temperature is null;
grunt> DUMP good_records;
(1950,0,1)
(1950,22,1)
(1949,111,1)
(1949,78,1)
grunt> DUMP bad_records;
(1950,,1)
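
The null-on-bad-cast behavior and the good/bad partitioning shown above can be mimicked in a few lines of Java. This is a minimal sketch under simplifying assumptions: it looks only at the temperature field as text, and the names NullOnBadCastSketch, toIntOrNull, and split are hypothetical, not part of Pig.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: a failed cast yields null instead of aborting the load.
final class NullOnBadCastSketch {

  // Analogue of declaring temperature:int; unparsable text becomes null.
  static Integer toIntOrNull(String field) {
    try {
      return Integer.valueOf(field.trim());
    } catch (NumberFormatException e) {
      return null;
    }
  }

  // Analogue of SPLIT ... IF temperature is not null / is null.
  // Index 0 holds the fields that parsed, index 1 the fields that did not.
  static List<List<String>> split(List<String> temperatureFields) {
    List<String> good = new ArrayList<>();
    List<String> bad = new ArrayList<>();
    for (String field : temperatureFields) {
      if (toIntOrNull(field) != null) {
        good.add(field);
      } else {
        bad.add(field);
      }
    }
    return Arrays.asList(good, bad);
  }

  public static void main(String[] args) {
    List<List<String>> parts = split(Arrays.asList("0", "22", "e", "111", "78"));
    System.out.println("good: " + parts.get(0));
    System.out.println("bad:  " + parts.get(1));
  }
}
```

As in the Pig session, processing continues past the bad value, and the bad records end up collected in one place where they can be inspected.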

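- If the temperature field's type is left undeclared, the corrupt record cannot be easily detected, since it does not surface as a null. The field is loaded as a bytearray; MAX then casts it to a double (which is why the results below are doubles), the corrupt value becomes a null, and MAX silently ignores it: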

grunt> records = LOAD 'input/ncdc/micro-tab/sample_corrupt.txt'
>>   AS (year:chararray, temperature, quality:int);
grunt> DUMP records;
(1950,0,1)
(1950,22,1)
(1950,e,1)
(1949,111,1)
(1949,78,1)
grunt> filtered_records = FILTER records BY temperature != 9999 AND
>>   (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
grunt> grouped_records = GROUP filtered_records BY year;
grunt> max_temp = FOREACH grouped_records GENERATE group,
>>   MAX(filtered_records.temperature);
grunt> DUMP max_temp;
(1949,111.0)
(1950,22.0)

- The best approach is generally to declare types for your data on loading, and to look for missing or corrupt values in the relations themselves before you do your main processing.
- Sometimes corrupt data shows up as smaller tuples, since fields are simply missing.
You can filter these out by using the SIZE function as follows:

grunt> A = LOAD 'input/pig/corrupt/missing_fields';
grunt> DUMP A;
(2,Tie)
(4,Coat)
(3)
(1,Scarf)
grunt> B = FILTER A BY SIZE(TOTUPLE(*)) > 1;
grunt> DUMP B;
(2,Tie)
(4,Coat)
(1,Scarf)

Schema Merging:

- In Pig, you don't declare the schema for every new relation in the data flow.
- In most cases, Pig can figure out the resulting schema for the output of a relational operation by considering the schema of the input relation.
- Some relational operators don't change the schema, so the relation produced by the LIMIT operator, for example, has the same schema as the relation it operates on.
- You can find out the schema for any relation in the data flow using the DESCRIBE operator.
- If you want to redefine the schema for a relation, you can use the FOREACH...GENERATE operator with AS clauses to define the schema for some or all of the fields of the input relation.

Functions:

- Functions in Pig come in four types:

Eval function:

- A function that takes one or more expressions and returns another expression.
- An example of a built-in eval function is MAX, which returns the maximum value of the entries in a bag.
- MAX is an example of an aggregate function: it operates on a bag of data to produce a scalar value.

Filter function:

- A special type of eval function that returns a logical Boolean result.
- Filter functions are used in the FILTER operator to remove unwanted rows.
- An example of a built-in filter function is IsEmpty, which tests whether a bag or a map contains any items.

Load function:

- A function that specifies how to load data into a relation from external storage.

Store function:

- A function that specifies how to save the contents of a relation to external storage.
- Often, load and store functions are implemented by the same type.
- For example, PigStorage, which loads data from delimited text files, can store data in the same format.

- If the function you need is not available, you can write your own; such functions are known as user-defined functions, or UDFs.
- Before doing so, have a look at Piggy Bank, a repository of Pig functions shared by the Pig community.

Macros:

- Macros provide a way to package reusable pieces of Pig Latin code from within Pig Latin itself.
- For example, we can extract the part of our Pig Latin program that performs grouping on a relation and then finds the maximum value in each group, by defining a macro as follows:

DEFINE max_by_group(X, group_key, max_field) RETURNS Y {
  A = GROUP $X by $group_key;
  $Y = FOREACH A GENERATE group, MAX($X.$max_field);
};

- The macro, called max_by_group, takes three parameters: a relation, X, and two field names, group_key and max_field.
- It returns a single relation, Y.
- Within the macro body, parameters and return aliases are referenced with a $ prefix, such as $X.

The macro is used as follows:
records = LOAD 'input/ncdc/micro-tab/sample.txt'
  AS (year:chararray, temperature:int, quality:int);
filtered_records = FILTER records BY temperature != 9999 AND
  (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
max_temp = max_by_group(filtered_records, year, temperature);
DUMP max_temp

At runtime, Pig will expand the macro using the macro definition. After expansion, the
program looks like the following; the expanded section is the GROUP and FOREACH statements.
records = LOAD 'input/ncdc/micro-tab/sample.txt'
  AS (year:chararray, temperature:int, quality:int);
filtered_records = FILTER records BY temperature != 9999 AND
  (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
macro_max_by_group_A_0 = GROUP filtered_records by (year);
max_temp = FOREACH macro_max_by_group_A_0 GENERATE group,
  MAX(filtered_records.(temperature));
DUMP max_temp

- You don't normally see the expanded form, since Pig creates it internally; however, in some cases it is useful to see it when writing and debugging macros.
- To have Pig perform macro expansion only, without executing the script, pass the -dryrun argument to pig.
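
To a first approximation, macro expansion is textual substitution of the $-prefixed names. The Java sketch below illustrates only that idea, assuming no parameter name is a prefix of another. MacroExpandSketch is a hypothetical name, and Pig's real expander does more, such as renaming aliases defined inside the macro body (A became macro_max_by_group_A_0 in the expansion above).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of macro expansion as plain text substitution.
// Pig's real expander also renames aliases defined inside the macro body.
final class MacroExpandSketch {

  // Replaces each $name in the body with the argument bound to that name.
  // Assumes no parameter name is a prefix of another parameter name.
  static String expand(String body, Map<String, String> bindings) {
    String expanded = body;
    for (Map.Entry<String, String> binding : bindings.entrySet()) {
      expanded = expanded.replace("$" + binding.getKey(), binding.getValue());
    }
    return expanded;
  }

  public static void main(String[] args) {
    Map<String, String> bindings = new LinkedHashMap<>();
    bindings.put("X", "filtered_records");
    bindings.put("group_key", "year");
    bindings.put("max_field", "temperature");
    bindings.put("Y", "max_temp");
    String body = "A = GROUP $X by $group_key;\n"
        + "$Y = FOREACH A GENERATE group, MAX($X.$max_field);";
    System.out.println(expand(body, bindings));
  }
}
```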

- To foster reuse, macros can be defined in files separate from Pig scripts, in which case they need to be imported into any script that uses them.
- An import statement looks like this:

IMPORT './ch11/src/main/pig/max_temp.macro';

User-Defined Functions:

Filter UDF:

The idea is to change this line:

filtered_records = FILTER records BY temperature != 9999 AND
  (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
to:
filtered_records = FILTER records BY temperature != 9999 AND isGood(quality);

- This makes the Pig script more concise, and it encapsulates the logic in one place so that it can be easily reused in other scripts.
- Filter UDFs are all subclasses of FilterFunc, which itself is a subclass of EvalFunc.
- We will look at EvalFunc in more detail later, but for the moment just note that, in essence, EvalFunc looks like the following class:

public abstract class EvalFunc<T> {
  public abstract T exec(Tuple input) throws IOException;
}


Example: A FilterFunc UDF to remove records with unsatisfactory temperature quality readings
package com.hadoopbook.pig;

import java.io.IOException;

import org.apache.pig.FilterFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;

// Returns true only for the quality codes treated as satisfactory: 0, 1, 4, 5, 9.
public class IsGoodQuality extends FilterFunc {
  @Override
  public Boolean exec(Tuple tuple) throws IOException {
    if (tuple == null || tuple.size() == 0) {
      return false;
    }
    try {
      Object object = tuple.get(0);
      if (object == null) {
        return false;
      }
      int i = (Integer) object;
      return i == 0 || i == 1 || i == 4 || i == 5 || i == 9;
    } catch (ExecException e) {
      throw new IOException(e);
    }
  }

}

- To use the new function, we first compile it and package it in a JAR file.
- Then we tell Pig about the JAR file with the REGISTER operator, which is given the local path to the file:
grunt> REGISTER pig-examples.jar;

Finally we invoke the function:

grunt> filtered_records = FILTER records BY temperature != 9999 AND
>>   com.hadoopbook.pig.IsGoodQuality(quality);

- Pig resolves a function call by treating the function's name as a Java classname and attempting to load a class of that name.
- When searching for classes, Pig uses a classloader that includes the JAR files that have been registered.
- When running in distributed mode, Pig will ensure that your JAR files get shipped to the cluster.

- For the UDF in this example, Pig looks for a class with the name com.hadoopbook.pig.IsGoodQuality, which it finds in the JAR file we registered.
- Resolution of built-in functions proceeds in the same way, except that Pig also searches a set of built-in package names, so the call does not have to be fully qualified. The built-in function MAX, for example, is actually implemented by the class MAX in the package org.apache.pig.builtin.

- We can add our package name to the search path by invoking Grunt with this command-line argument:
-Dudf.import.list=com.hadoopbook.pig
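
The lookup described above, in which a name is tried as a class name and then against a list of packages, can be sketched with standard Java class loading. This is an illustration of the idea only, not Pig's resolver; FunctionResolverSketch and resolve are hypothetical names, and JDK packages stand in for the UDF packages.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of name resolution against a package search path.
final class FunctionResolverSketch {

  // Tries the name as given, then with each package prefix in order,
  // and returns the first class that loads.
  static Class<?> resolve(String name, List<String> packages) {
    try {
      return Class.forName(name);
    } catch (ClassNotFoundException ignored) {
      // Not a fully qualified name; fall through to the search path.
    }
    for (String pkg : packages) {
      try {
        return Class.forName(pkg + "." + name);
      } catch (ClassNotFoundException ignored) {
        // Try the next package on the search path.
      }
    }
    throw new IllegalArgumentException("Cannot resolve function: " + name);
  }

  public static void main(String[] args) {
    List<String> searchPath = Arrays.asList("java.lang", "java.util");
    System.out.println(resolve("ArrayList", searchPath));
    System.out.println(resolve("java.lang.String", searchPath));
  }
}
```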
- We can also shorten the function name by defining an alias, using the DEFINE operator:

grunt> DEFINE isGood com.hadoopbook.pig.IsGoodQuality();
grunt> filtered_records = FILTER records BY temperature != 9999 AND isGood(quality);

Here’s the final program using the new function:
-- max_temp_filter_udf.pig
REGISTER pig-examples.jar;
DEFINE isGood com.hadoopbook.pig.IsGoodQuality();
records = LOAD 'input/ncdc/micro-tab/sample.txt'
  AS (year:chararray, temperature:int, quality:int);
filtered_records = FILTER records BY temperature != 9999 AND isGood(quality);
grouped_records = GROUP filtered_records BY year;
max_temp = FOREACH grouped_records GENERATE group,
  MAX(filtered_records.temperature);
DUMP max_temp;

An Eval UDF:

Example: An EvalFunc UDF to trim leading and trailing whitespace from chararray values
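package com.hadoopbook.pig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class Trim extends EvalFunc<String> {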
  @Override
  public String exec(Tuple input) throws IOException {
    if (input == null || input.size() == 0) {
      return null;
    }
    try {
      Object object = input.get(0);
      if (object == null) {
        return null;
      }
      return ((String) object).trim();
    } catch (ExecException e) {
      throw new IOException(e);
    }
  }
  @Override
  public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
    List<FuncSpec> funcList = new ArrayList<FuncSpec>();
    funcList.add(new FuncSpec(this.getClass().getName(), new Schema(
        new Schema.FieldSchema(null, DataType.CHARARRAY))));
    return funcList;
  }
}

- The Trim UDF returns a string, which Pig translates as a chararray, as can be seen from the following session:

grunt> DUMP A;
( pomegranate)
(banana  )
(apple)
(  lychee )
grunt> DESCRIBE A;
A: {fruit: chararray}
grunt> B = FOREACH A GENERATE com.hadoopbook.pig.Trim(fruit);
grunt> DUMP B;
(pomegranate)
(banana)
(apple)
(lychee)
grunt> DESCRIBE B;
B: {chararray}

- A has a chararray field whose values have leading and trailing spaces.
- We create B from A by applying the Trim function to the first field in A.
- B's field is correctly inferred to be of type chararray.

Dynamic Invokers:

- Sometimes you want to use a function that is provided by a Java library without going to the effort of writing a UDF.
- Dynamic invokers allow you to do this by calling Java methods directly from a Pig script.
- For scripts that are run repeatedly, a dedicated UDF is normally preferred.
- The following snippet shows how we could define and use a trim UDF that uses the Apache Commons Lang StringUtils class:

grunt> DEFINE trim InvokeForString('org.apache.commons.lang.StringUtils.trim', 'String');
grunt> B = FOREACH A GENERATE trim(fruit);
grunt> DUMP B;
(pomegranate)
(banana)
(apple)
(lychee)

- The InvokeForString invoker is used because the return type of the method is String.
- The first argument to the invoker constructor is the fully qualified method to be invoked.
- The second is a space-separated list of the method argument classes.
Load UDF:

- We will demonstrate a custom load function that can read plain-text column ranges as fields, very much like the Unix cut command. It is used as follows:

grunt> records = LOAD 'input/ncdc/micro/sample.txt'
>>   USING com.hadoopbook.pig.CutLoadFunc('16-19,88-92,93-93')
>>   AS (year:int, temperature:int, quality:int);
grunt> DUMP records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)

Example: A LoadFunc UDF to load tuple fields as column ranges
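package com.hadoopbook.pig;

import java.io.IOException;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// Range is a helper class that is not shown here; it is assumed to live in this package.
public class CutLoadFunc extends LoadFunc {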
  private static final Log LOG = LogFactory.getLog(CutLoadFunc.class);
  private final List<Range> ranges;
  private final TupleFactory tupleFactory = TupleFactory.getInstance();
  private RecordReader reader;
  public CutLoadFunc(String cutPattern) {
    ranges = Range.parse(cutPattern);
  }
  @Override
  public void setLocation(String location, Job job)
      throws IOException {
    FileInputFormat.setInputPaths(job, location);
  }
  @Override
  public InputFormat getInputFormat() {

    return new TextInputFormat();
  }
  @Override
  public void prepareToRead(RecordReader reader, PigSplit split) {
    this.reader = reader;
  }
  @Override
  public Tuple getNext() throws IOException {
    try {
      if (!reader.nextKeyValue()) {
        return null;
      }
      Text value = (Text) reader.getCurrentValue();
      String line = value.toString();
      Tuple tuple = tupleFactory.newTuple(ranges.size());
      for (int i = 0; i < ranges.size(); i++) {
        Range range = ranges.get(i);
        if (range.getEnd() > line.length()) {
          LOG.warn(String.format(
              "Range end (%s) is longer than line length (%s)",
              range.getEnd(), line.length()));
          continue;
        }
        tuple.set(i, new DataByteArray(range.getSubstring(line)));
      }
      return tuple;
    } catch (InterruptedException e) {
      throw new ExecException(e);
    }
  }
}

- In Pig, data loading takes place before mapper runs, so it is important that input can be split into portions that are independently handled by each mapper.
- Pig runtime calls getNext() repeatedly, and load function reads tuples from reader until the reader reaches last record in its split.
- At this point, it returns null to signal that there are no more tuples to be read.
- It is the responsibility of the getNext() implementation to turn lines of the input file into Tuple objects. It does this by means of a TupleFactory, a Pig class for creating Tuple instances.
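
CutLoadFunc relies on a Range helper whose source is not shown in these notes. The class below is a hypothetical stand-in, written only to illustrate what parse(), getEnd() and getSubstring() would need to do. It assumes that column numbers are 1-based and inclusive, as with the Unix cut command; the real helper may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for the Range helper used by CutLoadFunc. */
final class Range {
  private final int start; // first column, 1-based and inclusive (assumption)
  private final int end;   // last column, 1-based and inclusive (assumption)

  Range(int start, int end) {
    if (start < 1 || end < start) {
      throw new IllegalArgumentException("Bad range: " + start + "-" + end);
    }
    this.start = start;
    this.end = end;
  }

  int getStart() {
    return start;
  }

  int getEnd() {
    return end;
  }

  /** Returns columns start..end of the line; the caller checks the line length first. */
  String getSubstring(String line) {
    return line.substring(start - 1, end);
  }

  /** Parses a pattern such as "16-19,88-92,93-93" into a list of ranges. */
  static List<Range> parse(String pattern) {
    List<Range> ranges = new ArrayList<>();
    for (String part : pattern.split(",")) {
      String[] bounds = part.trim().split("-");
      if (bounds.length != 2) {
        throw new IllegalArgumentException("Expected start-end but got: " + part);
      }
      ranges.add(new Range(Integer.parseInt(bounds[0].trim()),
          Integer.parseInt(bounds[1].trim())));
    }
    return Collections.unmodifiableList(ranges);
  }
}
```

With this convention the check range.getEnd() > line.length() in getNext() is exactly the condition under which getSubstring() would run past the end of the line.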

Data Processing Operators:

Loading and Storing Data:

- We have seen how to load data from external storage for processing in Pig.
- Storing the results is straightforward, too.
- Here is an example of using PigStorage to store tuples as plain text values separated by colon character.

grunt> STORE A INTO 'out' USING PigStorage(':');
grunt> cat out
Joe:cherry:2
Ali:apple:3
Joe:banana:2
Eve:apple:7

Filtering Data:

- Once you have data loaded into relation, next step is to filter it to remove data that you are not interested in.
- By filtering early in processing pipeline, you minimize amount of data flowing through the system, which can improve efficiency.

FOREACH...GENERATE:

- Used to act on every row in a relation.
- It can be used to remove fields or to generate new ones.
In this example, we do both:
grunt> DUMP A;
(Joe,cherry,2)
(Ali,apple,3)
(Joe,banana,2)
(Eve,apple,7)
grunt> B = FOREACH A GENERATE $0, $2+1, 'Constant';
grunt> DUMP B;
(Joe,3,Constant)
(Ali,4,Constant)
(Joe,3,Constant)
(Eve,8,Constant)

- The FOREACH...GENERATE operator has a nested form to support more complex processing.
- In the following example, we compute various statistics for weather dataset:

-- year_stats.pig
REGISTER pig-examples.jar;
DEFINE isGood com.hadoopbook.pig.IsGoodQuality();
records = LOAD 'input/ncdc/all/19{1,2,3,4,5}0*'
  USING com.hadoopbook.pig.CutLoadFunc('5-10,11-15,16-19,88-92,93-93')
  AS (usaf:chararray, wban:chararray, year:int, temperature:int, quality:int);
grouped_records = GROUP records BY year PARALLEL 30;
year_stats = FOREACH grouped_records {
  uniq_stations = DISTINCT records.usaf;
  good_records = FILTER records BY isGood(quality);
  GENERATE FLATTEN(group), COUNT(uniq_stations) AS station_count,
    COUNT(good_records) AS good_record_count, COUNT(records) AS record_count;
}
DUMP year_stats;

STREAM:

- The STREAM operator allows you to transform data in a relation using an external program or script.
- STREAM can use built in commands with arguments.
- Note that the command and its arguments are enclosed in backticks:

grunt> C = STREAM A THROUGH `cut -f 2`;
grunt> DUMP C;
(cherry)
(apple)
(banana)
(apple)

- The STREAM operator uses PigStorage to serialize and deserialize relations to and from the program's standard input and output streams.
- Tuples in A are converted to tab-delimited lines that are passed to the script.
- Output of the script is read one line at a time and split on tabs to create new tuples for the output relation C.
- Pig streaming is most powerful when you write custom processing scripts.

Grouping and Joining Data:

- Joining datasets in MapReduce takes some work on the part of the programmer, whereas Pig has very good built-in support for join operations, making it much more approachable.
- Joins are used less frequently in Pig than they are in SQL.

JOIN:

- Inner join example:
grunt> DUMP A;
(2,Tie)
(4,Coat)
(3,Hat)
(1,Scarf)
grunt> DUMP B;
(Joe,2)
(Hank,4)
(Ali,0)
(Eve,3)
(Hank,2)
We can join the two relations on the numerical (identity) field in each:
grunt> C = JOIN A BY $0, B BY $1;
grunt> DUMP C;
(2,Tie,Joe,2)
(2,Tie,Hank,2)
(3,Hat,Eve,3)
(4,Coat,Hank,4)

- You should use the general join operator if all relations being joined are too large to fit in memory.
- If one of the relations is small enough to fit in memory, there is a special type of join called a fragment replicate join, which is implemented by distributing the small input to all mappers and performing a map-side join using an in-memory lookup table against the larger relation.

There is a special syntax for telling Pig to use a fragment replicate join:

grunt> C = JOIN A BY $0, B BY $1 USING 'replicated';
The first relation must be the large one, followed by one or more small ones (all of which must fit in memory).
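
The idea behind a fragment replicate join can be sketched in plain Java. This illustrates the technique only and is not Pig's code: the class ReplicatedJoinSketch is a hypothetical name, tuples are modeled as List<Object>, and the "mapper" is an ordinary loop. The small relation is indexed in a hash map, then the large relation is streamed past it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a map-side join against an in-memory lookup table. */
final class ReplicatedJoinSketch {
  /**
   * Inner-joins two relations on the given 0-based key positions.
   * Output order follows the large relation.
   */
  static List<List<Object>> join(List<List<Object>> large, int largeKey,
                                 List<List<Object>> small, int smallKey) {
    // Build phase: index the small relation by join key. This is the part
    // that must fit in memory on every mapper.
    Map<Object, List<List<Object>>> lookup = new HashMap<>();
    for (List<Object> row : small) {
      lookup.computeIfAbsent(row.get(smallKey), k -> new ArrayList<>()).add(row);
    }
    // Probe phase: stream the large relation once; no shuffle or reduce is needed.
    List<List<Object>> out = new ArrayList<>();
    for (List<Object> row : large) {
      List<List<Object>> matches = lookup.get(row.get(largeKey));
      if (matches == null) {
        continue; // inner join: rows without a match are dropped
      }
      for (List<Object> match : matches) {
        List<Object> joined = new ArrayList<>(row);
        joined.addAll(match);
        out.add(joined);
      }
    }
    return out;
  }
}
```

Because only the small side is held in memory, the cost of the build phase is paid once per mapper, which is why the small relations must be the ones that fit in memory.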

- Pig also supports outer joins, using a syntax that is similar to SQL's.

For example:
grunt> C = JOIN A BY $0 LEFT OUTER, B BY $1;
grunt> DUMP C;
(1,Scarf,,)
(2,Tie,Joe,2)
(2,Tie,Hank,2)
(3,Hat,Eve,3)
(4,Coat,Hank,4)

COGROUP:

- JOIN always gives a flat structure: set of tuples.
- COGROUP statement is similar to JOIN but creates a nested set of output tuples.
- This can be useful if you want to exploit the structure in subsequent statements:

grunt> D = COGROUP A BY $0, B BY $1;
grunt> DUMP D;
(0,{},{(Ali,0)})
(1,{(1,Scarf)},{})
(2,{(2,Tie)},{(Joe,2),(Hank,2)})
(3,{(3,Hat)},{(Eve,3)})
(4,{(4,Coat)},{(Hank,4)})

COGROUP generates a tuple for each unique grouping key.
- First field of each tuple is a key and remaining fields are bags of tuples from relations with matching key.
- First bag contains matching tuples from relation A with same key.
- Second bag contains matching tuples from relation B with same key.
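
The nested structure that COGROUP produces can be modeled in a few lines of Java. This is a sketch of the semantics only, under the assumptions that tuples are List<Object> values and that keys are mutually comparable (a TreeMap is used so that the output comes out in key order). CogroupSketch and Grouped are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of COGROUP semantics for two relations. */
final class CogroupSketch {
  /** One output tuple: the grouping key plus one bag per input relation. */
  static final class Grouped {
    final Object key;
    final List<List<Object>> bagA = new ArrayList<>();
    final List<List<Object>> bagB = new ArrayList<>();

    Grouped(Object key) {
      this.key = key;
    }
  }

  /** Groups both relations by key; a bag is empty when a relation has no match. */
  static List<Grouped> cogroup(List<List<Object>> a, int aKey,
                               List<List<Object>> b, int bKey) {
    Map<Object, Grouped> groups = new TreeMap<>();
    for (List<Object> row : a) {
      groups.computeIfAbsent(row.get(aKey), Grouped::new).bagA.add(row);
    }
    for (List<Object> row : b) {
      groups.computeIfAbsent(row.get(bKey), Grouped::new).bagB.add(row);
    }
    return new ArrayList<>(groups.values());
  }
}
```

Every key that appears in either input gets exactly one Grouped entry, which is why key 0 appears in D above with an empty first bag.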

- You can suppress rows with empty bags by using the INNER keyword, which gives the COGROUP inner join semantics.
- INNER keyword is applied per relation, so following only suppresses rows when relation A has no match.

grunt> E = COGROUP A BY $0 INNER, B BY $1;
grunt> DUMP E;
(1,{(1,Scarf)},{})
(2,{(2,Tie)},{(Joe,2),(Hank,2)})
(3,{(3,Hat)},{(Eve,3)})
(4,{(4,Coat)},{(Hank,4)})
We can flatten this structure to discover who bought each of the items in relation A:
grunt> F = FOREACH E GENERATE FLATTEN(A), B.$0;
grunt> DUMP F;
(1,Scarf,{})
(2,Tie,{(Joe),(Hank)})
(3,Hat,{(Eve)})
(4,Coat,{(Hank)})
Using a combination of COGROUP, INNER, and FLATTEN (which removes nesting)
it’s possible to simulate an (inner) JOIN:
grunt> G = COGROUP A BY $0 INNER, B BY $1 INNER;
grunt> H = FOREACH G GENERATE FLATTEN($1), FLATTEN($2);
grunt> DUMP H;
(2,Tie,Joe,2)
(2,Tie,Hank,2)
(3,Hat,Eve,3)
(4,Coat,Hank,4)
This gives the same result as JOIN A BY $0, B BY $1.

- If the join key is composed of several fields, you can specify them all in the BY clause of the JOIN or COGROUP statement.
- Make sure that number of fields in each BY clause is same.

-- max_temp_station_name.pig
REGISTER pig-examples.jar;
DEFINE isGood com.hadoopbook.pig.IsGoodQuality();
stations = LOAD 'input/ncdc/metadata/stations-fixed-width.txt'
  USING com.hadoopbook.pig.CutLoadFunc('1-6,8-12,14-42')
  AS (usaf:chararray, wban:chararray, name:chararray);
trimmed_stations = FOREACH stations GENERATE usaf, wban,
  com.hadoopbook.pig.Trim(name);    
records = LOAD 'input/ncdc/all/191*'
  USING com.hadoopbook.pig.CutLoadFunc('5-10,11-15,88-92,93-93')
  AS (usaf:chararray, wban:chararray, temperature:int, quality:int);
filtered_records = FILTER records BY temperature != 9999 AND isGood(quality);
grouped_records = GROUP filtered_records BY (usaf, wban) PARALLEL 30;
max_temp = FOREACH grouped_records GENERATE FLATTEN(group),
  MAX(filtered_records.temperature);
max_temp_named = JOIN max_temp BY (usaf, wban), trimmed_stations BY (usaf, wban)
  PARALLEL 30;
max_temp_result = FOREACH max_temp_named GENERATE $0, $1, $5, $2;
STORE max_temp_result INTO 'max_temp_by_station';

CROSS:

- Pig Latin includes the cross product operator.
- Size of the output is the product of size of inputs, potentially making output very large:

grunt> I = CROSS A, B;
grunt> DUMP I;
(2,Tie,Joe,2)
(2,Tie,Hank,4)
(2,Tie,Ali,0)
(2,Tie,Eve,3)
(2,Tie,Hank,2)
(4,Coat,Joe,2)
(4,Coat,Hank,4)
(4,Coat,Ali,0)
(4,Coat,Eve,3)
(4,Coat,Hank,2)
(3,Hat,Joe,2)
(3,Hat,Hank,4)
(3,Hat,Ali,0)
(3,Hat,Eve,3)
(3,Hat,Hank,2)
(1,Scarf,Joe,2)
(1,Scarf,Hank,4)
(1,Scarf,Ali,0)
(1,Scarf,Eve,3)
(1,Scarf,Hank,2)

- Computing the cross product of a whole input dataset is rarely, if ever, needed.

GROUP:

- Groups data in a single relation.
- Supports grouping by more than equality of keys: you can use an expression or UDF as the group key.
- Consider the following relation A:
grunt> DUMP A;
(Joe,cherry)
(Ali,apple)
(Joe,banana)
(Eve,apple)
Let’s group by the number of characters in the second field:
grunt> B = GROUP A BY SIZE($1);
grunt> DUMP B;
(5,{(Ali,apple),(Eve,apple)})
(6,{(Joe,cherry),(Joe,banana)})
- There are also two special grouping operations: ALL and ANY.
- ALL groups all tuples in the relation into a single group, as if the grouping expression were a constant.
grunt> C = GROUP A ALL;
grunt> DUMP C;
(all,{(Joe,cherry),(Ali,apple),(Joe,banana),(Eve,apple)})

- ALL grouping is commonly used to count number of tuples in a relation
- ANY keyword is used to group the tuples in relation randomly, which can be useful for sampling.

Sorting Data:

- Relations are unordered in Pig. Consider a relation A:
grunt> DUMP A;
(2,3)
(1,2)
(2,4)

- There is no guarantee which order the rows will be processed in.
- In particular, when retrieving contents of A using DUMP or STORE, rows may be written in any order.
- If you want to impose order on output, you can use ORDER operator to sort relation by one or more fields.
- Default sort order compares fields of same type using natural ordering and different types are given an arbitrary, but deterministic, ordering.

The following example sorts A by the first field in ascending order and by the second
field in descending order:
grunt> B = ORDER A BY $0, $1 DESC;
grunt> DUMP B;
(1,2)
(2,4)
(2,3)

Any further processing on a sorted relation is not guaranteed to retain its order. For
example:
grunt> C = FOREACH B GENERATE *;

- LIMIT statement is useful for limiting number of results, as a quick and dirty way to get sample of relation.
- LIMIT will select any n tuples from relation, but when used immediately after an ORDER statement, the order is retained.
grunt> D = LIMIT B 2;
grunt> DUMP D;
(1,2)
(2,4)

- If the limit is greater than number of tuples in relation, all tuples are returned.
- You should always use LIMIT if you are not interested in entire output.
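
The ORDER A BY $0, $1 DESC and LIMIT B 2 statements above correspond to a two-level comparator followed by truncation. The sketch below shows this in plain Java under the assumption that tuples are int[] pairs; OrderSketch is a hypothetical name and nothing here is Pig code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of ORDER ... BY $0, $1 DESC followed by LIMIT n. */
final class OrderSketch {
  static List<int[]> orderThenLimit(List<int[]> rows, int n) {
    // First field ascending; ties broken by the second field descending.
    Comparator<int[]> order =
        Comparator.<int[]>comparingInt(r -> r[0])
            .thenComparing(Comparator.<int[]>comparingInt(r -> r[1]).reversed());
    List<int[]> sorted = new ArrayList<>(rows); // leave the input untouched
    sorted.sort(order);
    // A limit larger than the relation simply returns every tuple.
    return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
  }
}
```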

Combining and Splitting Data:

- Sometimes you have several relations that you would like to combine into one.
The UNION statement is used for this:
grunt> DUMP A;
(2,3)
(1,2)
(2,4)
grunt> DUMP B;
(z,x,8)
(w,y,1)
grunt> C = UNION A, B;
grunt> DUMP C;
(2,3)
(1,2)
(2,4)
(z,x,8)
(w,y,1)

- The SPLIT operator is the opposite of UNION: it partitions a relation into two or more relations.
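
As a rough model of what SPLIT does, the Java sketch below routes each tuple to every output whose condition it satisfies. SplitSketch is a hypothetical name and the code is an illustration, not Pig's implementation. Note that a tuple may land in more than one output, or in none, because the conditions are evaluated independently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical sketch of partitioning one relation by named conditions. */
final class SplitSketch {
  static <T> Map<String, List<T>> split(List<T> relation,
                                        Map<String, Predicate<T>> conditions) {
    // One (initially empty) output relation per condition.
    Map<String, List<T>> outputs = new LinkedHashMap<>();
    for (String name : conditions.keySet()) {
      outputs.put(name, new ArrayList<>());
    }
    // Test every tuple against every condition.
    for (T tuple : relation) {
      for (Map.Entry<String, Predicate<T>> entry : conditions.entrySet()) {
        if (entry.getValue().test(tuple)) {
          outputs.get(entry.getKey()).add(tuple);
        }
      }
    }
    return outputs;
  }
}
```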

Pig in Practice:

- There are some practical techniques that are worth knowing about when you are developing and running Pig programs.

Parallelism:

- By default, Pig will set the number of reducers by looking at the size of the input, and using one reducer per 1GB of input, up to maximum of 999 reducers.
- You can override these parameters by setting pig.exec.reducers.bytes.per.reducer (default: 1000000000 bytes) and pig.exec.reducers.max (default: 999).
- Following line sets the number of reducers to 30 for the GROUP:
grouped_records = GROUP records BY year PARALLEL 30;
Alternatively, you can set the default_parallel option, and it will take effect for all
subsequent jobs:
grunt> set default_parallel 30
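
The default estimate described at the start of this section is simple arithmetic, sketched below in Java. ReducerEstimate is a hypothetical name; rounding up and using at least one reducer are assumptions of this sketch rather than details taken from these notes.

```java
/** Hypothetical sketch of the default reducer estimate. */
final class ReducerEstimate {
  static final long DEFAULT_BYTES_PER_REDUCER = 1_000_000_000L; // pig.exec.reducers.bytes.per.reducer
  static final int DEFAULT_MAX_REDUCERS = 999;                  // pig.exec.reducers.max

  /** One reducer per bytesPerReducer of input, clamped to the range 1..maxReducers. */
  static int estimate(long inputBytes, long bytesPerReducer, int maxReducers) {
    if (bytesPerReducer <= 0 || maxReducers < 1 || inputBytes < 0) {
      throw new IllegalArgumentException("Arguments out of range");
    }
    long wanted = (inputBytes + bytesPerReducer - 1) / bytesPerReducer; // ceiling division
    return (int) Math.max(1, Math.min(maxReducers, wanted));
  }
}
```

Under these assumptions, 5 GB of input with the default settings gives 5 reducers, and anything over roughly 999 GB is capped at 999.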

Parameter Substitution

- Pig supports parameter substitution, where parameters in script are substituted with values supplied at runtime.
- Parameters are denoted by identifiers prefixed with a $ character. For example, $input and $output are used in the following script to specify the input and output paths:

-- max_temp_param.pig
records = LOAD '$input' AS (year:chararray, temperature:int, quality:int);
filtered_records = FILTER records BY temperature != 9999 AND
  (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
grouped_records = GROUP filtered_records BY year;
max_temp = FOREACH grouped_records GENERATE group,
  MAX(filtered_records.temperature);
STORE max_temp into '$output';

- Parameters can be specified when launching Pig, using the -param option, one for each parameter:

% pig -param input=/user/tom/input/ncdc/micro-tab/sample.txt \
>     -param output=/tmp/out \
>     ch11/src/main/pig/max_temp_param.pig

- Parameters can also be placed in a file and passed to Pig using the -param_file option.

For example, we can achieve the same result as the previous command by placing the
parameter definitions in a file:
# Input file
input=/user/tom/input/ncdc/micro-tab/sample.txt
# Output file
output=/tmp/out
The pig invocation then becomes:
% pig -param_file ch11/src/main/pig/max_temp_param.param \
>     ch11/src/main/pig/max_temp_param.pig

Dynamic Parameters:

For parameters that are supplied using the -param option, it is easy to make the value
dynamic by running a command or script. Many Unix shells support command substitution
for a command enclosed in backticks, and we can use this to make the output directory date-based:
% pig -param input=/user/tom/input/ncdc/micro-tab/sample.txt \
>     -param output=/tmp/`date "+%Y-%m-%d"`/out \
>     ch11/src/main/pig/max_temp_param.pig
Pig also supports backticks in parameter files, by executing the enclosed command in
a shell and using the shell output as the substituted value. If the command or script
exits with a nonzero exit status, then the error message is reported and execution halts.
Backtick support in parameter files is a useful feature; it means that parameters can be
defined in the same way whether they are defined in a file or on the command line.

Parameter substitution processing:

- Parameter substitution occurs as preprocessing step before the script is run.
- You can see the substitutions that the preprocessor made by executing Pig with the -dryrun option. In dry run mode, Pig performs parameter substitution (and macro expansion) and generates a copy of the original script with substituted values, but does not execute the script. You can inspect the generated script and check that the substitutions look sane (which is especially useful when the values are dynamically generated) before running it in normal mode.
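
To make the preprocessing step concrete, here is a deliberately simplified model of it in Java. It is not Pig's preprocessor: ParamSubstitution is a hypothetical name, only the $name form is handled, and the sketch assumes that parameter names start with a letter or underscore, which is what keeps positional references such as $0 untouched.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical, simplified model of parameter substitution as a text pass. */
final class ParamSubstitution {
  // A $ followed by an identifier; $0, $1 and so on do not match.
  private static final Pattern PARAM = Pattern.compile("\\$([A-Za-z_][A-Za-z0-9_]*)");

  /** Returns the script text with every $name replaced by its value. */
  static String substitute(String script, Map<String, String> params) {
    Matcher matcher = PARAM.matcher(script);
    StringBuffer out = new StringBuffer();
    while (matcher.find()) {
      String name = matcher.group(1);
      String value = params.get(name);
      if (value == null) {
        throw new IllegalArgumentException("Undefined parameter: " + name);
      }
      // quoteReplacement keeps any $ or \ in the value from being reinterpreted.
      matcher.appendReplacement(out, Matcher.quoteReplacement(value));
    }
    matcher.appendTail(out);
    return out.toString();
  }
}
```

Because the substitution is purely textual and happens before the script is parsed, the result of substitute() plays the role of the script that -dryrun would let you inspect.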









