
Friday, June 17, 2016

PIG Transformations or Operators - Part 2




LOAD:
Loading the data into the relation "base":
grunt> base = load 'filepig.txt' using PigStorage('|') as (empid:int,ename:chararray,salary:int);

PigStorage('|') tells Pig to split each line into columns using the delimiter '|'.
The file filepig.txt contains 3 columns separated by '|', which is why PigStorage('|') is used. The as (empid:int,ename:chararray,salary:int) clause names and types each column: empid is the 1st column, ename the 2nd, and salary the 3rd.

The above line loads the data from filepig.txt into the relation base.
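For reference, here is what a small filepig.txt could look like (sample rows assumed for illustration, not from the original post):

101|John|5000
102|Mary|4500
103|Alex|4000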






We can check a relation's schema by using describe base;
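Given the load statement above, it prints:

grunt> describe base;
base: {empid: int,ename: chararray,salary: int}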



foreach and generate: iterates through each row of a relation and generates the specified columns or expressions. The statement below projects only the empid column:
grunt> eid = foreach base generate empid;
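foreach can also generate several columns and computed expressions at once; for example (an illustrative statement, not from the original post):

grunt> namesal = foreach base generate ename, salary * 2 as doublesal;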





DUMP: dump prints the contents of a relation to the console; it triggers execution of the statements the relation depends on.
grunt> dump base;
Result:
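With the sample rows assumed above, each row comes back as a tuple:

(101,John,5000)
(102,Mary,4500)
(103,Alex,4000)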



FILTER: filter is used to filter data; for example, keep only the employees who have a salary of more than 4000.
grunt> moresal = filter base by salary > 4000;

grunt> dump moresal;
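With the sample data assumed above, only the rows whose salary exceeds 4000 remain:

(101,John,5000)
(102,Mary,4500)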




ILLUSTRATE: runs the script on a small sample of the input and shows, step by step, how the data flows through each statement, along with the schema at each step.
grunt> illustrate base;



EXPLAIN:
Used to display the logical plan, the physical plan, and the MapReduce plan for a relation (here a relation named emp, loaded from emp.txt).
grunt> explain emp;
2016-06-17 02:35:55,746 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2016-06-17 02:35:55,752 [main] WARN  org.apache.pig.data.SchemaTupleBackend - SchemaTupleBackend has already been initialized
2016-06-17 02:35:55,752 [main] INFO  org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, ConstantCalculator, GroupByConstParallelSetter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, PartitionFilterOptimizer, PredicatePushdownOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter]}
#-----------------------------------------------
# New Logical Plan:
#-----------------------------------------------
emp: (Name: LOStore Schema: eid#10443:int,name#10444:chararray,sal#10445:chararray)
|
|---emp: (Name: LOForEach Schema: eid#10443:int,name#10444:chararray,sal#10445:chararray)
    |   |
    |   (Name: LOGenerate[false,false,false] Schema: eid#10443:int,name#10444:chararray,sal#10445:chararray)ColumnPrune:OutputUids=[10443, 10444, 10445]ColumnPrune:InputUids=[10443, 10444, 10445]
    |   |   |
    |   |   (Name: Cast Type: int Uid: 10443)
    |   |   |
    |   |   |---eid:(Name: Project Type: bytearray Uid: 10443 Input: 0 Column: (*))
    |   |   |
    |   |   (Name: Cast Type: chararray Uid: 10444)
    |   |   |
    |   |   |---name:(Name: Project Type: bytearray Uid: 10444 Input: 1 Column: (*))
    |   |   |
    |   |   (Name: Cast Type: chararray Uid: 10445)
    |   |   |
    |   |   |---sal:(Name: Project Type: bytearray Uid: 10445 Input: 2 Column: (*))
    |   |
    |   |---(Name: LOInnerLoad[0] Schema: eid#10443:bytearray)
    |   |
    |   |---(Name: LOInnerLoad[1] Schema: name#10444:bytearray)
    |   |
    |   |---(Name: LOInnerLoad[2] Schema: sal#10445:bytearray)
    |
    |---emp: (Name: LOLoad Schema: eid#10443:bytearray,name#10444:bytearray,sal#10445:bytearray)RequiredFields:null
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
emp: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-621
|
|---emp: New For Each(false,false,false)[bag] - scope-620
    |   |
    |   Cast[int] - scope-612
    |   |
    |   |---Project[bytearray][0] - scope-611
    |   |
    |   Cast[chararray] - scope-615
    |   |
    |   |---Project[bytearray][1] - scope-614
    |   |
    |   Cast[chararray] - scope-618
    |   |
    |   |---Project[bytearray][2] - scope-617
    |
    |---emp: Load(hdfs://localhost:9000/user/hadoop/emp.txt:PigStorage('|')) - scope-610

2016-06-17 02:35:55,757 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
2016-06-17 02:35:55,759 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
2016-06-17 02:35:55,759 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-622
Map Plan
emp: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-621
|
|---emp: New For Each(false,false,false)[bag] - scope-620
    |   |
    |   Cast[int] - scope-612
    |   |
    |   |---Project[bytearray][0] - scope-611
    |   |
    |   Cast[chararray] - scope-615
    |   |
    |   |---Project[bytearray][1] - scope-614
    |   |
    |   Cast[chararray] - scope-618
    |   |
    |   |---Project[bytearray][2] - scope-617
    |
    |---emp: Load(hdfs://localhost:9000/user/hadoop/emp.txt:PigStorage('|')) - scope-610
--------
Global sort: false
----------------

ORDER BY:
Sorts a relation by one or more columns; here base is sorted by salary.
grunt> ordersalary = order base by salary;
grunt> dump ordersalary;
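By default, order by sorts in ascending order; append desc to sort in descending order:

grunt> ordersalary = order base by salary desc;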




GROUP BY:
Groups the tuples of a relation by a key; each output tuple holds the key (named group) and a bag of all matching tuples.
grunt> group_data = group emp by eid;
grunt> dump group_data;


grunt> describe group_data;
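Assuming emp carries the schema shown in the EXPLAIN output above (eid:int, name:chararray, sal:chararray), describe would print something like:

group_data: {group: int,emp: {(eid: int,name: chararray,sal: chararray)}}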
 



JOIN:

Let's take another table, put it in a file named dept.txt, and rename the old file to emp.txt so the two aren't confused.
Move the dept and emp files to HDFS.
$ hadoop fs -put dept.txt
$ hadoop fs -put emp.txt
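Assume, for illustration, the two files look like this (emp.txt holds employee id, name, salary; dept.txt holds department id, employee id, department name):

emp.txt:
1|John|4000
2|Mary|5000

dept.txt:
10|1|Sales
20|3|Finance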


grunt> emp = load 'emp.txt' using PigStorage('|');
grunt> dept = load 'dept.txt' using PigStorage('|');




grunt> joindata = join emp by $0, dept by $1;
grunt> dump joindata;
Here $0 refers to the first column of the emp relation and $1 to the second column of the dept relation; positional references are used because the relations were loaded without a schema.
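With the sample files assumed above, only employee 1 has a matching dept row, so the inner join emits one tuple containing the fields of both relations:

(1,John,4000,10,1,Sales)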




Left Join:
Left, right, and full outer joins will not execute without a well-defined schema; a left join between emp and dept as loaded above (no schema declared) fails.

The following left join works, because both relations declare a schema:
grunt> emp = load 'emp.txt' using PigStorage('|') as (eid:int,name:chararray,sal:bytearray);
grunt> dept = load 'dept.txt' using PigStorage('|') as (did:int,eid:int,name:chararray);
grunt> ljoin = join emp by eid left,dept by eid;
grunt> dump ljoin;
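With the sample files assumed above, every emp row is kept; employee 2 has no matching dept row, so its dept fields come back as nulls (printed as empty values by dump):

(1,John,4000,10,1,Sales)
(2,Mary,5000,,,)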



Right Join:
grunt> rjoin = join emp by eid right,dept by eid;
grunt> dump rjoin;
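Conversely, the right join keeps every dept row; department 20 references employee 3, who does not exist in emp, so the emp fields are null:

(1,John,4000,10,1,Sales)
(,,,20,3,Finance)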




STORE:
store writes a relation out to HDFS.
grunt> store rjoin into '/user/hadoop/rjoin_output';
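By default, store writes tab-delimited output via PigStorage. To keep the '|' delimiter of the input files, pass it explicitly (an alternative to the command above):

grunt> store rjoin into '/user/hadoop/rjoin_output' using PigStorage('|');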



[hadoop@localhost blog]$ hadoop fs -cat /user/hadoop/rjoin_output/part-r-00000


UNION:
Union is used to combine the tuples of two relations into one relation.
grunt> union_data = union emp, dept;
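Note that emp and dept do not share a schema, so the result above may have no usable schema; union is most natural between relations with identical layouts. For example, with a hypothetical second employee file emp2.txt:

grunt> emp2 = load 'emp2.txt' using PigStorage('|') as (eid:int,name:chararray,sal:bytearray);
grunt> all_emp = union emp, emp2;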

CROSS:
Computes the cross product of two relations: every tuple of emp is paired with every tuple of dept, so if emp has m rows and dept has n rows, the result has m × n rows.
grunt> cross_data = cross emp, dept;
grunt> dump cross_data;


LIMIT:
Restricts the output to a given number of tuples.
grunt> limit_data = limit cross_data 3;
grunt> dump limit_data;
This limits the output to the first 3 tuples.
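A common pattern is order by followed by limit to pick the top N rows; for example, the three highest salaries using the descending ordersalary relation from above:

grunt> top3 = limit ordersalary 3;
grunt> dump top3;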




Still to cover: FLATTEN, aggregate functions, DISTINCT, COGROUP, TOKENIZE.

click here for PART 1 - What is PIG
click here for PART 3 - Advanced scripts

