[pig] GROUP BY, reduce phase, STREAMING, nested FOREACH - dsindex/blog GitHub Wiki

1. 데이터의 컬럼이 아래와 같이 있다고 하자.

a b c d e
------------
1 2 3 4 5
3 4 5 8 9
2 3 4 9 8
.....

2. 데이터를 (a,b)로 GROUP BY하고 해당 data chunk에 대해서만 streaming으로 처리

define proc1 `$progname1` ship('$progpath1');
A = LOAD '$input' USING PigStorage('\t') AS (a,b,c,d,e)
B = GROUP A BY (a,b);
C = FOREACH B GENERATE FLATTEN($1);
D = STREAM C THROUGH proc1;
...
  • (a,b)로 'GROUP BY'하면 기본적으로 reduce phase로 넘어간다. 이 말은 데이터가 (a,b)를 key값으로 사용해서 shuffle된다는 뜻이다.
  • 따라서, 'GROUP BY'이후 STREAMING 처리를 하면 _같은 key값을 가지는 데이터는 반드시 해당 STREAMING 프로그램의 입력_으로 들어가게 된다.
    • STREAMING 프로그램은 reducer의 역할을 수행
    • pig를 이용한 mapreduce 처리에서 매우 자주 사용되는 테크닉
  • STREAMING 프로그램을 작성할때는 입력으로 같은 종류의 key값이 아래와 같은 형태로 들어온다는 것을 가정할수 있다.
1 2 3 4 5
1 2 3 5 6
1 2 4 9 8 
1 2 3 1 3
...
1 3 1 1 1 
1 3 3 1 2
1 3 1 4 8
....
1 3 3 9 9 

3. 데이터를 (a,b)로 GROUP BY하고 해당 data chunk에 대해서만 streaming으로 처리할때, 'c' 컬럼으로 정렬한 결과를 받고 싶은 경우

define proc2 `$progname2` ship('$progpath2');

E = GROUP D BY (a,b);
-- DESCRIBE E;
-- E: {group: (a,b), D: {a:chararray,b:chararray,...,e:chararray}}
F = FOREACH E {
    sorted = ORDER D by c;
    GENERATE FLATTEN(sorted);
};
G = STREAM F THROUGH proc2;
  • STREAMING 프로그램을 작성할때는 입력으로 같은 종류의 key값이 아래와 같이 'c' 컬럼으로 정렬되어 들어온다는 것을 가정할수 있다.
1 2 3 4 5
1 2 3 5 6
1 2 3 1 3
1 2 4 9 8 
...
1 3 1 1 1 
1 3 1 4 8
1 3 3 1 2
....
1 3 3 9 9 
  • 이와 같은 처리는 (a,b)가 같은 데이터에 한정해서 'c' 컬럼에 대해 중복처리를 하고 싶은 경우 혹은 near-duplicate detection을 하고 싶은 경우 매우 유용한 방법론이다.