LOAD:
Loading the data into “base”
grunt> base = load 'filepig.txt' using PigStorage('|') as
(empid:int,ename:chararray,salary:int);
PigStorage('|') tells Pig to split each line into columns using the delimiter '|'.
The file filepig.txt contains 3 columns separated by the delimiter '|'. The as clause gives each column a name and a type:
(empid:int,ename:chararray,salary:int), so empid represents the 1st column, ename represents the 2nd
column, and salary represents the 3rd column.
The above statement loads the data from filepig.txt into the relation base.
We can check the schema of the relation by using describe:
grunt> describe base;
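To see what the as clause buys you, here is a small sketch that loads the same file without a schema. The aliases raw and typed are made up for this example. Without a schema, columns are referenced by position ($0, $1, $2) and default to bytearray, so they are cast explicitly here.

```pig
-- Sketch only: "raw" and "typed" are hypothetical aliases, not part of the original walkthrough.
raw = load 'filepig.txt' using PigStorage('|');

-- No schema was declared, so refer to columns by position and cast them by hand.
typed = foreach raw generate (int)$0 as empid, (chararray)$1 as ename, (int)$2 as salary;

-- Check the schema that results from the casts.
describe typed;
```

Declaring the schema in the load statement, as done above, is usually the simpler choice.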
FOREACH and GENERATE: foreach goes through each row of the
relation and generate produces the listed columns as the result.
grunt> eid = foreach base generate empid;
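generate can also return several columns and computed expressions. A minimal sketch, assuming the base relation loaded above; the alias yearly and the field name annual_salary are invented for illustration.

```pig
-- Keep two columns and derive a third one from salary.
-- "yearly" and "annual_salary" are hypothetical names.
yearly = foreach base generate empid, ename, salary * 12 as annual_salary;
dump yearly;
```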
DUMP: dump runs the statements and prints the
contents of a relation to the console.
grunt> dump base;
Result:
FILTER: filter is
used to select rows by a condition. For example, filter the employees who have a salary of more than
4000.
grunt> moresal = filter base by salary > 4000;
grunt> dump moresal;
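Filter conditions can be combined with and, or and not. The sketch below assumes the base relation from the LOAD section; the aliases and the salary bounds are only illustrative.

```pig
-- Employees whose salary falls inside a range (bounds are made up).
midsal = filter base by (salary >= 3000) and (salary <= 5000);

-- Drop rows where the name is missing.
named = filter base by ename is not null;

dump midsal;
```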
ILLUSTRATE: shows, on a small sample of the data,
how each statement transforms the rows, together with the schema of each relation.
grunt> illustrate base;
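EXPLAIN:
Used to display the logical plan, the physical plan, and the MapReduce plan
that Pig builds for a relation. In the example below, emp is a relation loaded from emp.txt with PigStorage('|').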
grunt> explain emp;
2016-06-17 02:35:55,746 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2016-06-17 02:35:55,752 [main] WARN org.apache.pig.data.SchemaTupleBackend - SchemaTupleBackend has already been initialized
2016-06-17 02:35:55,752 [main] INFO org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, ConstantCalculator, GroupByConstParallelSetter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, PartitionFilterOptimizer, PredicatePushdownOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter]}
#-----------------------------------------------
# New Logical Plan:
#-----------------------------------------------
emp: (Name: LOStore Schema: eid#10443:int,name#10444:chararray,sal#10445:chararray)
|
|---emp: (Name: LOForEach Schema: eid#10443:int,name#10444:chararray,sal#10445:chararray)
    |   |
    |   (Name: LOGenerate[false,false,false] Schema: eid#10443:int,name#10444:chararray,sal#10445:chararray)ColumnPrune:OutputUids=[10443, 10444, 10445]ColumnPrune:InputUids=[10443, 10444, 10445]
    |   |   |
    |   |   (Name: Cast Type: int Uid: 10443)
    |   |   |
    |   |   |---eid:(Name: Project Type: bytearray Uid: 10443 Input: 0 Column: (*))
    |   |   |
    |   |   (Name: Cast Type: chararray Uid: 10444)
    |   |   |
    |   |   |---name:(Name: Project Type: bytearray Uid: 10444 Input: 1 Column: (*))
    |   |   |
    |   |   (Name: Cast Type: chararray Uid: 10445)
    |   |   |
    |   |   |---sal:(Name: Project Type: bytearray Uid: 10445 Input: 2 Column: (*))
    |   |
    |   |---(Name: LOInnerLoad[0] Schema: eid#10443:bytearray)
    |   |
    |   |---(Name: LOInnerLoad[1] Schema: name#10444:bytearray)
    |   |
    |   |---(Name: LOInnerLoad[2] Schema: sal#10445:bytearray)
    |
    |---emp: (Name: LOLoad Schema: eid#10443:bytearray,name#10444:bytearray,sal#10445:bytearray)RequiredFields:null
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
emp: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-621
|
|---emp: New For Each(false,false,false)[bag] - scope-620
    |   |
    |   Cast[int] - scope-612
    |   |
    |   |---Project[bytearray][0] - scope-611
    |   |
    |   Cast[chararray] - scope-615
    |   |
    |   |---Project[bytearray][1] - scope-614
    |   |
    |   Cast[chararray] - scope-618
    |   |
    |   |---Project[bytearray][2] - scope-617
    |
    |---emp: Load(hdfs://localhost:9000/user/hadoop/emp.txt:PigStorage('|')) - scope-610
2016-06-17 02:35:55,757 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
2016-06-17 02:35:55,759 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
2016-06-17 02:35:55,759 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-622
Map Plan
emp: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-621
|
|---emp: New For Each(false,false,false)[bag] - scope-620
    |   |
    |   Cast[int] - scope-612
    |   |
    |   |---Project[bytearray][0] - scope-611
    |   |
    |   Cast[chararray] - scope-615
    |   |
    |   |---Project[bytearray][1] - scope-614
    |   |
    |   Cast[chararray] - scope-618
    |   |
    |   |---Project[bytearray][2] - scope-617
    |
    |---emp: Load(hdfs://localhost:9000/user/hadoop/emp.txt:PigStorage('|')) - scope-610--------
Global sort: false
----------------
ORDER BY: sorts the rows of a relation by one or more columns (ascending by default).
grunt> ordersalary = order base by salary;
grunt> dump ordersalary;
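The sort direction can be set per column with asc or desc. A small sketch, assuming the base relation from the LOAD section; the alias by_sal_desc is made up.

```pig
-- Highest salary first; rows with equal salary are ordered by name.
by_sal_desc = order base by salary desc, ename asc;
dump by_sal_desc;
```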
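GROUP BY: collects the rows that share the same key into one group. Here emp is the employee relation loaded from emp.txt with a schema, as in the Left Join section below.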
grunt> group_data = group emp by eid;
grunt> dump group_data;
grunt> describe group_data;
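A grouped relation is usually followed by a foreach that aggregates each group. The sketch below assumes group_data was built as above from an emp relation that has a schema; the alias counts and the field name num_rows are hypothetical.

```pig
-- Each row of group_data has two fields:
--   group : the key (here the eid value)
--   emp   : a bag holding all emp rows with that key
counts = foreach group_data generate group as eid, COUNT(emp) as num_rows;
dump counts;
```

Note that built-in function names such as COUNT are case sensitive in Pig.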
JOIN:
Let’s take another table, stored as dept.txt, and rename the old file to emp.txt to avoid confusion.
Move the dept and emp files to HDFS.
$ hadoop fs -put dept.txt
$ hadoop fs -put emp.txt
grunt> emp = load 'emp.txt' using PigStorage('|');
grunt> dept = load 'dept.txt' using PigStorage('|');
grunt> joindata = join emp by $0, dept by $1;
grunt> dump joindata;
Here $0 represents the first column of the emp relation
and $1 represents the second column of the dept relation.
Left Join:
Left, right, and full outer joins will not be executed unless the relations have a
well-defined schema. For example, a left join on the emp and dept relations loaded above without a schema will not work, but
the following left join will work, because both relations have a schema.
grunt> emp = load 'emp.txt' using PigStorage('|') as
(eid:int,name:chararray,sal:bytearray);
grunt> dept = load 'dept.txt' using PigStorage('|') as
(did:int,eid:int,name:chararray);
grunt> ljoin = join emp by eid left,dept by eid;
grunt> dump ljoin;
Right Join:
grunt> rjoin = join emp by eid right,dept by eid;
grunt> dump rjoin;
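For completeness, here is a sketch of a full outer join on the same two relations, assuming emp and dept were loaded with the schemas shown in the Left Join section. The aliases fjoin and fjoin_cols and the output field names are invented for this example.

```pig
-- Keep unmatched rows from both sides; the missing side is filled with nulls.
fjoin = join emp by eid full outer, dept by eid;

-- Both inputs have fields called eid and name, so after the join
-- they are told apart with the relation::field syntax.
fjoin_cols = foreach fjoin generate emp::eid as emp_eid, emp::name as emp_name,
                                    dept::did as did, dept::name as dept_name;
dump fjoin_cols;
```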
STORE:
grunt> store rjoin into '/user/hadoop/rjoin_putput';
[hadoop@localhost blog]$ hadoop fs -cat /user/hadoop/rjoin_putput/part-r-00000
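store writes tab-separated text by default. If another delimiter is wanted, PigStorage can be given in a using clause. A minimal sketch; the output path below is a hypothetical one chosen for this example.

```pig
-- Write rjoin as comma-separated text.
-- '/user/hadoop/rjoin_csv' is a made-up path; the directory must not already exist.
store rjoin into '/user/hadoop/rjoin_csv' using PigStorage(',');
```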
UNION:
Union is used to combine the rows of two relations, ideally relations with similar schemas.
grunt> Union_data = union emp,dept;
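emp and dept do not have the same columns, so a plain union mixes fields by position. One option is union onschema, which lines fields up by name. This sketch assumes both relations were loaded with the schemas from the Left Join section; the alias union_named is made up.

```pig
-- Align fields by name instead of by position.
-- Fields that exist in only one input are null for rows from the other.
union_named = union onschema emp, dept;
describe union_named;
```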
CROSS:
Is used to compute the cross product of two relations: every row of the
first relation is paired with every row of the second relation.
grunt> cross_data = cross emp,dept;
It pairs each row of emp
with each row of dept.
grunt> dump cross_data;
LIMIT:
Is used to limit the output to a given number of rows.
grunt> limit_data = limit cross_data 3;
grunt> dump limit_data;
It limits the output to
3 rows.
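limit is often paired with order to get a "top N" result. A small sketch, assuming the base relation from the LOAD section; the aliases sorted and top3 are hypothetical.

```pig
-- Sort by salary, highest first, then keep the first 3 rows.
sorted = order base by salary desc;
top3 = limit sorted 3;
dump top3;
```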