---
title: Bag Operations - Guide - Apache DataFu Pig
version: 1.4.0
section_name: Apache DataFu Pig - Guide
license: >
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---
## Bag operations
Often when working with data in Pig, it makes sense to keep the data grouped by one or more fields,
which means you are working with bags. Unfortunately there aren't many convenient ways to work
with bags in Pig out of the box. For this reason Apache DataFu provides several UDFs for performing useful
operations on bags that come up in practice.
### Counting Items in Bags
The [CountEach](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/bags/CountEach.html) UDF
can be used to count the number of instances of items within a bag. It produces a new bag of the
distinct items with their respective counts appended.
Let's take a look at an example where this might be useful.
Suppose that we have a recommendation system, and we've tracked what items have been recommended.
```pig
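-- assumes the tracked recommendation events were loaded earlier, e.g.:
-- items = LOAD '$items' AS (memberId:int, itemId:long);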
items = FOREACH items GENERATE memberId, itemId;
```
Let's say that we want to compute the number of times an item has been shown to each user.
Our output will have this schema:
```
{memberId:int, items: {{itemId:long, cnt:long}}}
```
Typically we would have to perform two `GROUP` operations to get this output. First we group
by `(memberId,itemId)` and count, then we group a second time by `memberId`. This requires two MapReduce jobs.
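For comparison, here is a minimal sketch of the two-`GROUP` approach, using the field names above
(the aliases are illustrative):
```pig
-- first job: count the occurrences of each (memberId, itemId) pair
counted = FOREACH (GROUP items BY (memberId, itemId)) GENERATE
    FLATTEN(group) AS (memberId, itemId),
    COUNT(items) as cnt;
-- second job: collect the per-item counts into a bag per member
items_counted = FOREACH (GROUP counted BY memberId) GENERATE
    group as memberId,
    counted.(itemId, cnt) as items;
```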
To make this case more efficient, we can use the `CountEach` UDF.
It will produce the same output, but it only requires a single `GROUP` operation:
```pig
DEFINE CountEach datafu.pig.bags.CountEach('flatten');
items = FOREACH (GROUP items BY memberId) GENERATE
    group as memberId,
    CountEach(items.(itemId)) as items;
```
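The `'flatten'` option passed to the `CountEach` constructor appends the count to the fields of each
distinct tuple, producing flat `(itemId, cnt)` tuples; without it, each distinct tuple would be nested
inside the output tuple alongside its count.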
### Bag Concatenation
[BagConcat](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/bags/BagConcat.html) can be used
to concatenate the tuples from two or more bags into a single bag:
```pig
define BagConcat datafu.pig.bags.BagConcat();
-- ({(1),(2),(3)},{(4),(5)},{(6),(7)})
input = LOAD 'input' AS (B1: bag{T: tuple(v:INT)}, B2: bag{T: tuple(v:INT)}, B3: bag{T: tuple(v:INT)});
-- ({(1),(2),(3),(4),(5),(6),(7)})
output = FOREACH input GENERATE BagConcat(B1,B2,B3);
```
[BagConcat](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/bags/BagConcat.html) can also be
used to concatenate all tuples present in a bag of bags.
```pig
define BagConcat datafu.pig.bags.BagConcat();
-- ({({(1),(2),(3)}),({(3),(4),(5)})})
input = LOAD 'input' AS (A: bag{T: tuple(bag{T2: tuple(v:INT)})});
-- ({(1),(2),(3),(3),(4),(5)})
output = FOREACH input GENERATE BagConcat(A);
```
### Grouping Within a Bag
Pig has a `GROUP` operation that can be applied to a relation. It produces a new relation in which the input
tuples are grouped by a particular key: the key appears in a field named `group`, and the tuples sharing that
key are collected into a bag.
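For reference, this is what a relation-level `GROUP` looks like (a minimal sketch with illustrative aliases):
```pig
pairs = LOAD 'pairs' AS (k: int, v: chararray);
-- the key lands in a field named 'group'; the matching tuples land in a bag named 'pairs'
pairs_grouped = GROUP pairs BY k;
-- pairs_grouped: {group: int, pairs: {(k: int, v: chararray)}}
```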
[BagGroup](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/bags/BagGroup.html) mimics the `GROUP`
operation from Pig. The difference is that `BagGroup` operates within a bag rather than on a relation.
This is useful when the bags are not very large: we can regroup the tuples inside a bag without a
relation-level `GROUP`, which would cost another MapReduce job.
In the following example we take an `input_bag` consisting of key-value pairs `(k,v)` and group the tuples by `k`.
This produces a new bag having tuples consisting of `group` and `input_bag`. The `group` corresponds to the grouping
key `k`. The `input_bag` is a bag containing the tuples from the original `input_bag` that have the same `k` as `group`.
```pig
define BagGroup datafu.pig.bags.BagGroup();
data = LOAD 'input' AS (input_bag: bag {T: tuple(k: int, v: chararray)});
-- ({(1,A),(1,B),(2,A),(2,B),(2,C),(3,A)})
-- Group input_bag by k
data2 = FOREACH data GENERATE BagGroup(input_bag, input_bag.(k)) as grouped;
-- data2: {grouped: {(group: int,input_bag: {T: (k: int,v: chararray)})}}
-- ({(1,{(1,A),(1,B)}),(2,{(2,A),(2,B),(2,C)}),(3,{(3,A)})})
```
We could also project out the key from the final `input_bag` using a nested `FOREACH` so that the bag only
consists of the value `v`:
```pig
data3 = FOREACH data2 {
    -- project only the value
    projected = FOREACH grouped GENERATE group, input_bag.(v);
    GENERATE projected as grouped;
}
-- data3: {grouped: {(group: int,input_bag: {(v: chararray)})}}
-- ({(1,{(A),(B)}),(2,{(A),(B),(C)}),(3,{(A)})})
```
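Because the grouped data stays inside a bag, we can also aggregate within it using a nested `FOREACH`,
again without a relation-level `GROUP`. Here is a minimal sketch that counts the values per key
(the `data4` alias is illustrative):
```pig
data4 = FOREACH data2 {
    -- count the values for each key within the bag
    counts = FOREACH grouped GENERATE group as k, COUNT(input_bag) as cnt;
    GENERATE counts as key_counts;
}
-- ({(1,2),(2,3),(3,1)})
```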
### Append to Bag
[AppendToBag](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/bags/AppendToBag.html) can be
used to append a tuple to a bag:
```pig
define AppendToBag datafu.pig.bags.AppendToBag();
-- ({(1),(2),(3)},(4))
input = LOAD 'input' AS (B: bag{T: tuple(v:INT)}, T: tuple(v:INT));
-- ({(1),(2),(3),(4)})
output = FOREACH input GENERATE AppendToBag(B,T);
```
### Prepend to Bag
[PrependToBag](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/bags/PrependToBag.html) can be
used to prepend a tuple to a bag:
```pig
define PrependToBag datafu.pig.bags.PrependToBag();
-- ({(1),(2),(3)},(4))
input = LOAD 'input' AS (B: bag{T: tuple(v:INT)}, T: tuple(v:INT));
-- ({(4),(1),(2),(3)})
output = FOREACH input GENERATE PrependToBag(B,T);
```
### Join Bags
Pig has a `JOIN` operator, but unfortunately it only operates on relations. Thus, if you wish to join
tuples from two bags, you must first flatten, then join, then re-group. To make this process simpler, DataFu
provides a [BagLeftOuterJoin](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/bags/BagLeftOuterJoin.html)
UDF.
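For reference, here is a minimal sketch of the manual flatten, join, and regroup sequence for two bags of
`(k,v)` pairs per key (all aliases and field names here are illustrative):
```pig
-- assumes: grouped_data: {id: int, B1: {(k: int, v1: int)}, B2: {(k: int, v2: int)}}
flat1 = FOREACH grouped_data GENERATE id, FLATTEN(B1) AS (k1: int, v1: int);
flat2 = FOREACH grouped_data GENERATE id, FLATTEN(B2) AS (k2: int, v2: int);
joined = JOIN flat1 BY (id, k1) LEFT OUTER, flat2 BY (id, k2);
regrouped = FOREACH (GROUP joined BY flat1::id) GENERATE
    group as id,
    joined.(flat1::k1, flat1::v1, flat2::v2) as items;
```
`BagLeftOuterJoin` collapses all of this into a single `FOREACH`.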
Let's walk through an example where this is useful. Suppose that we are building a recommendation system.
This system recommends items to users, and these recommendations may be ignored, accepted, or rejected.
When analyzing this system, we have a stream of impression, accept, and reject events:
```pig
impressions = LOAD '$impressions' AS (user_id:int, item_id:int, timestamp:long);
accepts = LOAD '$accepts' AS (user_id:int, item_id:int, timestamp:long);
rejects = LOAD '$rejects' AS (user_id:int, item_id:int, timestamp:long);
```
What we want to produce from this data is a bag of item counts per member:
```
features: {user_id:int, items:{(item_id:int, impression_count:int, accept_count:int, reject_count:int)}}
```
Using DataFu's
[CountEach](https://datafu.apache.org/docs/datafu/<%= current_page.data.version %>/datafu/pig/bags/CountEach.html)
we can efficiently produce the counts per item for impressions, accepts, and rejects as separate
bags per member using a single MapReduce job:
```pig
define CountEach datafu.pig.bags.CountEach('flatten');
features_counted = FOREACH (COGROUP impressions BY user_id,
                            accepts BY user_id,
                            rejects BY user_id) GENERATE
    group as user_id,
    CountEach(impressions.item_id) as impressions,
    CountEach(accepts.item_id) as accepts,
    CountEach(rejects.item_id) as rejects;
```
This produces three bags, consisting of `(item_id,count)`. We can now join these bags
together using `BagLeftOuterJoin`:
```pig
define BagLeftOuterJoin datafu.pig.bags.BagLeftOuterJoin();
features_joined = FOREACH features_counted GENERATE
    user_id,
    BagLeftOuterJoin(
        impressions, 'item_id',
        accepts, 'item_id',
        rejects, 'item_id'
    ) as items;
```
We left join on the impressions here, since a user cannot accept or reject an item that was never shown.
The left join can of course produce null values for accepts and rejects that did not occur, so let's
clean those up by replacing the nulls with counts of zero:
```pig
define Coalesce datafu.pig.util.Coalesce();
features = FOREACH features_joined {
    projected = FOREACH items GENERATE
        impressions::item_id as item_id,
        impressions::count as impression_count,
        Coalesce(accepts::count, 0) as accept_count,
        Coalesce(rejects::count, 0) as reject_count;
    GENERATE user_id, projected as items;
}
```